Author: cmacnaug
Date: Fri Jun 19 00:49:19 2009
New Revision: 786344

URL: http://svn.apache.org/viewvc?rev=786344&view=rev
Log:
Changing isRemoveOnDispatch to return false for queue receivers. Otherwise 
messages are removed from the queue too soon.

Modified:
    
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java

Modified: 
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=786344&r1=786343&r2=786344&view=diff
==============================================================================
--- 
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
 (original)
+++ 
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
 Fri Jun 19 00:49:19 2009
@@ -100,12 +100,12 @@
     private Router router;
     private VirtualHost host;
     private final CommandVisitor visitor;
-    
-    ArrayList<ActiveMQDestination> temporaryDestinations = new 
ArrayList<ActiveMQDestination>(); 
+
+    ArrayList<ActiveMQDestination> temporaryDestinations = new 
ArrayList<ActiveMQDestination>();
 
     public OpenwireProtocolHandler() {
         setStoreWireFormat(new OpenWireFormat());
-        
+
         visitor = new CommandVisitor() {
 
             // 
/////////////////////////////////////////////////////////////////
@@ -152,7 +152,7 @@
             public Response processRemoveConsumer(RemoveInfo remove, 
ConsumerId info, long arg1) throws Exception {
                 ConsumerContext ctx = consumers.remove(info);
                 if (ctx != null) {
-                       ctx.stop();
+                    ctx.stop();
                 }
                 ack(remove);
                 return null;
@@ -162,10 +162,10 @@
             // Message Processing Methods.
             // 
/////////////////////////////////////////////////////////////////
             public Response processMessage(Message info) throws Exception {
-               if( info.getOriginalDestination() == null ) {
-                       info.setOriginalDestination(info.getDestination());
-               }
-               
+                if (info.getOriginalDestination() == null) {
+                    info.setOriginalDestination(info.getDestination());
+                }
+
                 ProducerId producerId = info.getProducerId();
                 ProducerContext producerContext = producers.get(producerId);
 
@@ -214,10 +214,10 @@
                 // broker.
                 BrokerInfo brokerInfo = new BrokerInfo();
                 Broker broker = connection.getBroker();
-                               brokerInfo.setBrokerId(new 
BrokerId(broker.getName()));
+                brokerInfo.setBrokerId(new BrokerId(broker.getName()));
                 brokerInfo.setBrokerName(broker.getName());
-                if( !broker.getConnectUris().isEmpty() ) {
-                       brokerInfo.setBrokerURL(broker.getConnectUris().get(0));
+                if (!broker.getConnectUris().isEmpty()) {
+                    brokerInfo.setBrokerURL(broker.getConnectUris().get(0));
                 }
                 connection.write(brokerInfo);
                 return ack(info);
@@ -256,13 +256,13 @@
             // Methods for server management
             // 
/////////////////////////////////////////////////////////////////
             public Response processAddDestination(DestinationInfo info) throws 
Exception {
-               ActiveMQDestination destination = info.getDestination();
-                               if( destination.isTemporary() ) {
-                                       // Keep track of it so that we can 
remove them this connection 
-                                       // shuts down.
-                       temporaryDestinations.add(destination);
-               }
-               host.createQueue(destination);
+                ActiveMQDestination destination = info.getDestination();
+                if (destination.isTemporary()) {
+                    // Keep track of it so that we can remove them this 
connection 
+                    // shuts down.
+                    temporaryDestinations.add(destination);
+                }
+                host.createQueue(destination);
                 return ack(info);
             }
 
@@ -348,6 +348,7 @@
         Command command = (Command) o;
         boolean responseRequired = command.isResponseRequired();
         try {
+            //System.out.println(o);
             command.visit(visitor);
         } catch (Exception e) {
             if (responseRequired) {
@@ -449,7 +450,7 @@
 
         private HashMap<MessageId, SubscriptionDeliveryCallback> 
pendingMessages = new HashMap<MessageId, SubscriptionDeliveryCallback>();
         private LinkedList<MessageId> pendingMessageIds = new 
LinkedList<MessageId>();
-               private BrokerSubscription brokerSubscription;
+        private BrokerSubscription brokerSubscription;
 
         public ConsumerContext(final ConsumerInfo info) throws Exception {
             this.info = info;
@@ -464,22 +465,22 @@
                 }
             };
 
+            isQueueReceiver = info.getDestination().isQueue();
+
             controller = new FlowController<MessageDelivery>(null, flow, 
limiter, this);
             controller.useOverFlowQueue(false);
             
controller.setExecutor(connection.getDispatcher().createPriorityExecutor(connection.getDispatcher().getDispatchPriorities()
 - 1));
             super.onFlowOpened(controller);
         }
 
-        
-               public void start() throws Exception {
+        public void start() throws Exception {
             brokerSubscription = host.createSubscription(this);
             brokerSubscription.connect(this);
-               }
-
-               public void stop() throws Exception {
-                       brokerSubscription.disconnect(this);
-               }
+        }
 
+        public void stop() throws Exception {
+            brokerSubscription.disconnect(this);
+        }
 
         public boolean offer(final MessageDelivery message, 
ISourceController<?> source, SubscriptionDeliveryCallback callback) {
             if (!controller.offer(message, source)) {
@@ -587,7 +588,10 @@
          * .Object)
          */
         public boolean isRemoveOnDispatch(MessageDelivery elem) {
-            return !elem.isPersistent() || !(isDurable || isQueueReceiver);
+            if (isQueueReceiver()) {
+                return false;
+            }
+            return !elem.isPersistent() || !isDurable;
         }
 
         /*
@@ -696,9 +700,9 @@
             return offer(message, source, null);
         }
 
-               public boolean autoCreateDestination() {
-                       return true;
-               }
+        public boolean autoCreateDestination() {
+            return true;
+        }
 
     }
 


Reply via email to