Author: kwall
Date: Fri Dec 5 08:47:22 2014
New Revision: 1643197
URL: http://svn.apache.org/viewvc?rev=1643197&view=rev
Log:
QPID-6258: [Java Broker] Remove SubFlushRunner leaving QueueRunner solely
responsible for asynchronous message delivery
Removed:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
Modified:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
Modified:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1643197&r1=1643196&r2=1643197&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
(original)
+++
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
Fri Dec 5 08:47:22 2014
@@ -244,6 +244,8 @@ public abstract class AbstractQueue<X ex
private final AtomicBoolean _recovering = new AtomicBoolean(true);
private final ConcurrentLinkedQueue<EnqueueRequest> _postRecoveryQueue =
new ConcurrentLinkedQueue<>();
+ private final QueueRunner _queueRunner = new QueueRunner(this);
+
protected AbstractQueue(Map<String, Object> attributes, VirtualHostImpl
virtualHost)
{
super(parentsMap(virtualHost), attributes);
@@ -745,7 +747,7 @@ public abstract class AbstractQueue<X ex
childAdded(consumer);
consumer.addChangeListener(_deletedChildListener);
- deliverAsync(consumer);
+ deliverAsync();
return consumer;
@@ -1006,14 +1008,7 @@ public abstract class AbstractQueue<X ex
{
checkConsumersNotAheadOfDelivery(entry);
- if (exclusiveSub != null)
- {
- deliverAsync(exclusiveSub);
- }
- else
- {
- deliverAsync();
- }
+ deliverAsync();
}
checkForNotification(entry.getMessage());
@@ -1490,7 +1485,7 @@ public abstract class AbstractQueue<X ex
_activeSubscriberCount.incrementAndGet();
}
- deliverAsync(sub);
+ deliverAsync();
}
}
@@ -1859,8 +1854,6 @@ public abstract class AbstractQueue<X ex
}
}
- private final QueueRunner _queueRunner = new QueueRunner(this);
-
public void deliverAsync()
{
_stateChangeCount.incrementAndGet();
@@ -1869,20 +1862,6 @@ public abstract class AbstractQueue<X ex
}
- public void deliverAsync(QueueConsumer<?> sub)
- {
- if(_exclusiveSubscriber == null)
- {
- deliverAsync();
- }
- else
- {
- SubFlushRunner flusher = sub.getRunner();
- flusher.execute();
- }
-
- }
-
void flushConsumer(QueueConsumer<?> sub)
{
@@ -2100,10 +2079,7 @@ public abstract class AbstractQueue<X ex
*
* A queue Runner is started whenever a state change occurs, e.g when a new
* message arrives on the queue and cannot be immediately delivered to a
- * consumer (i.e. asynchronous delivery is required). Unless there are
- * SubFlushRunners operating (due to consumers unsuspending) which are
- * capable of accepting/delivering all messages then these messages would
- * otherwise remain on the queue.
+ * consumer (i.e. asynchronous delivery is required).
*
* processQueue should be running while there are messages on the queue AND
* there are consumers that can deliver them. If there are no
@@ -2412,7 +2388,7 @@ public abstract class AbstractQueue<X ex
public void stateChanged(MessageInstance entry, QueueEntry.State
oldSate, QueueEntry.State newState)
{
entry.removeStateChangeListener(this);
- deliverAsync(_sub);
+ deliverAsync();
}
}
Modified:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java?rev=1643197&r1=1643196&r2=1643197&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
(original)
+++
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
Fri Dec 5 08:47:22 2014
@@ -44,8 +44,6 @@ public interface QueueConsumer<X extends
void queueDeleted();
- SubFlushRunner getRunner();
-
AMQQueue getQueue();
boolean resend(QueueEntry e);
Modified:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1643197&r1=1643196&r2=1643197&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
(original)
+++
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
Fri Dec 5 08:47:22 2014
@@ -86,7 +86,6 @@ class QueueConsumerImpl
}
private final ConsumerTarget _target;
- private final SubFlushRunner _runner = new SubFlushRunner(this);
private final StateChangeListener<ConsumerTarget, ConsumerTarget.State>
_listener;
private volatile QueueContext _queueContext;
@@ -210,7 +209,7 @@ class QueueConsumerImpl
@Override
public void externalStateChange()
{
- _queue.deliverAsync(this);
+ _queue.deliverAsync();
}
@Override
@@ -324,11 +323,6 @@ class QueueConsumerImpl
return getQueue().resend(entry, this);
}
- public final SubFlushRunner getRunner()
- {
- return _runner;
- }
-
public final long getConsumerNumber()
{
return _consumerNumber;
Modified:
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java?rev=1643197&r1=1643196&r2=1643197&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
(original)
+++
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
Fri Dec 5 08:47:22 2014
@@ -261,7 +261,7 @@ abstract class AbstractQueueTestBase ext
_queue.enqueue(messageB, postEnqueueAction);
_queue.enqueue(messageC, postEnqueueAction);
- Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
+ Thread.sleep(150); // Work done by QueueRunner Thread
assertEquals("Unexpected total number of messages sent to consumer",
3,
@@ -274,7 +274,7 @@ abstract class AbstractQueueTestBase ext
queueEntries.get(0).release();
- Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
+ Thread.sleep(150); // Work done by QueueRunner Thread
assertEquals("Unexpected total number of messages sent to consumer",
4,
@@ -311,7 +311,7 @@ abstract class AbstractQueueTestBase ext
_queue.enqueue(messageA, postEnqueueAction);
int subFlushWaitTime = 150;
- Thread.sleep(subFlushWaitTime); // Work done by
SubFlushRunner/QueueRunner Threads
+ Thread.sleep(subFlushWaitTime); // Work done by QueueRunner Thread
assertEquals("Unexpected total number of messages sent to consumer",
1,
@@ -322,7 +322,7 @@ abstract class AbstractQueueTestBase ext
Thread.sleep(messageExpirationOffset - subFlushWaitTime + 10);
queueEntries.get(0).release();
- Thread.sleep(subFlushWaitTime); // Work done by
SubFlushRunner/QueueRunner Threads
+ Thread.sleep(subFlushWaitTime); // Work done by QueueRunner Thread
assertTrue("Expecting the queue entry to be now expired",
queueEntries.get(0).expired());
assertEquals("Total number of messages sent should not have changed",
@@ -360,7 +360,7 @@ abstract class AbstractQueueTestBase ext
_queue.enqueue(messageB, postEnqueueAction);
_queue.enqueue(messageC, postEnqueueAction);
- Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
+ Thread.sleep(150); // Work done by QueueRunner Thread
assertEquals("Unexpected total number of messages sent to consumer",
3,
@@ -374,7 +374,7 @@ abstract class AbstractQueueTestBase ext
queueEntries.get(2).release();
queueEntries.get(0).release();
- Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
+ Thread.sleep(150); // Work done by QueueRunner Thread
assertEquals("Unexpected total number of messages sent to consumer",
5,
@@ -417,7 +417,7 @@ abstract class AbstractQueueTestBase ext
_queue.enqueue(messageA, postEnqueueAction);
_queue.enqueue(messageB, postEnqueueAction);
- Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
+ Thread.sleep(150); // Work done by QueueRunner Thread
assertEquals("Unexpected total number of messages sent to both after
enqueue",
2,
@@ -426,7 +426,7 @@ abstract class AbstractQueueTestBase ext
/* Now release the first message only, causing it to be requeued */
queueEntries.get(0).release();
- Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
+ Thread.sleep(150); // Work done by QueueRunner Thread
assertEquals("Unexpected total number of messages sent to both
consumers after release",
3,
@@ -645,88 +645,6 @@ abstract class AbstractQueueTestBase ext
assertEquals("Message ID was wrong", msgID, 10L);
}
-
- /**
- * processQueue() is used when asynchronously delivering messages to
- * consumers which could not be delivered immediately during the
- * enqueue() operation.
- *
- * A defect within the method would mean that delivery of these messages
may
- * not occur should the Runner stop before all messages have been
processed.
- * Such a defect was discovered when Selectors were used such that one and
- * only one consumer can/will accept any given messages, but multiple
- * consumers are present, and one of the earlier consumers receives
- * more messages than the others.
- *
- * This test is to validate that the processQueue() method is able to
- * correctly deliver all of the messages present for asynchronous delivery
- * to consumers in such a scenario.
- */
- public void testProcessQueueWithUniqueSelectors() throws Exception
- {
- AbstractQueue testQueue = createNonAsyncDeliverQueue();
- testQueue.open();
-
- // retrieve the QueueEntryList the queue creates and insert the test
- // messages, thus avoiding straight-through delivery attempts during
- //enqueue() process.
- QueueEntryList list = testQueue.getEntries();
- assertNotNull("QueueEntryList should have been created", list);
-
- QueueEntry msg1 = list.add(createMessage(1L));
- QueueEntry msg2 = list.add(createMessage(2L));
- QueueEntry msg3 = list.add(createMessage(3L));
- QueueEntry msg4 = list.add(createMessage(4L));
- QueueEntry msg5 = list.add(createMessage(5L));
-
- // Create lists of the entries each consumer should be interested
- // in.Bias over 50% of the messages to the first consumer so that
- // the later consumers reject them and report being done before
- // the first consumer as the processQueue method proceeds.
- List<String> msgListSub1 = createEntriesList(msg1, msg2, msg3);
- List<String> msgListSub2 = createEntriesList(msg4);
- List<String> msgListSub3 = createEntriesList(msg5);
-
- MockConsumer sub1 = new MockConsumer(msgListSub1);
- MockConsumer sub2 = new MockConsumer(msgListSub2);
- MockConsumer sub3 = new MockConsumer(msgListSub3);
-
- // register the consumers
- testQueue.addConsumer(sub1, sub1.getFilters(),
msg1.getMessage().getClass(), "test",
- EnumSet.of(ConsumerImpl.Option.ACQUIRES,
ConsumerImpl.Option.SEES_REQUEUES));
- testQueue.addConsumer(sub2, sub2.getFilters(),
msg1.getMessage().getClass(), "test",
- EnumSet.of(ConsumerImpl.Option.ACQUIRES,
ConsumerImpl.Option.SEES_REQUEUES));
- testQueue.addConsumer(sub3, sub3.getFilters(),
msg1.getMessage().getClass(), "test",
- EnumSet.of(ConsumerImpl.Option.ACQUIRES,
ConsumerImpl.Option.SEES_REQUEUES));
-
- //check that no messages have been delivered to the
- //consumers during registration
- assertEquals("No messages should have been delivered yet", 0,
sub1.getMessages().size());
- assertEquals("No messages should have been delivered yet", 0,
sub2.getMessages().size());
- assertEquals("No messages should have been delivered yet", 0,
sub3.getMessages().size());
-
- // call processQueue to deliver the messages
- testQueue.processQueue(new QueueRunner(testQueue)
- {
- @Override
- public void run()
- {
- // we don't actually want/need this runner to do any work
- // because we we are already doing it!
- }
- });
-
- // check expected messages delivered to correct consumers
- verifyReceivedMessages(Arrays.asList((MessageInstance)msg1,msg2,msg3),
sub1.getMessages());
-
verifyReceivedMessages(Collections.singletonList((MessageInstance)msg4),
sub2.getMessages());
-
verifyReceivedMessages(Collections.singletonList((MessageInstance)msg5),
sub3.getMessages());
- }
-
- private AbstractQueue createNonAsyncDeliverQueue()
- {
- return new NonAsyncDeliverQueue(getVirtualHost());
- }
-
/**
* Tests that dequeued message is not present in the list returned form
* {@link AbstractQueue#getMessagesOnTheQueue()}
@@ -1055,16 +973,6 @@ abstract class AbstractQueueTestBase ext
return entry;
}
- private List<String> createEntriesList(QueueEntry... entries)
- {
- ArrayList<String> entriesList = new ArrayList<String>();
- for (QueueEntry entry : entries)
- {
-
entriesList.add(entry.getMessage().getMessageHeader().getMessageId());
- }
- return entriesList;
- }
-
protected void verifyReceivedMessages(List<MessageInstance> expected,
List<MessageInstance> delivered)
{
@@ -1210,90 +1118,4 @@ abstract class AbstractQueueTestBase ext
return _consumerTarget;
}
- private static class NonAsyncDeliverEntry extends OrderedQueueEntry
- {
-
- public NonAsyncDeliverEntry(final NonAsyncDeliverList queueEntryList)
- {
- super(queueEntryList);
- }
-
- public NonAsyncDeliverEntry(final NonAsyncDeliverList queueEntryList,
- final ServerMessage message,
- final long entryId)
- {
- super(queueEntryList, message, entryId);
- }
-
- public NonAsyncDeliverEntry(final NonAsyncDeliverList queueEntryList,
final ServerMessage message)
- {
- super(queueEntryList, message);
- }
- }
-
- private static class NonAsyncDeliverList extends OrderedQueueEntryList
- {
-
- private static final HeadCreator HEAD_CREATOR =
- new HeadCreator()
- {
-
- @Override
- public NonAsyncDeliverEntry createHead(final
QueueEntryList list)
- {
- return new NonAsyncDeliverEntry((NonAsyncDeliverList)
list);
- }
- };
-
- public NonAsyncDeliverList(final NonAsyncDeliverQueue queue)
- {
- super(queue, HEAD_CREATOR);
- }
-
- @Override
- protected NonAsyncDeliverEntry createQueueEntry(final ServerMessage<?>
message)
- {
- return new NonAsyncDeliverEntry(this,message);
- }
- }
-
-
- private static class NonAsyncDeliverQueue extends
AbstractQueue<NonAsyncDeliverQueue>
- {
- private QueueEntryList _entries = new NonAsyncDeliverList(this);
-
- public NonAsyncDeliverQueue(VirtualHostImpl vhost)
- {
- super(attributes(), vhost);
- }
-
- @Override
- protected void onOpen()
- {
- super.onOpen();
- }
-
- @Override
- QueueEntryList getEntries()
- {
- return _entries;
- }
-
- private static Map<String,Object> attributes()
- {
- Map<String,Object> attributes = new HashMap<String, Object>();
- attributes.put(Queue.ID, UUID.randomUUID());
- attributes.put(Queue.NAME, "test");
- attributes.put(Queue.DURABLE, false);
- attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.PERMANENT);
- return attributes;
- }
-
- @Override
- public void deliverAsync(QueueConsumer<?> sub)
- {
- // do nothing, i.e prevent deliveries by the SubFlushRunner
- // when registering the new consumers
- }
- }
}
Modified:
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java?rev=1643197&r1=1643196&r2=1643197&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
(original)
+++
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
Fri Dec 5 08:47:22 2014
@@ -180,14 +180,7 @@ public class StandardQueueTest extends A
queueAttributes.put(Queue.ID, UUID.randomUUID());
queueAttributes.put(Queue.NAME, "test");
// create queue with overridden method deliverAsync
- StandardQueueImpl testQueue = new StandardQueueImpl(queueAttributes,
getVirtualHost())
- {
- @Override
- public void deliverAsync(QueueConsumer sub)
- {
- // do nothing
- }
- };
+ StandardQueueImpl testQueue = new StandardQueueImpl(queueAttributes,
getVirtualHost());
testQueue.create();
// put messages
Modified:
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1643197&r1=1643196&r2=1643197&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
(original)
+++
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
Fri Dec 5 08:47:22 2014
@@ -307,7 +307,7 @@ public class ServerConnectionDelegate ex
@Override public void sessionDetach(Connection conn, SessionDetach dtc)
{
// To ensure a clean detach, we stop any remaining subscriptions. Stop
ensures
- // that any in-progress delivery (SubFlushRunner/QueueRunner) is
completed before the stop
+ // that any in-progress delivery (QueueRunner) is completed before the
stop
// completes.
stopAllSubscriptions(conn, dtc);
Session ssn = conn.getSession(dtc.getChannel());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]