Author: kwall
Date: Tue Jul 22 14:30:57 2014
New Revision: 1612578

URL: http://svn.apache.org/r1612578
Log:
QPID-5912: [Java Broker] Ensure that a failure to send to a consumer on the
straight-through delivery path does not prevent the message from being enqueued
to the store.

Modified:
    
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
    
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java

Modified: 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1612578&r1=1612577&r2=1612578&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
 (original)
+++ 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
 Tue Jul 22 14:30:57 2014
@@ -82,11 +82,13 @@ import org.apache.qpid.server.txn.AutoCo
 import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.server.util.Deletable;
 import org.apache.qpid.server.util.MapValueConverter;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
 import org.apache.qpid.server.util.StateChangeListener;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+import org.apache.qpid.transport.TransportException;
 
 public abstract class AbstractQueue<X extends AbstractQueue<X>>
         extends AbstractConfiguredObject<X>
@@ -931,90 +933,115 @@ public abstract class AbstractQueue<X ex
         final QueueConsumer<?> exclusiveSub = _exclusiveSubscriber;
         final QueueEntry entry = getEntries().add(message);
 
-        if(action != null || (exclusiveSub == null  && _queueRunner.isIdle()))
+        try
         {
-            /*
-            * iterate over consumers and if any is at the end of the queue and 
can deliver this message,
-             * then deliver the message
-             */
-
-            Subject.doAs(SecurityManager.getSystemTaskSubject("Immediate 
Delivery"),
-                         new PrivilegedAction<Object>()
-                         {
-                             @Override
-                             public Object run()
+            if (action != null || (exclusiveSub == null  && 
_queueRunner.isIdle()))
+            {
+                Subject.doAs(SecurityManager.getSystemTaskSubject("Immediate 
Delivery"),
+                             new PrivilegedAction<Void>()
                              {
-
-                                 QueueConsumerList.ConsumerNode node = 
_consumerList.getMarkedNode();
-                                 QueueConsumerList.ConsumerNode nextNode = 
node.findNext();
-                                 if (nextNode == null)
-                                 {
-                                     nextNode = 
_consumerList.getHead().findNext();
-                                 }
-                                 while (nextNode != null)
+                                 @Override
+                                 public Void run()
                                  {
-                                     if (_consumerList.updateMarkedNode(node, 
nextNode))
-                                     {
-                                         break;
-                                     }
-                                     else
-                                     {
-                                         node = _consumerList.getMarkedNode();
-                                         nextNode = node.findNext();
-                                         if (nextNode == null)
-                                         {
-                                             nextNode = 
_consumerList.getHead().findNext();
-                                         }
-                                     }
+                                     tryDeliverStraightThrough(entry);
+                                     return null;
                                  }
-                                 // always do one extra loop after we believe 
we've finished
-                                 // this catches the case where we *just* miss 
an update
-                                 int loops = 2;
-
-                                 while (entry.isAvailable() && loops != 0)
-                                 {
-                                     if (nextNode == null)
-                                     {
-                                         loops--;
-                                         nextNode = _consumerList.getHead();
-                                     }
-                                     else
-                                     {
-                                         // if consumer at end, and active, 
offer
-                                         final QueueConsumer<?> sub = 
nextNode.getConsumer();
-                                         deliverToConsumer(sub, entry);
-
+                             }
+                            );
+            }
 
-                                     }
-                                     nextNode = nextNode.findNext();
+            if (entry.isAvailable())
+            {
+                checkConsumersNotAheadOfDelivery(entry);
 
-                                 }
+                if (exclusiveSub != null)
+                {
+                    deliverAsync(exclusiveSub);
+                }
+                else
+                {
+                    deliverAsync();
+                }
+            }
 
-                                 return null;
-                             }
-                         });
+            checkForNotification(entry.getMessage());
+        }
+        finally
+        {
+            if(action != null)
+            {
+                action.performAction(entry);
+            }
         }
 
+    }
 
