Author: rgodfrey
Date: Mon Jan 2 10:01:21 2012
New Revision: 1226382
URL: http://svn.apache.org/viewvc?rev=1226382&view=rev
Log:
QPID-3713 : Implement producer side flow control for 0-10 in Java Broker
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Range.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
qpid/trunk/qpid/java/test-profiles/Java010Excludes
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java?rev=1226382&r1=1226381&r2=1226382&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java
Mon Jan 2 10:01:21 2012
@@ -22,6 +22,9 @@ package org.apache.qpid.server.logging.s
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.transport.ServerConnection;
+import org.apache.qpid.server.transport.ServerSession;
+
import static
org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
public class ChannelLogSubject extends AbstractLogSubject
@@ -52,5 +55,30 @@ public class ChannelLogSubject extends A
session.getVirtualHost().getName(),
channel.getChannelId());
}
-
+
+ public ChannelLogSubject(ServerSession session)
+ {
+ /**
+ * LOG FORMAT used by the AMQPConnectorActor follows
+ * ChannelLogSubject.CHANNEL_FORMAT :
+ * con:{0}({1}@{2}/{3})/ch:{4}
+ *
+ * Uses a MessageFormat call to insert the required values according to
+ * these indices:
+ *
+ * 0 - Connection ID
+ * 1 - User ID
+ * 2 - IP
+ * 3 - Virtualhost
+ * 4 - Channel ID
+ */
+ ServerConnection connection = (ServerConnection)
session.getConnection();
+ setLogStringWithFormat(CHANNEL_FORMAT,
+ connection == null ? -1L :
connection.getConnectionId(),
+ session.getAuthorizedPrincipal() == null ? "?"
: session.getAuthorizedPrincipal().getName(),
+ (connection == null || connection.getConfig()
== null) ? "?" : connection.getConfig().getAddress(),
+ session.getVirtualHost().getName(),
+ session.getChannel());
+ }
+
}
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java?rev=1226382&r1=1226381&r2=1226382&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
Mon Jan 2 10:01:21 2012
@@ -22,6 +22,8 @@ package org.apache.qpid.server.protocol;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.SimpleAMQQueue;
public interface AMQSessionModel
{
@@ -51,4 +53,8 @@ public interface AMQSessionModel
* @param idleClose time in milliseconds before closing connection with
idle transaction
*/
public void checkTransactionStatus(long openWarn, long openClose, long
idleWarn, long idleClose) throws AMQException;
+
+ void block(AMQQueue queue);
+
+ void unblock(AMQQueue queue);
}
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=1226382&r1=1226381&r2=1226382&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
Mon Jan 2 10:01:21 2012
@@ -217,7 +217,7 @@ public interface AMQQueue extends Managa
Map<String, Object> getArguments();
- void checkCapacity(AMQChannel channel);
+ void checkCapacity(AMQSessionModel channel);
/**
* ExistingExclusiveSubscription signals a failure to create a
subscription, because an exclusive subscription
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1226382&r1=1226381&r2=1226382&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
Mon Jan 2 10:01:21 2012
@@ -164,7 +164,7 @@ public class SimpleAMQQueue implements A
private AtomicInteger _deliveredMessages = new AtomicInteger();
private AtomicBoolean _stopped = new AtomicBoolean(false);
- private final ConcurrentMap<AMQChannel, Boolean> _blockedChannels = new
ConcurrentHashMap<AMQChannel, Boolean>();
+ private final ConcurrentMap<AMQSessionModel, Boolean> _blockedChannels =
new ConcurrentHashMap<AMQSessionModel, Boolean>();
private final AtomicBoolean _deleted = new AtomicBoolean(false);
private final List<Task> _deleteTaskList = new
CopyOnWriteArrayList<Task>();
@@ -1528,7 +1528,7 @@ public class SimpleAMQQueue implements A
}
}
- public void checkCapacity(AMQChannel channel)
+ public void checkCapacity(AMQSessionModel channel)
{
if(_capacity != 0l)
{
@@ -1538,10 +1538,9 @@ public class SimpleAMQQueue implements A
//Overfull log message
_logActor.message(_logSubject,
QueueMessages.OVERFULL(_atomicQueueSize.get(), _capacity));
- if(_blockedChannels.putIfAbsent(channel, Boolean.TRUE)==null)
- {
- channel.block(this);
- }
+ _blockedChannels.putIfAbsent(channel, Boolean.TRUE);
+
+ channel.block(this);
if(_atomicQueueSize.get() <= _flowResumeCapacity)
{
@@ -1573,7 +1572,7 @@ public class SimpleAMQQueue implements A
}
- for(AMQChannel c : _blockedChannels.keySet())
+ for(AMQSessionModel c : _blockedChannels.keySet())
{
c.unblock(this);
_blockedChannels.remove(c);
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=1226382&r1=1226381&r2=1226382&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
Mon Jan 2 10:01:21 2012
@@ -25,7 +25,6 @@ import static org.apache.qpid.util.Seria
import java.security.Principal;
import java.text.MessageFormat;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
@@ -33,8 +32,11 @@ import java.util.Map;
import java.util.SortedMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.security.auth.Subject;
import org.apache.qpid.AMQException;
@@ -45,11 +47,13 @@ import org.apache.qpid.server.configurat
import org.apache.qpid.server.configuration.ConnectionConfig;
import org.apache.qpid.server.configuration.SessionConfig;
import org.apache.qpid.server.configuration.SessionConfigType;
+import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.logging.messages.ChannelMessages;
+import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQConnectionModel;
@@ -66,6 +70,11 @@ import org.apache.qpid.server.txn.Server
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Binary;
import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.MessageCreditUnit;
+import org.apache.qpid.transport.MessageFlow;
+import org.apache.qpid.transport.MessageFlowMode;
+import org.apache.qpid.transport.MessageSetFlowMode;
+import org.apache.qpid.transport.MessageStop;
import org.apache.qpid.transport.MessageTransfer;
import org.apache.qpid.transport.Method;
import org.apache.qpid.transport.Range;
@@ -81,6 +90,7 @@ public class ServerSession extends Sessi
private static final Logger _logger =
LoggerFactory.getLogger(ServerSession.class);
private static final String NULL_DESTINTATION =
UUID.randomUUID().toString();
+ private static final int HALF_INCOMING_CREDIT_THRESHOLD = 1 << 30;
private final UUID _id;
private ConnectionConfig _connectionConfig;
@@ -88,6 +98,16 @@ public class ServerSession extends Sessi
private LogActor _actor = GenericActor.getInstance(this);
private PostEnqueueAction _postEnqueueAction = new PostEnqueueAction();
+ private final ConcurrentMap<AMQQueue, Boolean> _blockingQueues = new
ConcurrentHashMap<AMQQueue, Boolean>();
+
+ private final ConcurrentMap<Exchange, Boolean> _blockingExchanges = new
ConcurrentHashMap<Exchange, Boolean>();
+
+
+ private final AtomicBoolean _blocking = new AtomicBoolean(false);
+ private ChannelLogSubject _logSubject;
+ private final AtomicInteger _oustandingCredit = new
AtomicInteger(Integer.MAX_VALUE);
+
+
public static interface MessageDispositionChangeListener
{
public void onAccept();
@@ -132,7 +152,7 @@ public class ServerSession extends Sessi
super(connection, delegate, name, expiry);
_connectionConfig = connConfig;
_transaction = new AutoCommitTransaction(this.getMessageStore());
-
+ _logSubject = new ChannelLogSubject(this);
_id = getConfigStore().createId();
getConfigStore().addConfiguredObject(this);
}
@@ -161,6 +181,10 @@ public class ServerSession extends Sessi
public void enqueue(final ServerMessage message, final List<? extends
BaseQueue> queues)
{
+ if(_oustandingCredit.decrementAndGet() <
HALF_INCOMING_CREDIT_THRESHOLD)
+ {
+ invoke(new
MessageFlow("",MessageCreditUnit.MESSAGE,HALF_INCOMING_CREDIT_THRESHOLD));
+ }
getConnectionModel().registerMessageReceived(message.getSize(),
message.getArrivalTime());
PostEnqueueAction postTransactionAction;
if(isTransactional())
@@ -661,6 +685,43 @@ public class ServerSession extends Sessi
}
}
+ public void block(AMQQueue queue)
+ {
+ if(_blockingQueues.putIfAbsent(queue, Boolean.TRUE) == null)
+ {
+
+ if(_blocking.compareAndSet(false,true))
+ {
+ invoke(new MessageSetFlowMode("", MessageFlowMode.CREDIT));
+ invoke(new MessageStop(""));
+ _actor.message(_logSubject,
ChannelMessages.FLOW_ENFORCED(queue.getNameShortString().toString()));
+ }
+
+
+ }
+ }
+
+ public void unblock(AMQQueue queue)
+ {
+ if(_blockingQueues.remove(queue) && _blockingQueues.isEmpty())
+ {
+ if(_blocking.compareAndSet(true,false))
+ {
+
+ _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED());
+ MessageFlow mf = new MessageFlow();
+ mf.setUnit(MessageCreditUnit.MESSAGE);
+ mf.setDestination("");
+ _oustandingCredit.set(Integer.MAX_VALUE);
+ mf.setValue(Integer.MAX_VALUE);
+ invoke(mf);
+
+
+ }
+ }
+ }
+
+
public String toLogString()
{
return "[" +
@@ -701,7 +762,7 @@ public class ServerSession extends Sessi
}
}
- private static class PostEnqueueAction implements ServerTransaction.Action
+ private class PostEnqueueAction implements ServerTransaction.Action
{
private List<? extends BaseQueue> _queues;
@@ -732,7 +793,13 @@ public class ServerSession extends Sessi
{
try
{
- _queues.get(i).enqueue(_message, _transactional, null);
+ BaseQueue queue = _queues.get(i);
+ queue.enqueue(_message, _transactional, null);
+ if(queue instanceof AMQQueue)
+ {
+ ((AMQQueue)queue).checkCapacity(ServerSession.this);
+ }
+
}
catch (AMQException e)
{
@@ -756,6 +823,6 @@ public class ServerSession extends Sessi
public boolean getBlocking()
{
- return false; //TODO: Blocking not implemented on 0-10 yet.
+ return _blocking.get();
}
}
Modified:
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java?rev=1226382&r1=1226381&r2=1226382&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
(original)
+++
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
Mon Jan 2 10:01:21 2012
@@ -30,7 +30,6 @@ import org.apache.qpid.server.virtualhos
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.security.AuthorizationHolder;
-import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -477,7 +476,7 @@ public class MockAMQQueue implements AMQ
return null;
}
- public void checkCapacity(AMQChannel channel)
+ public void checkCapacity(AMQSessionModel channel)
{
}
Modified:
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Range.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Range.java?rev=1226382&r1=1226381&r2=1226382&view=diff
==============================================================================
---
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Range.java
(original)
+++
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Range.java
Mon Jan 2 10:01:21 2012
@@ -185,6 +185,12 @@ public abstract class Range implements R
}
}
+ public String toString()
+ {
+ return "[" + point + ", " + point + "]";
+ }
+
+
}
private static class RangeImpl extends Range
@@ -283,7 +289,7 @@ public abstract class Range implements R
return range;
}
- @Override
+
public void remove()
{
throw new UnsupportedOperationException();
Modified:
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java?rev=1226382&r1=1226381&r2=1226382&view=diff
==============================================================================
---
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
(original)
+++
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
Mon Jan 2 10:01:21 2012
@@ -61,7 +61,7 @@ import java.util.concurrent.atomic.Atomi
public class Session extends SessionInvoker
{
private static final Logger log = Logger.get(Session.class);
-
+
public enum State { NEW, DETACHED, RESUMING, OPEN, CLOSING, CLOSED }
static class DefaultSessionListener implements SessionListener
@@ -96,6 +96,9 @@ public class Session extends SessionInvo
private final long timeout =
Long.getLong(ClientProperties.QPID_SYNC_OP_TIMEOUT,
Long.getLong(ClientProperties.AMQJ_DEFAULT_SYNCWRITE_TIMEOUT,
ClientProperties.DEFAULT_SYNC_OPERATION_TIMEOUT));
+ private final long blockedSendTimeout =
Long.getLong("qpid.flow_control_wait_failure", timeout);
+ private long blockedSendReportingPeriod =
Long.getLong("qpid.flow_control_wait_notify_period",5000L);
+
private boolean autoSync = false;
private boolean incomingInit;
@@ -228,10 +231,21 @@ public class Session extends SessionInvo
{
try
{
- if (!credit.tryAcquire(timeout, TimeUnit.MILLISECONDS))
+ long wait = blockedSendTimeout > blockedSendReportingPeriod ?
blockedSendReportingPeriod :
+ blockedSendTimeout;
+ long totalWait = 1L;
+ while(totalWait <= blockedSendTimeout &&
!credit.tryAcquire(wait, TimeUnit.MILLISECONDS))
+ {
+ totalWait+=wait;
+ log.warn("Message send delayed by " + (totalWait)/1000 +
"s due to broker enforced flow control");
+
+
+ }
+ if(totalWait > blockedSendTimeout)
{
+ log.error("Message send failed due to timeout waiting on
broker enforced flow control");
throw new SessionException
- ("timed out waiting for message credit");
+ ("timed out waiting for message credit");
}
}
catch (InterruptedException e)
@@ -815,7 +829,7 @@ public class Session extends SessionInvo
while (w.hasTime() && state != CLOSED && lt(maxComplete, point))
{
checkFailoverRequired("Session sync was interrupted by
failover.");
- log.debug("%s waiting for[%d]: %d, %s", this, point,
maxComplete, commands);
+ log.debug("%s waiting for[%d]: %d, %s", this, point,
maxComplete, Arrays.asList(commands));
w.await();
}
Modified:
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java?rev=1226382&r1=1226381&r2=1226382&view=diff
==============================================================================
---
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
(original)
+++
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
Mon Jan 2 10:01:21 2012
@@ -154,8 +154,7 @@ public class ProducerFlowControlTest ext
// try to send 5 messages (should block after 4)
sendMessagesAsync(producer, producerSession, 5, 50L);
- Thread.sleep(5000);
- List<String> results = waitAndFindMatches("QUE-1003");
+ List<String> results = waitAndFindMatches("QUE-1003", 7000);
assertEquals("Did not find correct number of QUE-1003 queue overfull
messages", 1, results.size());
@@ -199,11 +198,13 @@ public class ProducerFlowControlTest ext
// try to send 5 messages (should block after 4)
MessageSender sender = sendMessagesAsync(producer, producerSession, 5,
50L);
- Thread.sleep(TIMEOUT);
List<String> results = waitAndFindMatches("Message send delayed by",
TIMEOUT);
assertTrue("No delay messages logged by client",results.size()!=0);
- results = findMatches("Message send failed due to timeout waiting on
broker enforced flow control");
- assertEquals("Incorrect number of send failure messages logged by
client",1,results.size());
+
+ List<String> failedMessages = waitAndFindMatches("Message send failed
due to timeout waiting on broker enforced"
+ + " flow control", TIMEOUT);
+ assertEquals("Incorrect number of send failure messages logged by
client (got " + results.size() + " delay "
+ + "messages)",1,failedMessages.size());
@@ -325,8 +326,9 @@ public class ProducerFlowControlTest ext
// try to send 5 messages (should block after 4)
- MessageSender sender = sendMessagesAsync(producer, producerSession, 5,
50L);
+ MessageSender sender = sendMessagesAsync(producer, producerSession, 5,
100L);
+
Thread.sleep(10000);
Exception e = sender.getException();
@@ -440,6 +442,15 @@ public class ProducerFlowControlTest ext
e.printStackTrace();
throw new RuntimeException(e);
}
+
+ try
+ {
+ Thread.sleep(sleepPeriod);
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
}
}
Modified: qpid/trunk/qpid/java/test-profiles/Java010Excludes
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/Java010Excludes?rev=1226382&r1=1226381&r2=1226382&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/Java010Excludes (original)
+++ qpid/trunk/qpid/java/test-profiles/Java010Excludes Mon Jan 2 10:01:21 2012
@@ -45,9 +45,6 @@ org.apache.qpid.server.logging.Subscript
// 0-10 is not supported by the MethodRegistry
org.apache.qpid.test.unit.close.JavaServerCloseRaceConditionTest#*
-//QPID-942 : Implemented Channel.Flow based Producer Side flow control to the
Java Broker (not in CPP Broker)
-org.apache.qpid.server.queue.ProducerFlowControlTest#*
-
//QPID-1864: rollback with subscriptions does not work in 0-10 yet
org.apache.qpid.test.client.RollbackOrderTest#testOrderingAfterRollbackOnMessage
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]