Repository: activemq-artemis Updated Branches: refs/heads/master 2ef0d2601 -> 0db33b78b
ARTEMIS-1073 Adding configuration for Producer's credits on AMQP Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/dc25ff0e Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/dc25ff0e Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/dc25ff0e Branch: refs/heads/master Commit: dc25ff0e42976b4fee507105cd64f0e9847a4ea4 Parents: 2ef0d26 Author: Clebert Suconic <[email protected]> Authored: Mon Mar 27 15:36:41 2017 -0400 Committer: Clebert Suconic <[email protected]> Committed: Mon Mar 27 16:27:24 2017 -0400 ---------------------------------------------------------------------- .../artemis/cli/commands/etc/amqp-acceptor.txt | 2 +- .../artemis/cli/commands/etc/broker.xml | 5 +++- .../amqp/broker/ProtonProtocolManager.java | 24 +++++++++++++++- .../client/AMQPClientConnectionFactory.java | 2 +- .../amqp/proton/AMQPConnectionContext.java | 30 +++++++++++++++++++- .../amqp/proton/AMQPSessionContext.java | 2 +- .../proton/ProtonServerReceiverContext.java | 12 ++++---- .../transaction/ProtonTransactionHandler.java | 10 ++++--- 8 files changed, 72 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc25ff0e/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/amqp-acceptor.txt ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/amqp-acceptor.txt b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/amqp-acceptor.txt index 5b20b92..71f44b7 100644 --- a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/amqp-acceptor.txt +++ b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/amqp-acceptor.txt @@ -1,3 +1,3 @@ <!-- AMQP Acceptor. 
Listens on default AMQP port for AMQP traffic.--> - <acceptor name="amqp">tcp://${host}:${amqp.port}?protocols=AMQP;useEpoll=true</acceptor> + <acceptor name="amqp">tcp://${host}:${amqp.port}?protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc25ff0e/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml index 5ca8687..497b10d 100644 --- a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml +++ b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml @@ -61,10 +61,13 @@ ${ping-config.settings}${journal-buffer.settings}${connector-config.settings} <global-max-size>100Mb</global-max-size> <acceptors> + <!-- useEpoll means: it will use Netty epoll if you are on a system (Linux) that supports it --> + <!-- amqpCredits: The number of credits sent to AMQP producers --> + <!-- amqpLowCredits: The server will send the # credits specified at amqpCredits at this low mark --> <!-- Acceptor for every supported protocol --> - <acceptor name="artemis">tcp://${host}:${default.port}?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true</acceptor> + <acceptor name="artemis">tcp://${host}:${default.port}?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor> ${amqp-acceptor}${stomp-acceptor}${hornetq-acceptor}${mqtt-acceptor} </acceptors> 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc25ff0e/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java index 2828cc1..03314b2 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java @@ -56,6 +56,10 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti private final Map<SimpleString, RoutingType> prefixes = new HashMap<>(); + private int amqpCredits = 100; + + private int amqpLowCredits = 30; + /* * used when you want to treat senders as a subscription on an address rather than consuming from the actual queue for * the address. This can be changed on the acceptor. 
@@ -105,7 +109,7 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti } String id = server.getConfiguration().getName(); - AMQPConnectionContext amqpConnection = new AMQPConnectionContext(connectionCallback, id, (int) ttl, getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, server.getExecutorFactory().getExecutor(), server.getScheduledPool()); + AMQPConnectionContext amqpConnection = new AMQPConnectionContext(this, connectionCallback, id, (int) ttl, getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, server.getExecutorFactory().getExecutor(), server.getScheduledPool()); Executor executor = server.getExecutorFactory().getExecutor(); @@ -137,6 +141,24 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti } + public int getAmqpCredits() { + return amqpCredits; + } + + public ProtonProtocolManager setAmqpCredits(int amqpCredits) { + this.amqpCredits = amqpCredits; + return this; + } + + public int getAmqpLowCredits() { + return amqpLowCredits; + } + + public ProtonProtocolManager setAmqpLowCredits(int amqpLowCredits) { + this.amqpLowCredits = amqpLowCredits; + return this; + } + @Override public boolean isProtocol(byte[] array) { return array.length >= 4 && array[0] == (byte) 'A' && array[1] == (byte) 'M' && array[2] == (byte) 'Q' && array[3] == (byte) 'P'; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc25ff0e/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java index b8851bb..510fdad 100644 --- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java @@ -52,7 +52,7 @@ public class AMQPClientConnectionFactory { Executor executor = server.getExecutorFactory().getExecutor(); - AMQPConnectionContext amqpConnection = new AMQPConnectionContext(connectionCallback, containerId, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, executor, server.getScheduledPool()); + AMQPConnectionContext amqpConnection = new AMQPConnectionContext(null, connectionCallback, containerId, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, executor, server.getScheduledPool()); eventHandler.ifPresent(amqpConnection::addEventHandler); ActiveMQProtonRemotingConnection delegate = new ActiveMQProtonRemotingConnection(protocolManager, amqpConnection, connection, executor); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc25ff0e/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java index 1c38942..7994be4 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java @@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit; import 
org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPConnectionCallback; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; +import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; import org.apache.activemq.artemis.protocol.amqp.proton.handler.EventHandler; import org.apache.activemq.artemis.protocol.amqp.proton.handler.ExtCapability; @@ -74,13 +75,18 @@ public class AMQPConnectionContext extends ProtonInitializable { protected LocalListener listener = new LocalListener(); - public AMQPConnectionContext(AMQPConnectionCallback connectionSP, + private final ProtonProtocolManager protocolManager; + + public AMQPConnectionContext(ProtonProtocolManager protocolManager, + AMQPConnectionCallback connectionSP, String containerId, int idleTimeout, int maxFrameSize, int channelMax, Executor dispatchExecutor, ScheduledExecutorService scheduledPool) { + + this.protocolManager = protocolManager; this.connectionCallback = connectionSP; this.containerId = (containerId != null) ? containerId : UUID.randomUUID().toString(); @@ -240,6 +246,28 @@ public class AMQPConnectionContext extends ProtonInitializable { handler.addEventHandler(eventHandler); } + public ProtonProtocolManager getProtocolManager() { + return protocolManager; + } + + public int getAmqpLowCredits() { + if (protocolManager != null) { + return protocolManager.getAmqpLowCredits(); + } else { + // this is for tests only... + return 30; + } + } + + public int getAmqpCredits() { + if (protocolManager != null) { + return protocolManager.getAmqpCredits(); + } else { + // this is for tests only... 
+ return 100; + } + } + // This listener will perform a bunch of things here class LocalListener implements EventHandler { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc25ff0e/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java index 64b2531..c2c1f2d 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java @@ -149,7 +149,7 @@ public class AMQPSessionContext extends ProtonInitializable { receiver.setContext(transactionHandler); synchronized (connection.getLock()) { receiver.open(); - receiver.flow(ProtonTransactionHandler.DEFAULT_COORDINATOR_CREDIT); + receiver.flow(connection.getAmqpCredits()); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc25ff0e/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java index 596e93a..76ad1ac 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java +++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java @@ -53,10 +53,10 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements The maximum number of credits we will allocate to clients. This number is also used by the broker when refresh client credits. */ - private static int maxCreditAllocation = 100; + private final int amqpCredits; // Used by the broker to decide when to refresh clients credit. This is not used when client requests credit. - private static int minCreditRefresh = 30; + private final int minCreditRefresh; private TerminusExpiryPolicy expiryPolicy; public ProtonServerReceiverContext(AMQPSessionCallback sessionSPI, @@ -67,11 +67,13 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements this.protonSession = protonSession; this.receiver = receiver; this.sessionSPI = sessionSPI; + this.amqpCredits = connection.getAmqpCredits(); + this.minCreditRefresh = connection.getAmqpLowCredits(); } @Override public void onFlow(int credits, boolean drain) { - flow(Math.min(credits, maxCreditAllocation), maxCreditAllocation); + flow(Math.min(credits, amqpCredits), amqpCredits); } @Override @@ -119,7 +121,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements } } } - flow(maxCreditAllocation, minCreditRefresh); + flow(amqpCredits, minCreditRefresh); } private RoutingType getRoutingType(Symbol[] symbols) { @@ -173,7 +175,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements sessionSPI.serverSend(tx, receiver, delivery, address, delivery.getMessageFormat(), data); synchronized (connection.getLock()) { - flow(maxCreditAllocation, minCreditRefresh); + flow(amqpCredits, minCreditRefresh); } } catch (Exception e) { log.warn(e.getMessage(), e); 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc25ff0e/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java index 12498b0..a3dae25 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java @@ -42,8 +42,8 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { private static final Logger log = Logger.getLogger(ProtonTransactionHandler.class); - public static final int DEFAULT_COORDINATOR_CREDIT = 100; - public static final int CREDIT_LOW_WATERMARK = 30; + private final int amqpCredit; + private final int amqpLowMark; final AMQPSessionCallback sessionSPI; final AMQPConnectionContext connection; @@ -51,6 +51,8 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { public ProtonTransactionHandler(AMQPSessionCallback sessionSPI, AMQPConnectionContext connection) { this.sessionSPI = sessionSPI; this.connection = connection; + this.amqpCredit = connection.getAmqpCredits(); + this.amqpLowMark = connection.getAmqpLowCredits(); } @Override @@ -68,8 +70,8 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { synchronized (connection.getLock()) { // Replenish coordinator receiver credit on exhaustion so sender can continue // transaction declare and discahrge operations. 
- if (receiver.getCredit() < CREDIT_LOW_WATERMARK) { - receiver.flow(DEFAULT_COORDINATOR_CREDIT); + if (receiver.getCredit() < amqpLowMark) { + receiver.flow(amqpCredit); } buffer = new byte[delivery.available()];