-        if (entry.isAvailable())
+    /**
+     * iterate over consumers and if any is at the end of the queue and can 
deliver this message,
+     * then deliver the message
+     */
+    private void tryDeliverStraightThrough(final QueueEntry entry)
+    {
+        try
         {
-            checkConsumersNotAheadOfDelivery(entry);
-
-            if (exclusiveSub != null)
+            QueueConsumerList.ConsumerNode node = 
_consumerList.getMarkedNode();
+            QueueConsumerList.ConsumerNode nextNode = node.findNext();
+            if (nextNode == null)
             {
-                deliverAsync(exclusiveSub);
+                nextNode = _consumerList.getHead().findNext();
             }
-            else
+            while (nextNode != null)
             {
-                deliverAsync();
-           }
-        }
+                if (_consumerList.updateMarkedNode(node, nextNode))
+                {
+                    break;
+                }
+                else
+                {
+                    node = _consumerList.getMarkedNode();
+                    nextNode = node.findNext();
+                    if (nextNode == null)
+                    {
+                        nextNode = _consumerList.getHead().findNext();
+                    }
+                }
+            }
+            // always do one extra loop after we believe we've finished
+            // this catches the case where we *just* miss an update
+            int loops = 2;
 
-        checkForNotification(entry.getMessage());
+            while (entry.isAvailable() && loops != 0)
+            {
+                if (nextNode == null)
+                {
+                    loops--;
+                    nextNode = _consumerList.getHead();
+                }
+                else
+                {
+                    // if consumer at end, and active, offer
+                    final QueueConsumer<?> sub = nextNode.getConsumer();
+                    deliverToConsumer(sub, entry);
+
+
+                }
+                nextNode = nextNode.findNext();
 
-        if(action != null)
+            }
+        }
+        catch (ConnectionScopedRuntimeException | TransportException e)
         {
-            action.performAction(entry);
+            String errorMessage = "Suppressing " + 
e.getClass().getSimpleName() +
+                              " during straight through delivery, as this" +
+                              " can only indicate an issue with a consumer.";
+            if(_logger.isDebugEnabled())
+            {
+                _logger.debug(errorMessage, e);
+            }
+            else
+            {
+                _logger.info(errorMessage + ' ' + e.getMessage());
+            }
         }
     }
 

Modified: 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java?rev=1612578&r1=1612577&r2=1612578&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
 (original)
+++ 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
 Tue Jul 22 14:30:57 2014
@@ -21,16 +21,13 @@
 package org.apache.qpid.server.queue;
 
 import java.security.PrivilegedAction;
-import java.util.Collections;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.log4j.Logger;
-import org.apache.qpid.server.security.*;
 import org.apache.qpid.server.security.SecurityManager;
-import org.apache.qpid.server.security.auth.TaskPrincipal;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.transport.TransportException;
 
@@ -79,7 +76,7 @@ public class QueueRunner implements Runn
                     {
                         runAgain = _queue.processQueue(QueueRunner.this);
                     }
-                    catch (final ConnectionScopedRuntimeException e)
+                    catch (ConnectionScopedRuntimeException | 
TransportException  e)
                     {
                         final String errorMessage = "Problem during 
asynchronous delivery by " + toString();
                         if(_logger.isDebugEnabled())
@@ -91,18 +88,6 @@ public class QueueRunner implements Runn
                             _logger.info(errorMessage + ' ' + e.getMessage());
                         }
                     }
-                    catch (final TransportException transe)
-                    {
-                        final String errorMessage = "Problem during 
asynchronous delivery by " + toString();
-                        if(_logger.isDebugEnabled())
-                        {
-                            _logger.debug(errorMessage, transe);
-                        }
-                        else
-                        {
-                            _logger.info(errorMessage + ' ' + 
transe.getMessage());
-                        }
-                    }
                     finally
                     {
                         _scheduled.compareAndSet(RUNNING, IDLE);

Modified: 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java?rev=1612578&r1=1612577&r2=1612578&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
 (original)
+++ 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
 Tue Jul 22 14:30:57 2014
@@ -24,12 +24,11 @@ package org.apache.qpid.server.queue;
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.server.security.SecurityManager;
-import org.apache.qpid.server.security.auth.TaskPrincipal;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.transport.TransportException;
 
 import javax.security.auth.Subject;
 import java.security.PrivilegedAction;
-import java.util.Collections;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -73,16 +72,16 @@ class SubFlushRunner implements Runnable
                     {
                         complete = getQueue().flushConsumer(_sub, ITERATIONS);
                     }
-                    catch (final TransportException transe)
+                    catch (ConnectionScopedRuntimeException | 
TransportException  e)
                     {
                         final String errorMessage = "Problem during 
asynchronous delivery by " + toString();
                         if(_logger.isDebugEnabled())
                         {
-                            _logger.debug(errorMessage, transe);
+                            _logger.debug(errorMessage, e);
                         }
                         else
                         {
-                            _logger.info(errorMessage + ' ' + 
transe.getMessage());
+                            _logger.info(errorMessage + ' ' + e.getMessage());
                         }
                     }
                     finally



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org

Reply via email to