Author: cmacnaug
Date: Mon Jun 22 18:28:39 2009
New Revision: 787345
URL: http://svn.apache.org/viewvc?rev=787345&view=rev
Log: (empty)
Modified:
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ProtocolHandler.java
activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowResource.java
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java
activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/CursoredQueue.java
activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java
activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java
activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/Subscription.java
activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
Modified:
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ProtocolHandler.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ProtocolHandler.java?rev=787345&r1=787344&r2=787345&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ProtocolHandler.java
(original)
+++
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ProtocolHandler.java
Mon Jun 22 18:28:39 2009
@@ -17,10 +17,13 @@
package org.apache.activemq.apollo.broker;
import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
import org.apache.activemq.Service;
import org.apache.activemq.broker.store.Store.MessageRecord;
import org.apache.activemq.filter.BooleanExpression;
+import org.apache.activemq.flow.AbstractLimitedFlowResource;
import org.apache.activemq.flow.IFlowSink;
import org.apache.activemq.queue.Subscription;
import org.apache.activemq.wireformat.WireFormat;
@@ -39,26 +42,100 @@
public BrokerMessageDelivery createMessageDelivery(MessageRecord record)
throws IOException;
- public interface ConsumerContext extends Subscription<MessageDelivery>,
IFlowSink<MessageDelivery> {
-
+ /**
+ * ClientContext
+ * <p>
+ * Description: Base interface describing a channel on a physical
+ * connection.
+ * </p>
+ *
+ * @author cmacnaug
+ * @version 1.0
+ */
+ public interface ClientContext {
+ public ClientContext getParent();
+
+ public Collection<ClientContext> getChildren();
+
+ public void addChild(ClientContext context);
+
+ public void removeChild(ClientContext context);
+
+ public void close();
+
+ }
+
+ public abstract class AbstractClientContext<E extends MessageDelivery>
extends AbstractLimitedFlowResource<E> implements ClientContext {
+ protected final HashSet<ClientContext> children = new
HashSet<ClientContext>();
+ protected final ClientContext parent;
+ protected boolean closed = false;
+
+ public AbstractClientContext(String name, ClientContext parent) {
+ super(name);
+ this.parent = parent;
+ if (parent != null) {
+ parent.addChild(this);
+ }
+ }
+
+ public ClientContext getParent() {
+ return parent;
+ }
+
+ public void addChild(ClientContext child) {
+ if (!closed) {
+ children.add(child);
+ }
+ }
+
+ public void removeChild(ClientContext child) {
+ if (!closed) {
+ children.remove(child);
+ }
+ }
+
+ public Collection<ClientContext> getChildren() {
+ return children;
+ }
+
+ public void close() {
+
+ closed = true;
+
+ for (ClientContext c : children) {
+ c.close();
+ }
+
+ if (parent != null) {
+ parent.removeChild(this);
+ }
+
+ super.close();
+ }
+ }
+
+ public interface ConsumerContext extends ClientContext,
Subscription<MessageDelivery>, IFlowSink<MessageDelivery> {
+
public String getConsumerId();
-
+
public Destination getDestination();
public String getSelector();
-
+
public BooleanExpression getSelectorExpression();
-
+
public boolean isDurable();
-
+
public String getSubscriptionName();
-
+
/**
- * If the destination does not exist, should it automatically be
created?
+ * If the destination does not exist, should it automatically be
+ * created?
+ *
* @return
*/
public boolean autoCreateDestination();
-
+
}
}
\ No newline at end of file
Modified:
activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowResource.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowResource.java?rev=787345&r1=787344&r2=787345&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowResource.java
(original)
+++
activemq/sandbox/activemq-flow/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimitedFlowResource.java
Mon Jun 22 18:28:39 2009
@@ -120,4 +120,9 @@
public synchronized IFlowController<E> getFlowController(Flow flow) {
return openControllers.get(flow);
}
+
+ public synchronized void close()
+ {
+
+ }
}
Modified:
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=787345&r1=787344&r2=787345&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
(original)
+++
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
Mon Jun 22 18:28:39 2009
@@ -91,6 +91,8 @@
public class OpenwireProtocolHandler implements ProtocolHandler,
PersistListener {
+ protected final HashMap<ConnectionId, ClientContext> connections = new
HashMap<ConnectionId, ClientContext>();
+ protected final HashMap<SessionId, ClientContext> sessions = new
HashMap<SessionId, ClientContext>();
protected final HashMap<ProducerId, ProducerContext> producers = new
HashMap<ProducerId, ProducerContext>();
protected final HashMap<ConsumerId, ConsumerContext> consumers = new
HashMap<ConsumerId, ConsumerContext>();
@@ -111,48 +113,101 @@
//
/////////////////////////////////////////////////////////////////
// Methods that keep track of the client state
//
/////////////////////////////////////////////////////////////////
- public Response processAddConnection(ConnectionInfo info) throws
Exception {
- connection.setName(info.getClientId());
+ public Response processAddConnection(final ConnectionInfo info)
throws Exception {
+ if (!connections.containsKey(info.getConnectionId())) {
+
+ ClientContext connection = new
AbstractClientContext<MessageDelivery>(info.getConnectionId().toString(), null)
{
+ ConnectionInfo connectionInfo = info;
+
+ public void close() {
+ super.close();
+
connections.remove(connectionInfo.getConnectionId());
+ }
+ };
+ connections.put(info.getConnectionId(), connection);
+ }
return ack(info);
}
- public Response processAddSession(SessionInfo info) throws
Exception {
+ public Response processAddSession(final SessionInfo info) throws
Exception {
+ ClientContext connection =
connections.get(info.getSessionId().getParentId());
+ if (connection == null) {
+ throw new IllegalStateException(host.getHostName() + "
Cannot add a session to a connection that had not been registered: " +
info.getSessionId().getParentId());
+ }
+
+ if (!sessions.containsKey(info.getSessionId())) {
+
+ ClientContext session = new
AbstractClientContext<MessageDelivery>(info.getSessionId().toString(),
connection) {
+ SessionInfo sessioninfo = info;
+
+ public void close() {
+ super.close();
+ sessions.remove(sessioninfo.getSessionId());
+ }
+ };
+
+ sessions.put(info.getSessionId(), session);
+ }
+
return ack(info);
}
public Response processAddProducer(ProducerInfo info) throws
Exception {
- producers.put(info.getProducerId(), new ProducerContext(info));
+ ClientContext session =
sessions.get(info.getProducerId().getParentId());
+ if (session == null) {
+ throw new IllegalStateException(host.getHostName() + "
Cannot add a producer to a session that had not been registered: " +
info.getProducerId().getParentId());
+ }
+ if (!producers.containsKey(info.getProducerId())) {
+ ProducerContext producer = new ProducerContext(info,
session);
+ }
return ack(info);
}
public Response processAddConsumer(ConsumerInfo info) throws
Exception {
- ConsumerContext ctx = new ConsumerContext(info);
- consumers.put(info.getConsumerId(), ctx);
- ctx.start();
+ ClientContext session =
sessions.get(info.getConsumerId().getParentId());
+ if (session == null) {
+ throw new IllegalStateException(host.getHostName() + "
Cannot add a consumer to a session that had not been registered: " +
info.getConsumerId().getParentId());
+ }
+
+ if (!consumers.containsKey(info.getConsumerId())) {
+ ConsumerContext ctx = new ConsumerContext(info, session);
+ ctx.start();
+ }
+
return ack(info);
}
public Response processRemoveConnection(RemoveInfo remove,
ConnectionId info, long arg1) throws Exception {
+ ClientContext cc = connections.get(info);
+ if (cc != null) {
+ cc.close();
+ }
ack(remove);
return null;
}
public Response processRemoveSession(RemoveInfo remove, SessionId
info, long arg1) throws Exception {
+ ClientContext cc = sessions.get(info);
+ if (cc != null) {
+ cc.close();
+ }
ack(remove);
return null;
}
public Response processRemoveProducer(RemoveInfo remove,
ProducerId info) throws Exception {
- producers.remove(info);
- //TODO add close logic?
+ ClientContext cc = producers.get(info);
+ if (cc != null) {
+ cc.close();
+ }
ack(remove);
return null;
}
public Response processRemoveConsumer(RemoveInfo remove,
ConsumerId info, long arg1) throws Exception {
- ConsumerContext ctx = consumers.remove(info);
- if (ctx != null) {
- ctx.stop();
+ ClientContext cc = consumers.get(info);
+ if (cc != null) {
+ cc.close();
}
ack(remove);
return null;
@@ -344,8 +399,8 @@
}
public void onCommand(Object o) {
- boolean responseRequired=false;
- int commandId=0;
+ boolean responseRequired = false;
+ int commandId = 0;
try {
Command command = (Command) o;
commandId = command.getCommandId();
@@ -360,8 +415,7 @@
} else {
connection.onException(e);
}
- }
- catch (Throwable t) {
+ } catch (Throwable t) {
if (responseRequired) {
ExceptionResponse response = new ExceptionResponse(t);
response.setCorrelationId(commandId);
@@ -406,15 +460,17 @@
// Internal Support Methods
// /////////////////////////////////////////////////////////////////
- class ProducerContext extends
AbstractLimitedFlowResource<OpenWireMessageDelivery> {
+ class ProducerContext extends
AbstractClientContext<OpenWireMessageDelivery> {
protected final Object inboundMutex = new Object();
private IFlowController<OpenWireMessageDelivery> controller;
- private String name;
+ private final ProducerInfo info;
- public ProducerContext(final ProducerInfo info) {
- super(info.getProducerId().toString());
- final Flow flow = new Flow("broker-" + name + "-inbound", false);
+ public ProducerContext(final ProducerInfo info, ClientContext parent) {
+ super(info.getProducerId().toString(), parent);
+ this.info = info;
+ producers.put(info.getProducerId(), this);
+ final Flow flow = new Flow("broker-" + super.getResourceName() +
"-inbound", false);
// Openwire only uses credit windows at the producer level for
// producers that request the feature.
@@ -445,9 +501,14 @@
super.onFlowOpened(controller);
}
+
+ public void close() {
+ super.close();
+ producers.remove(info);
+ }
}
- class ConsumerContext extends AbstractLimitedFlowResource<MessageDelivery>
implements ProtocolHandler.ConsumerContext, Service {
+ class ConsumerContext extends AbstractClientContext<MessageDelivery>
implements ProtocolHandler.ConsumerContext {
private final ConsumerInfo info;
private String name;
@@ -462,9 +523,11 @@
private LinkedList<MessageId> pendingMessageIds = new
LinkedList<MessageId>();
private BrokerSubscription brokerSubscription;
- public ConsumerContext(final ConsumerInfo info) throws Exception {
+ public ConsumerContext(final ConsumerInfo info, ClientContext parent)
throws Exception {
+ super(info.getConsumerId().toString(), parent);
this.info = info;
this.name = info.getConsumerId().toString();
+ consumers.put(info.getConsumerId(), this);
Flow flow = new Flow("broker-" + name + "-outbound", false);
selector = parseSelector(info);
@@ -476,7 +539,9 @@
};
isQueueReceiver = info.getDestination().isQueue();
-
+ if (info.getSubscriptionName() != null) {
+ isDurable = true;
+ }
controller = new FlowController<MessageDelivery>(null, flow,
limiter, this);
controller.useOverFlowQueue(false);
controller.setExecutor(connection.getDispatcher().createPriorityExecutor(connection.getDispatcher().getDispatchPriorities()
- 1));
@@ -488,10 +553,6 @@
brokerSubscription.connect(this);
}
- public void stop() throws Exception {
- brokerSubscription.disconnect(this);
- }
-
public boolean offer(final MessageDelivery message,
ISourceController<?> source, SubscriptionDeliveryCallback callback) {
if (!controller.offer(message, source)) {
return false;
@@ -514,6 +575,9 @@
md.setDestination(msg.getDestination());
// Add to the pending list if persistent and we are durable:
if (callback != null) {
+ if (callback.isRedelivery()) {
+ md.setRedeliveryCounter(1);
+ }
synchronized (this) {
Object old = pendingMessages.put(msg.getMessageId(),
callback);
if (old != null) {
@@ -530,26 +594,29 @@
public void ack(MessageAck info) {
// TODO: The pending message queue could probably be optimized to
// avoid having to create a new list here.
- LinkedList<SubscriptionDeliveryCallback> acked = new
LinkedList<SubscriptionDeliveryCallback>();
- synchronized (this) {
- MessageId id = info.getLastMessageId();
- if (isDurable() || isQueueReceiver())
- while (!pendingMessageIds.isEmpty()) {
- MessageId pendingId = pendingMessageIds.getFirst();
- SubscriptionDeliveryCallback callback =
pendingMessages.remove(pendingId);
- acked.add(callback);
- pendingMessageIds.removeFirst();
- if (pendingId.equals(id)) {
- break;
+ //if(info.isStandardAck())
+ {
+ LinkedList<SubscriptionDeliveryCallback> acked = new
LinkedList<SubscriptionDeliveryCallback>();
+ synchronized (this) {
+ MessageId id = info.getLastMessageId();
+ if (isDurable() || isQueueReceiver())
+ while (!pendingMessageIds.isEmpty()) {
+ MessageId pendingId = pendingMessageIds.getFirst();
+ SubscriptionDeliveryCallback callback =
pendingMessages.remove(pendingId);
+ acked.add(callback);
+ pendingMessageIds.removeFirst();
+ if (pendingId.equals(id)) {
+ break;
+ }
}
- }
- limiter.onProtocolCredit(info.getMessageCount());
- }
+ limiter.onProtocolCredit(info.getMessageCount());
+ }
- // Delete outside of synchronization on queue to avoid contention
- // with enqueueing threads.
- for (SubscriptionDeliveryCallback callback : acked) {
- callback.acknowledge();
+ // Delete outside of synchronization on queue to avoid
contention
+ // with enqueueing threads.
+ for (SubscriptionDeliveryCallback callback : acked) {
+ callback.acknowledge();
+ }
}
}
@@ -584,7 +651,7 @@
public boolean isExclusive() {
return info.isExclusive();
}
-
+
/*
* (non-Javadoc)
*
@@ -722,6 +789,36 @@
return info.getConsumerId().toString();
}
+ public void close() {
+ brokerSubscription.disconnect(this);
+
+ if (isDurable() || isQueueReceiver()) {
+ LinkedList<SubscriptionDeliveryCallback> unacquired = null;
+
+ synchronized (this) {
+
+ unacquired = new
LinkedList<SubscriptionDeliveryCallback>();
+ while (!pendingMessageIds.isEmpty()) {
+ MessageId pendingId = pendingMessageIds.getLast();
+ SubscriptionDeliveryCallback callback =
pendingMessages.remove(pendingId);
+ unacquired.add(callback);
+ pendingMessageIds.removeLast();
+ }
+ limiter.onProtocolCredit(unacquired.size());
+ }
+
+ if (unacquired != null) {
+ // Delete outside of synchronization on queue to avoid
contention
+ // with enqueueing threads.
+ for (SubscriptionDeliveryCallback callback : unacquired) {
+ callback.unacquire(controller);
+ }
+ }
+ }
+
+ super.close();
+ consumers.remove(info.getConsumerId());
+ }
}
private static BooleanExpression parseSelector(ConsumerInfo info) throws
FilterException {
Modified:
activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java?rev=787345&r1=787344&r2=787345&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java
(original)
+++
activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java
Mon Jun 22 18:28:39 2009
@@ -1318,11 +1318,11 @@
assertNotNull(m);
assertEquals(m.getMessageId(), message1.getMessageId());
- assertTrue(countMessagesInQueue(connection, connectionInfo,
destination) == 2);
+ assertEquals(countMessagesInQueue(connection, connectionInfo,
destination), 2);
connection.send(createAck(consumerInfo, m, 1,
MessageAck.DELIVERED_ACK_TYPE));
- assertTrue(countMessagesInQueue(connection, connectionInfo,
destination) == 2);
+ assertEquals(countMessagesInQueue(connection, connectionInfo,
destination), 2);
connection.send(createAck(consumerInfo, m, 1,
MessageAck.STANDARD_ACK_TYPE));
- assertTrue(countMessagesInQueue(connection, connectionInfo,
destination) == 1);
+ assertEquals(countMessagesInQueue(connection, connectionInfo,
destination), 1);
}
Modified:
activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/CursoredQueue.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/CursoredQueue.java?rev=787345&r1=787344&r2=787345&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/CursoredQueue.java
(original)
+++
activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/CursoredQueue.java
Mon Jun 22 18:28:39 2009
@@ -96,7 +96,11 @@
protected abstract void requestDispatch();
- protected abstract void acknowledge(QueueElement<V> elem);
+ protected abstract Object getMutex();
+
+ protected abstract void onElementRemoved(QueueElement<V> qe);
+
+ protected abstract void onElementReenqueued(QueueElement<V> qe,
ISourceController<?> controller);
/**
* Adds an element to the queue.
@@ -250,6 +254,11 @@
// sequence number beyond the queue's limit.
long sequence = -1;
+ //When an element on the queue is reenqueued, this
+ //is set to indicate that the cursor should go back
+ //and consider the element:
+ long reenqueueSequence = -1;
+
// The cursor is holding references for all
// elements between first and last inclusive:
QueueElement<V> firstRef = null;
@@ -543,6 +552,12 @@
public final QueueElement<V> getNext() {
try {
+
+ if (reenqueueSequence != -1) {
+ reset(reenqueueSequence);
+ reenqueueSequence = -1;
+ }
+
if (atEnd()) {
updateCurrent(null);
return null;
@@ -640,6 +655,16 @@
return true;
}
+ public void onElementUnacquired(QueueElement<V> qe) {
+ if (qe.sequence < sequence) {
+ if (reenqueueSequence >= 0) {
+ reenqueueSequence = Math.min(reenqueueSequence,
qe.sequence);
+ } else {
+ reenqueueSequence = qe.sequence;
+ }
+ }
+ }
+
/**
*
*/
@@ -667,6 +692,10 @@
return true;
}
+ if (reenqueueSequence != -1) {
+ return false;
+ }
+
if (sequence > limit) {
return true;
}
@@ -806,7 +835,26 @@
}
public final void acknowledge() {
- queue.acknowledge(this);
+ synchronized (queue.getMutex()) {
+ delete();
+ }
+ }
+
+ public void unacquire(ISourceController<?> source) {
+ synchronized (queue.getMutex()) {
+ if (owner != null) {
+ owner = null;
+ if (!deleted) {
+ redelivered = true;
+ //TODO need to account for this memory space, and
check
+ //load/unload:
+ for (Cursor<V> c : queue.openCursors) {
+ c.onElementUnacquired(this);
+ }
+ }
+ }
+ queue.requestDispatch();
+ }
}
public final boolean delete() {
@@ -819,24 +867,16 @@
if (saved) {
queue.getQueueStore().deleteQueueElement(queue.getDescriptor(), elem);
}
+
elem = null;
unload(null);
+
+ queue.onElementRemoved(this);
return true;
}
return false;
}
- public final void unacquire(ISourceController<?> source) {
- owner = null;
- if (isExpired()) {
- acknowledge();
- } else {
- // TODO reset all cursors beyond this sequence number
- // back to this element
- throw new UnsupportedOperationException("Not yet implemented");
- }
- }
-
/**
* Attempts to unlink this element from the queue
*/
@@ -916,6 +956,10 @@
}
+ public final boolean isRedelivery() {
+ return redelivered;
+ }
+
private boolean unlinkable() {
return softRefs == 0 && !loaded;
}
@@ -954,6 +998,8 @@
}
this.size = re.getElementSize();
this.expiration = re.getExpiration();
+ //TODO Need to add redelivery to the store:
+ //this.redelivered = re.getRedelivered();
// If the loader asked to add a soft ref do it:
if (softRef) {
addSoftRef();
@@ -1005,7 +1051,7 @@
}
public final boolean isPagedOut() {
- return elem == null;
+ return elem == null && !deleted;
}
public final boolean isLoaded() {
Modified:
activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java?rev=787345&r1=787344&r2=787345&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java
(original)
+++
activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java
Mon Jun 22 18:28:39 2009
@@ -117,13 +117,22 @@
queue = new CursoredQueue<E>(persistencePolicy, expirationMapper,
controller.getFlow(), queueDescriptor, queueStore, this) {
@Override
- protected void acknowledge(QueueElement<E> qe) {
+ protected Object getMutex() {
+ return ExclusivePersistentQueue.this;
+ }
+
+ @Override
+ protected void onElementRemoved(QueueElement<E> qe) {
+ synchronized (ExclusivePersistentQueue.this) {
+ limiter.remove(1, qe.getLimiterSize());
+ }
+ }
+
+ @Override
+ protected void onElementReenqueued(QueueElement<E> qe,
ISourceController<?> source) {
synchronized (ExclusivePersistentQueue.this) {
- E elem = qe.getElement();
- if (qe.delete()) {
- if (!qe.isAcquired()) {
- controller.elementDispatched(elem);
- }
+ if (isDispatchReady()) {
+ notifyReady();
}
}
}
@@ -185,7 +194,8 @@
public synchronized boolean removeSubscription(Subscription<E> sub) {
if (sub == subscription) {
- sub = null;
+ subscription = null;
+ cursor.reset(queue.getFirstSequence());
return true;
} else {
return false;
@@ -280,7 +290,6 @@
// See if the sink has room:
qe.setAcquired(subscription);
if (subscription.offer(qe.elem, sourceController, callback)) {
- controller.elementDispatched(qe.getElement());
// If remove on dispatch acknowledge now:
if (callback == null) {
qe.acknowledge();
Modified:
activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java?rev=787345&r1=787344&r2=787345&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java
(original)
+++
activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java
Mon Jun 22 18:28:39 2009
@@ -142,9 +142,16 @@
queue = new CursoredQueue<V>(persistencePolicy,
expirationMapper, flow, queueDescriptor, store, mutex) {
@Override
- protected void acknowledge(QueueElement<V> elem) {
- SharedQueue.this.acknowledge(elem);
+ protected void onElementRemoved(QueueElement<V> elem) {
+ synchronized (mutex) {
+ //If the element wasn't acqired release space:
+ sizeLimiter.remove(1, elem.getLimiterSize());
+ }
+ }
+ @Override
+ protected Object getMutex() {
+ return mutex;
}
@Override
@@ -156,6 +163,15 @@
protected void requestDispatch() {
notifyReady();
}
+
+ @Override
+ protected void onElementReenqueued(QueueElement<V> qe,
ISourceController<?> controller) {
+ synchronized (SharedQueue.this) {
+ if (isDispatchReady()) {
+ notifyReady();
+ }
+ }
+ }
};
queue.initialize(sequenceMin, sequenceMax, count, size);
@@ -287,16 +303,6 @@
return inputController;
}
- final void acknowledge(QueueElement<V> qe) {
- synchronized (mutex) {
- qe.delete();
- //If the element wasn't acqired release space:
- if (!qe.isAcquired()) {
- sizeLimiter.remove(1, qe.getLimiterSize());
- }
- }
- }
-
/**
* Starts this queue.
*/
@@ -430,9 +436,6 @@
SubscriptionContext nextConsumer = consumer.getNext();
switch (consumer.offer(next)) {
case ACCEPTED:
- if (DEBUG)
- System.out.println("Dispatched " +
next.getElement() + " to " + consumer);
-
// Rotate list so this one is last next time:
sharedConsumers.rotate();
interested = true;
@@ -539,6 +542,10 @@
} else {
cursor.reset(queue.getFirstSequence());
}
+
+ if (DEBUG)
+ System.out.println("Starting " + this + " at " + cursor);
+
updateDispatchList();
}
@@ -686,7 +693,7 @@
// Check for expiration:
if (qe.isExpired()) {
- acknowledge(qe);
+ qe.acknowledge();
return ACCEPTED;
}
@@ -696,9 +703,11 @@
// See if the sink has room:
qe.setAcquired(sub);
if (sub.offer(qe.elem, this, callback)) {
- if (!sub.isBrowser()) {
+ if (DEBUG)
+ System.out.println("Dispatched " + qe.getElement() + " to
" + this);
- sizeLimiter.remove(1, qe.getLimiterSize());
+
+ if (!sub.isBrowser()) {
// If remove on dispatch acknowledge now:
if (callback == null) {
Modified:
activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/Subscription.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/Subscription.java?rev=787345&r1=787344&r2=787345&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/Subscription.java
(original)
+++
activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/Subscription.java
Mon Jun 22 18:28:39 2009
@@ -22,28 +22,33 @@
public interface Subscription<E> {
public interface SubscriptionDeliveryCallback {
-
+
/**
* If {...@link Subscription#isBrowser()} returns false this method
- * indicates that the Subscription is finished with the element
- * and that it can be removed from the queue.
+ * indicates that the Subscription is finished with the element and
that
+ * it can be removed from the queue.
*/
public void acknowledge();
/**
- * Indicates that the subscription no longer has interest in
- * the element and that it should be placed back on the queue.
+ * Indicates that the subscription no longer has interest in the
element
+ * and that it should be placed back on the queue.
*
- * The provided source controller will be blocked if there
- * is not enough space available on the queue to
- * reenqueue the element.
+ * The provided source controller will be blocked if there is not
enough
+ * space available on the queue to reenqueue the element.
*
- * It is illegal to call this method after a prior call to
- * {...@link #acknowledge()}.
+ * It is illegal to call this method after a prior call to
+ * {...@link #acknowledge()}.
*
- * @param source The source controller.
+ * @param source
+ * The source controller.
*/
public void unacquire(ISourceController<?> sourceController);
+
+ /**
+ * @return Returns true if the delivery is a redelivery
+ */
+ public boolean isRedelivery();
}
/**
@@ -52,25 +57,25 @@
* @return true if the element should be removed on dispatch
*/
public boolean isRemoveOnDispatch(E elem);
-
+
/**
- * @return True if this is a subscription browser.
+ * @return True if this is a subscription browser.
*/
public boolean isBrowser();
-
+
/**
- * Indicates that the subscription is exclusive. When there at least one
- * exclusive subscription on a shared queue, the queue will dispatch to
- * only one such consumer while there is at least one connected.
+ * Indicates that the subscription is exclusive. When there at least one
+ * exclusive subscription on a shared queue, the queue will dispatch to
only
+ * one such consumer while there is at least one connected.
*
* @return True if the Subscription is exclusive.
*/
public boolean isExclusive();
/**
- * Returns true if the Subscription has a selector. If true
- * is returned the {...@link #matches(Object)} will be called
- * prior to an attempt to offer the message to {...@link Subscription}
+ * Returns true if the Subscription has a selector. If true is returned the
+ * {...@link #matches(Object)} will be called prior to an attempt to offer
the
+ * message to {...@link Subscription}
*
* @return true if this {...@link Subscription} has a selector.
*/
@@ -96,7 +101,8 @@
* The queue's controller, which must be used if the offered
* element exceeds the subscription's buffer limits.
* @param callback
- * The {...@link SubscriptionDeliveryCallback} associated with
the element
+ * The {...@link SubscriptionDeliveryCallback} associated with
the
+ * element
*
* @return true if the element was accepted false otherwise, if false is
* returned the caller must have called
@@ -104,7 +110,7 @@
* returning false.
*/
public boolean offer(E element, ISourceController<?> controller,
SubscriptionDeliveryCallback callback);
-
+
/**
* Pushes an item to the subscription. If the subscription is not remove on
* dispatch, then it must call acknowledge method on the callback when it
@@ -116,7 +122,8 @@
* The queue's controller, which must be used if the added
* element exceeds the subscription's buffer limits.
* @param callback
- * The {...@link SubscriptionDeliveryCallback} associated with
the element
+ * The {...@link SubscriptionDeliveryCallback} associated with
the
+ * element
* @return true if the element was accepted false otherwise, if false is
* returned the caller must have called
* {...@link ISourceController#onFlowBlock(ISinkController)} prior
to
Modified:
activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java?rev=787345&r1=787344&r2=787345&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
(original)
+++
activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
Mon Jun 22 18:28:39 2009
@@ -33,6 +33,7 @@
import org.apache.activemq.apollo.broker.MessageDelivery;
import org.apache.activemq.apollo.broker.ProtocolHandler;
import org.apache.activemq.apollo.broker.Router;
+import org.apache.activemq.apollo.broker.ProtocolHandler.AbstractClientContext;
import org.apache.activemq.broker.store.Store.MessageRecord;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.filter.BooleanExpression;
@@ -259,7 +260,7 @@
}
}
- class ConsumerContext extends AbstractLimitedFlowResource<MessageDelivery>
implements ProtocolHandler.ConsumerContext {
+ class ConsumerContext extends AbstractClientContext<MessageDelivery>
implements ProtocolHandler.ConsumerContext {
private BooleanExpression selector;
private String selectorString;
@@ -277,6 +278,7 @@
private boolean durable;
public ConsumerContext(final StompFrame subscribe) throws Exception {
+ super(subscribe.getHeaders().get(Stomp.Headers.Subscribe.ID),
null);
translator = translator(subscribe);
Map<String, String> headers = subscribe.getHeaders();