Author: kwall
Date: Tue Jul 22 14:30:57 2014
New Revision: 1612578
URL: http://svn.apache.org/r1612578
Log:
QPID-5912: [Java Broker] Ensure that a failure to send to a consumer on the
straight-through delivery path does not prevent the message from being enqueued to the store.
Modified:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
Modified:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1612578&r1=1612577&r2=1612578&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Tue Jul 22 14:30:57 2014
@@ -82,11 +82,13 @@ import org.apache.qpid.server.txn.AutoCo
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.Deletable;
import org.apache.qpid.server.util.MapValueConverter;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.util.StateChangeListener;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+import org.apache.qpid.transport.TransportException;
public abstract class AbstractQueue<X extends AbstractQueue<X>>
extends AbstractConfiguredObject<X>
@@ -931,90 +933,115 @@ public abstract class AbstractQueue<X ex
final QueueConsumer<?> exclusiveSub = _exclusiveSubscriber;
final QueueEntry entry = getEntries().add(message);
- if(action != null || (exclusiveSub == null && _queueRunner.isIdle()))
+ try
{
- /*
- * iterate over consumers and if any is at the end of the queue and can deliver this message,
- * then deliver the message
- */
-
- Subject.doAs(SecurityManager.getSystemTaskSubject("Immediate Delivery"),
- new PrivilegedAction<Object>()
- {
- @Override
- public Object run()
+ if (action != null || (exclusiveSub == null && _queueRunner.isIdle()))
+ {
+ Subject.doAs(SecurityManager.getSystemTaskSubject("Immediate Delivery"),
+ new PrivilegedAction<Void>()
{
-
- QueueConsumerList.ConsumerNode node = _consumerList.getMarkedNode();
- QueueConsumerList.ConsumerNode nextNode = node.findNext();
- if (nextNode == null)
- {
- nextNode = _consumerList.getHead().findNext();
- }
- while (nextNode != null)
+ @Override
+ public Void run()
{
- if (_consumerList.updateMarkedNode(node, nextNode))
- {
- break;
- }
- else
- {
- node = _consumerList.getMarkedNode();
- nextNode = node.findNext();
- if (nextNode == null)
- {
- nextNode = _consumerList.getHead().findNext();
- }
- }
+ tryDeliverStraightThrough(entry);
+ return null;
}
- // always do one extra loop after we believe we've finished
- // this catches the case where we *just* miss an update
- int loops = 2;
-
- while (entry.isAvailable() && loops != 0)
- {
- if (nextNode == null)
- {
- loops--;
- nextNode = _consumerList.getHead();
- }
- else
- {
- // if consumer at end, and active, offer
- final QueueConsumer<?> sub = nextNode.getConsumer();
- deliverToConsumer(sub, entry);
-
+ }
+ );
+ }
- }
- nextNode = nextNode.findNext();
+ if (entry.isAvailable())
+ {
+ checkConsumersNotAheadOfDelivery(entry);
- }
+ if (exclusiveSub != null)
+ {
+ deliverAsync(exclusiveSub);
+ }
+ else
+ {
+ deliverAsync();
+ }
+ }
- return null;
- }
- });
+ checkForNotification(entry.getMessage());
+ }
+ finally
+ {
+ if(action != null)
+ {
+ action.performAction(entry);
+ }
}
+ }
- if (entry.isAvailable())
+ /**
+ * iterate over consumers and if any is at the end of the queue and can deliver this message,
+ * then deliver the message
+ */
+ private void tryDeliverStraightThrough(final QueueEntry entry)
+ {
+ try
{
- checkConsumersNotAheadOfDelivery(entry);
-
- if (exclusiveSub != null)
+ QueueConsumerList.ConsumerNode node = _consumerList.getMarkedNode();
+ QueueConsumerList.ConsumerNode nextNode = node.findNext();
+ if (nextNode == null)
{
- deliverAsync(exclusiveSub);
+ nextNode = _consumerList.getHead().findNext();
}
- else
+ while (nextNode != null)
{
- deliverAsync();
- }
- }
+ if (_consumerList.updateMarkedNode(node, nextNode))
+ {
+ break;
+ }
+ else
+ {
+ node = _consumerList.getMarkedNode();
+ nextNode = node.findNext();
+ if (nextNode == null)
+ {
+ nextNode = _consumerList.getHead().findNext();
+ }
+ }
+ }
+ // always do one extra loop after we believe we've finished
+ // this catches the case where we *just* miss an update
+ int loops = 2;
- checkForNotification(entry.getMessage());
+ while (entry.isAvailable() && loops != 0)
+ {
+ if (nextNode == null)
+ {
+ loops--;
+ nextNode = _consumerList.getHead();
+ }
+ else
+ {
+ // if consumer at end, and active, offer
+ final QueueConsumer<?> sub = nextNode.getConsumer();
+ deliverToConsumer(sub, entry);
+
+
+ }
+ nextNode = nextNode.findNext();
- if(action != null)
+ }
+ }
+ catch (ConnectionScopedRuntimeException | TransportException e)
{
- action.performAction(entry);
+ String errorMessage = "Suppressing " + e.getClass().getSimpleName() +
+ " during straight through delivery, as this" +
+ " can only indicate an issue with a consumer.";
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug(errorMessage, e);
+ }
+ else
+ {
+ _logger.info(errorMessage + ' ' + e.getMessage());
+ }
}
}
Modified:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java?rev=1612578&r1=1612577&r2=1612578&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java Tue Jul 22 14:30:57 2014
@@ -21,16 +21,13 @@
package org.apache.qpid.server.queue;
import java.security.PrivilegedAction;
-import java.util.Collections;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.security.*;
import org.apache.qpid.server.security.SecurityManager;
-import org.apache.qpid.server.security.auth.TaskPrincipal;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.transport.TransportException;
@@ -79,7 +76,7 @@ public class QueueRunner implements Runn
{
runAgain = _queue.processQueue(QueueRunner.this);
}
- catch (final ConnectionScopedRuntimeException e)
+ catch (ConnectionScopedRuntimeException | TransportException e)
{
final String errorMessage = "Problem during asynchronous delivery by " + toString();
if(_logger.isDebugEnabled())
@@ -91,18 +88,6 @@ public class QueueRunner implements Runn
_logger.info(errorMessage + ' ' + e.getMessage());
}
}
- catch (final TransportException transe)
- {
- final String errorMessage = "Problem during asynchronous delivery by " + toString();
- if(_logger.isDebugEnabled())
- {
- _logger.debug(errorMessage, transe);
- }
- else
- {
- _logger.info(errorMessage + ' ' + transe.getMessage());
- }
- }
finally
{
_scheduled.compareAndSet(RUNNING, IDLE);
Modified:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java?rev=1612578&r1=1612577&r2=1612578&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java Tue Jul 22 14:30:57 2014
@@ -24,12 +24,11 @@ package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
import org.apache.qpid.server.security.SecurityManager;
-import org.apache.qpid.server.security.auth.TaskPrincipal;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.transport.TransportException;
import javax.security.auth.Subject;
import java.security.PrivilegedAction;
-import java.util.Collections;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -73,16 +72,16 @@ class SubFlushRunner implements Runnable
{
complete = getQueue().flushConsumer(_sub, ITERATIONS);
}
- catch (final TransportException transe)
+ catch (ConnectionScopedRuntimeException | TransportException e)
{
final String errorMessage = "Problem during asynchronous delivery by " + toString();
if(_logger.isDebugEnabled())
{
- _logger.debug(errorMessage, transe);
+ _logger.debug(errorMessage, e);
}
else
{
- _logger.info(errorMessage + ' ' + transe.getMessage());
+ _logger.info(errorMessage + ' ' + e.getMessage());
}
}
finally
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]