Author: kwall
Date: Fri Aug  7 12:21:06 2015
New Revision: 1694667

URL: http://svn.apache.org/r1694667
Log:
QPID-6670: [Java Broker] Ensure that if an exception occurs during 
AbstractQueue#deleteAndReturnCount, the returned future will fail too

Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1694667&r1=1694666&r2=1694667&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Fri Aug  7 12:21:06 2015
@@ -45,6 +45,7 @@ import java.util.concurrent.atomic.Atomi
 
 import javax.security.auth.Subject;
 
+import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 
@@ -1889,42 +1890,56 @@ public abstract class AbstractQueue<X ex
 
             ListenableFuture<List<Void>> combinedFuture = Futures.allAsList(removeBindingFutures);
 
-            doAfter(combinedFuture, new Runnable()
+            Futures.addCallback(combinedFuture, new FutureCallback<List<Void>>()
             {
                 @Override
-                public void run()
+                public void onSuccess(final List<Void> result)
                 {
-                    QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
-
-                    while (consumerNodeIterator.advance())
+                    try
                     {
-                        QueueConsumer s = consumerNodeIterator.getNode().getConsumer();
-                        if (s != null)
+                        final QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
+
+                        while (consumerNodeIterator.advance())
                         {
-                            s.queueDeleted();
+                            final QueueConsumer s = consumerNodeIterator.getNode().getConsumer();
+                            if (s != null)
+                            {
+                                s.queueDeleted();
+                            }
                         }
-                    }
 
-                    List<QueueEntry> entries = getMessagesOnTheQueue(new AcquireAllQueueEntryFilter());
+                        final List<QueueEntry> entries = getMessagesOnTheQueue(new AcquireAllQueueEntryFilter());
 
-                    routeToAlternate(entries);
+                        routeToAlternate(entries);
 
-                    preSetAlternateExchange();
+                        preSetAlternateExchange();
 
-                    performQueueDeleteTasks();
-                    deleted();
+                        performQueueDeleteTasks();
+                        deleted();
 
-                    //Log Queue Deletion
-                    getEventLogger().message(_logSubject, QueueMessages.DELETED());
+                        //Log Queue Deletion
+                        getEventLogger().message(_logSubject, QueueMessages.DELETED());
 
-                    _deleteFuture.set(queueDepthMessages);
+                        _deleteFuture.set(queueDepthMessages);
+                    }
+                    catch(Throwable e)
+                    {
+                        _deleteFuture.setException(e);
+                    }
                 }
-            });
+
+                @Override
+                public void onFailure(final Throwable t)
+                {
+                    _deleteFuture.setException(t);
+                }
+            }, getTaskExecutor().getExecutor());
+
         }
         return _deleteFuture;
     }
 
-    protected void routeToAlternate(List<QueueEntry> entries)
+    private void routeToAlternate(List<QueueEntry> entries)
     {
         ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore());
 
@@ -1942,7 +1957,7 @@ public abstract class AbstractQueue<X ex
         txn.commit();
     }
 
-    protected void performQueueDeleteTasks()
+    private void performQueueDeleteTasks()
     {
         for (Action<? super AMQQueue> task : _deleteTaskList)
         {



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to