Author: jstrachan
Date: Thu Aug 10 10:17:19 2006
New Revision: 430445
URL: http://svn.apache.org/viewvc?rev=430445&view=rev
Log:
added some early support for AMQ-855 to allow pure pull based consumption -
adding a MessagePull command so that a client can pull messages on demand
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessagePull.java
(with props)
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitor.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java?rev=430445&r1=430444&r2=430445&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java
Thu Aug 10 10:17:19 2006
@@ -44,6 +44,7 @@
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
@@ -384,6 +385,10 @@
return null;
}
+ public Response processMessagePull(MessagePull pull) throws Exception {
+ return
broker.messagePull(lookupConnectionState(pull.getConsumerId()).getContext(),
pull);
+ }
+
public Response
processMessageDispatchNotification(MessageDispatchNotification notification)
throws Exception{
broker.processDispatchNotification(notification);
return null;
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java?rev=430445&r1=430444&r2=430445&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
Thu Aug 10 10:17:19 2006
@@ -28,7 +28,9 @@
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java?rev=430445&r1=430444&r2=430445&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
Thu Aug 10 10:17:19 2006
@@ -29,8 +29,10 @@
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
+import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
@@ -68,6 +70,10 @@
public void acknowledge(ConnectionContext context, MessageAck ack) throws
Exception {
next.acknowledge(context, ack);
+ }
+
+ public Response messagePull(ConnectionContext context, MessagePull pull)
throws Exception {
+ return next.messagePull(context, pull);
}
public void addConnection(ConnectionContext context, ConnectionInfo info)
throws Exception {
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java?rev=430445&r1=430444&r2=430445&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
Thu Aug 10 10:17:19 2006
@@ -29,8 +29,10 @@
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
+import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
@@ -221,6 +223,11 @@
}
public void setAdminConnectionContext(ConnectionContext
adminConnectionContext) {
+ }
+
+
+ public Response messagePull(ConnectionContext context, MessagePull pull) {
+ return null;
}
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java?rev=430445&r1=430444&r2=430445&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
Thu Aug 10 10:17:19 2006
@@ -33,8 +33,10 @@
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
+import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
@@ -221,6 +223,10 @@
}
public void setAdminConnectionContext(ConnectionContext
adminConnectionContext) {
+ throw new BrokerStoppedException(this.message);
+ }
+
+ public Response messagePull(ConnectionContext context, MessagePull pull) {
throw new BrokerStoppedException(this.message);
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java?rev=430445&r1=430444&r2=430445&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
Thu Aug 10 10:17:19 2006
@@ -29,8 +29,10 @@
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
+import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
@@ -236,6 +238,10 @@
public void setAdminConnectionContext(ConnectionContext
adminConnectionContext) {
getNext().setAdminConnectionContext(adminConnectionContext);
+ }
+
+ public Response messagePull(ConnectionContext context, MessagePull pull)
throws Exception {
+ return getNext().messagePull(context, pull);
}
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=430445&r1=430444&r2=430445&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
Thu Aug 10 10:17:19 2006
@@ -30,7 +30,9 @@
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.RemoveSubscriptionInfo;
+import org.apache.activemq.command.Response;
import org.apache.activemq.filter.DestinationMap;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.PersistenceAdapter;
@@ -254,12 +256,17 @@
}
public void acknowledge(ConnectionContext context, MessageAck ack) throws
Exception {
-
Subscription sub = (Subscription)
subscriptions.get(ack.getConsumerId());
if( sub==null )
throw new IllegalArgumentException("The subscription does not
exist: "+ack.getConsumerId());
sub.acknowledge(context, ack);
-
+ }
+
+ public Response messagePull(ConnectionContext context, MessagePull pull)
throws Exception {
+ Subscription sub = (Subscription)
subscriptions.get(pull.getConsumerId());
+ if( sub==null )
+ throw new IllegalArgumentException("The subscription does not
exist: "+pull.getConsumerId());
+ return sub.pullMessage(context, pull);
}
protected Destination lookup(ConnectionContext context,
ActiveMQDestination destination) throws Exception {
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=430445&r1=430444&r2=430445&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Thu Aug 10 10:17:19 2006
@@ -35,6 +35,8 @@
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.MessagePull;
+import org.apache.activemq.command.Response;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.BrokerSupport;
import org.apache.commons.logging.Log;
@@ -62,6 +64,20 @@
super(broker,context,info);
}
+
+ /**
+ * Allows a message to be pulled on demand by a client
+ */
+ public Response pullMessage(ConnectionContext context, MessagePull pull)
throws Exception {
+ if (getPrefetchSize() == 0) {
+ prefetchExtension++;
+ dispatchMatched();
+
+ // TODO it might be nice one day to actually return the message
itself
+ }
+ return null;
+ }
+
synchronized public void add(MessageReference node) throws Exception{
enqueueCounter++;
if(!isFull()){
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java?rev=430445&r1=430444&r2=430445&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java
Thu Aug 10 10:17:19 2006
@@ -24,7 +24,9 @@
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.RemoveSubscriptionInfo;
+import org.apache.activemq.command.Response;
import java.util.Map;
import java.util.Set;
@@ -108,6 +110,11 @@
*/
public void acknowledge(ConnectionContext context, MessageAck ack) throws
Exception;
+ /**
+ * Allows a consumer to pull a message from a queue
+ */
+ public Response messagePull(ConnectionContext context, MessagePull pull)
throws Exception;
+
/**
* Process a notification of a dispatch - used by a Slave Broker
* @param messageDispatchNotification
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=430445&r1=430444&r2=430445&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
Thu Aug 10 10:17:19 2006
@@ -36,8 +36,10 @@
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
+import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.memory.UsageManager;
@@ -395,6 +397,26 @@
case ActiveMQDestination.TEMP_TOPIC_TYPE:
tempTopicRegion.acknowledge(context, ack);
break;
+ default:
+ throw createUnknownDestinationTypeException(destination);
+ }
+ }
+
+
+ public Response messagePull(ConnectionContext context, MessagePull pull)
throws Exception {
+ ActiveMQDestination destination = pull.getDestination();
+ switch (destination.getDestinationType()) {
+ case ActiveMQDestination.QUEUE_TYPE:
+ return queueRegion.messagePull(context, pull);
+
+ case ActiveMQDestination.TOPIC_TYPE:
+ return topicRegion.messagePull(context, pull);
+
+ case ActiveMQDestination.TEMP_QUEUE_TYPE:
+ return tempQueueRegion.messagePull(context, pull);
+
+ case ActiveMQDestination.TEMP_TOPIC_TYPE:
+ return tempTopicRegion.messagePull(context, pull);
default:
throw createUnknownDestinationTypeException(destination);
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java?rev=430445&r1=430444&r2=430445&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
Thu Aug 10 10:17:19 2006
@@ -24,6 +24,8 @@
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.MessagePull;
+import org.apache.activemq.command.Response;
import org.apache.activemq.filter.MessageEvaluationContext;
import javax.jms.InvalidSelectorException;
@@ -50,6 +52,12 @@
*/
void acknowledge(ConnectionContext context, final MessageAck ack) throws
Exception;
+
+ /**
+ * Allows a consumer to pull a message on demand
+ */
+ Response pullMessage(ConnectionContext context, MessagePull pull) throws
Exception;
+
/**
* Is the subscription interested in the message?
* @param node
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=430445&r1=430444&r2=430445&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
Thu Aug 10 10:17:19 2006
@@ -36,6 +36,8 @@
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.MessagePull;
+import org.apache.activemq.command.Response;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.transaction.Synchronization;
import org.apache.commons.logging.Log;
@@ -179,6 +181,11 @@
return;
}
throw new JMSException("Invalid acknowledgment: "+ack);
+ }
+
+ public Response pullMessage(ConnectionContext context, MessagePull pull)
throws Exception {
+ // not supported for topics
+ return null;
}
public int getPendingQueueSize(){
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java?rev=430445&r1=430444&r2=430445&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/CommandTypes.java
Thu Aug 10 10:17:19 2006
@@ -57,6 +57,7 @@
// and the server.
//
///////////////////////////////////////////////////
+ byte MESSAGE_PULL = 20;
byte MESSAGE_DISPATCH = 21;
byte MESSAGE_ACK = 22;
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessagePull.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessagePull.java?rev=430445&view=auto
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessagePull.java
(added)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessagePull.java
Thu Aug 10 10:17:19 2006
@@ -0,0 +1,78 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import org.apache.activemq.state.CommandVisitor;
+
+/**
+ * Used to pull messages on demand.
+ *
+ * @openwire:marshaller code="20"
+ *
+ * @version $Revision$
+ */
+public class MessagePull extends BaseCommand {
+
+ public static final byte DATA_STRUCTURE_TYPE = CommandTypes.MESSAGE_PULL;
+
+ protected ConsumerId consumerId;
+ protected ActiveMQDestination destination;
+ protected long timeout;
+
+ public byte getDataStructureType() {
+ return DATA_STRUCTURE_TYPE;
+ }
+
+ public Response visit(CommandVisitor visitor) throws Exception {
+ return visitor.processMessagePull(this);
+ }
+
+ /**
+ * @openwire:property version=1 cache=true
+ */
+ public ConsumerId getConsumerId() {
+ return consumerId;
+ }
+
+ public void setConsumerId(ConsumerId consumerId) {
+ this.consumerId = consumerId;
+ }
+
+ /**
+ * @openwire:property version=1 cache=true
+ */
+ public ActiveMQDestination getDestination() {
+ return destination;
+ }
+
+ public void setDestination(ActiveMQDestination destination) {
+ this.destination = destination;
+ }
+
+ /**
+ * @openwire:property version=1
+ */
+ public long getTimeout() {
+ return timeout;
+ }
+
+ public void setTimeout(long timeout) {
+ this.timeout = timeout;
+ }
+
+}
Propchange:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessagePull.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessagePull.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessagePull.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitor.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitor.java?rev=430445&r1=430444&r2=430445&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitor.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/CommandVisitor.java
Thu Aug 10 10:17:19 2006
@@ -28,6 +28,7 @@
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
@@ -56,6 +57,7 @@
Response processMessage(Message send) throws Exception;
Response processMessageAck(MessageAck ack) throws Exception;
+ Response processMessagePull(MessagePull pull) throws Exception;
Response processBeginTransaction(TransactionInfo info) throws Exception;
Response processPrepareTransaction(TransactionInfo info) throws Exception;
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java?rev=430445&r1=430444&r2=430445&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
Thu Aug 10 10:17:19 2006
@@ -32,6 +32,7 @@
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
@@ -278,6 +279,10 @@
public Response
processMessageDispatchNotification(MessageDispatchNotification notification)
throws Exception{
return null;
}
+
+ public Response processMessagePull(MessagePull pull) throws Exception {
+ return null;
+ }
public boolean isRestoreConsumers() {
return restoreConsumers;
@@ -302,4 +307,5 @@
public void setRestoreSessions(boolean restoreSessions) {
this.restoreSessions = restoreSessions;
}
+
}