Repository: activemq Updated Branches: refs/heads/activemq-5.14.x f9e624a48 -> fd3853c24
https://issues.apache.org/jira/browse/AMQ-6498 Include the already received backlog when deciding to grant additional credit to avoid excessive backlogs of messages during producer flow control. (cherry picked from commit 7cf7fba7aab05ea9c41bf009325c63a6798f6cc8) Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/fd3853c2 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/fd3853c2 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/fd3853c2 Branch: refs/heads/activemq-5.14.x Commit: fd3853c24d26c8fee34e17fd682f1fe89dc00fdc Parents: f9e624a Author: Timothy Bish <[email protected]> Authored: Fri Nov 4 11:55:12 2016 -0400 Committer: Timothy Bish <[email protected]> Committed: Fri Nov 4 11:56:00 2016 -0400 ---------------------------------------------------------------------- .../transport/amqp/protocol/AmqpReceiver.java | 93 ++++----- .../amqp/interop/AmqpFlowControlTest.java | 111 +++++++++++ .../amqp/profile/JmsSendReceiveStressTest.java | 187 +++++++++++++++++++ 3 files changed, 345 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/fd3853c2/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java index 33c319e..9f45ed2 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -70,6 +70,8 @@ public class AmqpReceiver extends AmqpAbstractReceiver { private InboundTransformer inboundTransformer; + private int sendsInFlight; + /** * Create a new instance of an AmqpReceiver * @@ -204,58 +206,57 @@ public class AmqpReceiver extends AmqpAbstractReceiver { } message.onSend(); - if (!delivery.remotelySettled()) { - sendToActiveMQ(message, new ResponseHandler() { - - @Override - public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException { - if (response.isException()) { - ExceptionResponse error = (ExceptionResponse) response; - Rejected rejected = new Rejected(); - ErrorCondition condition = new ErrorCondition(); - - if (error.getException() instanceof SecurityException) { - condition.setCondition(AmqpError.UNAUTHORIZED_ACCESS); - } else if (error.getException() instanceof ResourceAllocationException) { - condition.setCondition(AmqpError.RESOURCE_LIMIT_EXCEEDED); - } else { - condition.setCondition(Symbol.valueOf("failed")); - } - - condition.setDescription(error.getException().getMessage()); - rejected.setError(condition); - delivery.disposition(rejected); + + sendsInFlight++; + + sendToActiveMQ(message, createResponseHandler(delivery)); + } + } + + private ResponseHandler createResponseHandler(final Delivery delivery) { + return new ResponseHandler() { + + @Override + public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException { + if (!delivery.remotelySettled()) { + if (response.isException()) { + ExceptionResponse error = (ExceptionResponse) response; + Rejected rejected = new Rejected(); + ErrorCondition condition = new ErrorCondition(); + + if (error.getException() instanceof SecurityException) { + condition.setCondition(AmqpError.UNAUTHORIZED_ACCESS); + } else if (error.getException() instanceof ResourceAllocationException) { + condition.setCondition(AmqpError.RESOURCE_LIMIT_EXCEEDED); } else { - if (getEndpoint().getCredit() <= (getConfiguredReceiverCredit() * .3)) { - LOG.debug("Sending more credit ({}) to producer: {}", getConfiguredReceiverCredit() - getEndpoint().getCredit(), getProducerId()); - getEndpoint().flow(getConfiguredReceiverCredit() - getEndpoint().getCredit()); - } - - if (remoteState != null && remoteState instanceof TransactionalState) { - TransactionalState txAccepted = new TransactionalState(); - txAccepted.setOutcome(Accepted.getInstance()); - txAccepted.setTxnId(((TransactionalState) remoteState).getTxnId()); - - delivery.disposition(txAccepted); - } else { - delivery.disposition(Accepted.getInstance()); - } + condition.setCondition(Symbol.valueOf("failed")); } - delivery.settle(); - session.pumpProtonToSocket(); + condition.setDescription(error.getException().getMessage()); + rejected.setError(condition); + delivery.disposition(rejected); + } else { + final DeliveryState remoteState = delivery.getRemoteState(); + if (remoteState != null && remoteState instanceof TransactionalState) { + TransactionalState txAccepted = new TransactionalState(); + txAccepted.setOutcome(Accepted.getInstance()); + txAccepted.setTxnId(((TransactionalState) remoteState).getTxnId()); + + delivery.disposition(txAccepted); + } else { + delivery.disposition(Accepted.getInstance()); + } } - }); - } else { - if (getEndpoint().getCredit() <= (getConfiguredReceiverCredit() * .3)) { - LOG.debug("Sending more credit ({}) to producer: {}", getConfiguredReceiverCredit() - getEndpoint().getCredit(), getProducerId()); - getEndpoint().flow(getConfiguredReceiverCredit() - getEndpoint().getCredit()); - session.pumpProtonToSocket(); + } + + if (getEndpoint().getCredit() + --sendsInFlight <= (getConfiguredReceiverCredit() * .3)) { + LOG.trace("Sending more credit ({}) to producer: {}", getConfiguredReceiverCredit() * .7, getProducerId()); + getEndpoint().flow((int) (getConfiguredReceiverCredit() * .7)); } delivery.settle(); - sendToActiveMQ(message); + session.pumpProtonToSocket(); } - } + }; } } http://git-wip-us.apache.org/repos/asf/activemq/blob/fd3853c2/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpFlowControlTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpFlowControlTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpFlowControlTest.java new file mode 100644 index 0000000..3cbbd4b --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpFlowControlTest.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.interop; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy; +import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.activemq.util.Wait; +import org.junit.Test; + +public class AmqpFlowControlTest extends AmqpClientTestSupport { + + @Override + protected void performAdditionalConfiguration(BrokerService brokerService) throws Exception { + // Setup a destination policy where it takes only 1 message at a time. + PolicyMap policyMap = new PolicyMap(); + PolicyEntry policy = new PolicyEntry(); + policy.setMemoryLimit(1); + policy.setPendingSubscriberPolicy(new VMPendingSubscriberMessageStoragePolicy()); + policy.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy()); + policy.setProducerFlowControl(true); + policyMap.setDefaultEntry(policy); + + brokerService.setDestinationPolicy(policyMap); + } + + @Test(timeout = 60000) + public void testCreditNotGrantedUntilBacklogClears() throws Exception { + final int MSG_COUNT = 1000; + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = trackConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver("queue://" + getTestName()); + AmqpSender sender = session.createSender("queue://" + getTestName(), true); + + for (int i = 1; i <= MSG_COUNT; ++i) { + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message: " + i); + sender.send(message); + + if (i % 1000 == 0) { + LOG.info("Sent message: {}", i); + } + } + + // Should only accept one message + final QueueViewMBean queue = getProxyToQueue(getTestName()); + assertTrue("All messages should arrive", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return queue.getQueueSize() == 1; + } + })); + + assertEquals(0, sender.getEndpoint().getRemoteCredit()); + + receiver.flow(1); + AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(received); + received.accept(); + + // Should not grant any credit until backlog starts to clear + assertEquals(0, sender.getEndpoint().getRemoteCredit()); + + receiver.flow(MSG_COUNT - 1); + for (int i = 0; i < MSG_COUNT - 1; ++i) { + received = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(received); + received.accept(); + } + + // Should have been granted credit once backlog was cleared. + assertTrue(sender.getEndpoint().getRemoteCredit() > 0); + + sender.close(); + connection.close(); + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/fd3853c2/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/profile/JmsSendReceiveStressTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/profile/JmsSendReceiveStressTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/profile/JmsSendReceiveStressTest.java new file mode 100644 index 0000000..69536ec --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/profile/JmsSendReceiveStressTest.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.profile; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.Vector; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import javax.jms.BytesMessage; +import javax.jms.DeliveryMode; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQSession; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy; +import org.apache.activemq.transport.amqp.JMSClientTestSupport; +import org.apache.qpid.jms.JmsConnection; +import org.apache.qpid.jms.JmsConnectionFactory; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Ignore("Use for profiling and memory testing") +public class JmsSendReceiveStressTest extends JMSClientTestSupport { + + private static final Logger LOG = LoggerFactory.getLogger(JmsSendReceiveStressTest.class); + + public static final int PAYLOAD_SIZE = 64 * 1024; + + private final byte[] payload = new byte[PAYLOAD_SIZE]; + private final int parallelProducer = 1; + private final int parallelConsumer = 1; + private final Vector<Throwable> exceptions = new Vector<Throwable>(); + private JmsConnectionFactory factory; + + private final long NUM_SENDS = 1000000; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + + for (int i = 0; i < PAYLOAD_SIZE; ++i) { + payload[i] = (byte) (i % 255); + } + } + + @Test + public void testProduceConsume() throws Exception { + factory = new JmsConnectionFactory(getAmqpURI(getAmqpConnectionURIOptions())); + factory.setForceAsyncAcks(true); + factory.setForceAsyncSend(false); + factory.setForceSyncSend(false); + + final AtomicLong sharedSendCount = new AtomicLong(NUM_SENDS); + final AtomicLong sharedReceiveCount = new AtomicLong(NUM_SENDS); + + Thread.sleep(2000); + + long start = System.currentTimeMillis(); + ExecutorService executorService = Executors.newFixedThreadPool(parallelConsumer + parallelProducer); + + for (int i = 0; i < parallelConsumer; i++) { + executorService.execute(new Runnable() { + @Override + public void run() { + try { + consumeMessages(sharedReceiveCount); + } catch (Throwable e) { + exceptions.add(e); + } + } + }); + } + for (int i = 0; i < parallelProducer; i++) { + executorService.execute(new Runnable() { + @Override + public void run() { + try { + publishMessages(sharedSendCount); + } catch (Throwable e) { + exceptions.add(e); + } + } + }); + } + + executorService.shutdown(); + executorService.awaitTermination(30, TimeUnit.MINUTES); + assertTrue("Producers done in time", executorService.isTerminated()); + assertTrue("No exceptions: " + exceptions, exceptions.isEmpty()); + + double duration = System.currentTimeMillis() - start; + LOG.info("Duration: " + duration + "ms"); + LOG.info("Rate: " + (NUM_SENDS * 1000 / duration) + "m/s"); + } + + private void consumeMessages(AtomicLong count) throws Exception { + JmsConnection connection = (JmsConnection) factory.createConnection(); + connection.start(); + Session session = connection.createSession(false, ActiveMQSession.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(getDestinationName()); + MessageConsumer consumer = session.createConsumer(queue); + long v; + while ((v = count.decrementAndGet()) > 0) { + if ((count.get() % 10000) == 0) { + LOG.info("Received message: {}", NUM_SENDS - count.get()); + } + assertNotNull("got message " + v, consumer.receive(15000)); + } + LOG.info("Received message: {}", NUM_SENDS); + + consumer.close(); + } + + private void publishMessages(AtomicLong count) throws Exception { + JmsConnection connection = (JmsConnection) factory.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(getDestinationName()); + + MessageProducer producer = session.createProducer(queue); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + while (count.getAndDecrement() > 0) { + BytesMessage message = session.createBytesMessage(); + message.writeBytes(payload); + producer.send(message); + if ((count.get() % 10000) == 0) { + LOG.info("Sent message: {}", NUM_SENDS - count.get()); + } + } + producer.close(); + connection.close(); + } + + @Override + protected void performAdditionalConfiguration(BrokerService brokerService) throws Exception { + PolicyEntry policyEntry = new PolicyEntry(); + policyEntry.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy()); + policyEntry.setPrioritizedMessages(false); + policyEntry.setExpireMessagesPeriod(0); + policyEntry.setEnableAudit(false); + policyEntry.setOptimizedDispatch(true); + policyEntry.setQueuePrefetch(1); // ensure no contention on add with + // matched producer/consumer + + PolicyMap policyMap = new PolicyMap(); + policyMap.setDefaultEntry(policyEntry); + brokerService.setDestinationPolicy(policyMap); + } + + @Override + protected String getAmqpTransformer() { + return "jms"; + } + + private String getAmqpConnectionURIOptions() { + return "jms.presettlePolicy.presettleAll=false"; + } +}
