Repository: activemq-artemis Updated Branches: refs/heads/master d871dfe62 -> 410cd91f6
ARTEMIS-636 Add AMQP Hard Soft Limit for BLOCK Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2f721866 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2f721866 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2f721866 Branch: refs/heads/master Commit: 2f721866ab982d56c488ed124cc191cf5f627e42 Parents: 06fb4a1 Author: Martyn Taylor <[email protected]> Authored: Wed Jul 27 13:36:08 2016 +0100 Committer: Martyn Taylor <[email protected]> Committed: Fri Aug 5 15:29:01 2016 +0100 ---------------------------------------------------------------------- .../plug/ProtonSessionIntegrationCallback.java | 30 +++++++--- .../plug/context/ProtonTransactionHandler.java | 29 ++++++---- .../artemis/core/paging/PagingStore.java | 2 + .../core/paging/impl/PagingStoreImpl.java | 12 ++++ .../core/settings/impl/AddressSettings.java | 35 +++++++++++- .../resources/schema/artemis-configuration.xsd | 10 +++- .../core/settings/AddressSettingsTest.java | 5 ++ docs/user-manual/en/flow-control.md | 38 ++++++------- .../transport/amqp/client/AmqpSession.java | 2 +- .../amqp/client/AmqpTransactionContext.java | 2 +- .../tests/integration/proton/ProtonTest.java | 60 +++++++++++++++++++- .../storage/PersistMultiThreadTest.java | 5 ++ 12 files changed, 187 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f721866/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java index b00474d..a00af71 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java @@ -20,6 +20,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import io.netty.buffer.ByteBuf; +import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.core.io.IOCallback; @@ -32,7 +33,6 @@ import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl; -import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; import org.apache.activemq.artemis.spi.core.remoting.Connection; @@ -56,6 +56,7 @@ import org.proton.plug.AMQPSessionCallback; import org.proton.plug.AMQPSessionContext; import org.proton.plug.SASLResult; import org.proton.plug.context.ProtonPlugSender; +import org.proton.plug.exceptions.ActiveMQAMQPResourceLimitExceededException; import org.proton.plug.sasl.PlainSASLResult; public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, SessionCallback { @@ -351,18 +352,33 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se recoverContext(); PagingStore store = 
manager.getServer().getPagingManager().getPageStore(message.getAddress()); - if (store.isFull() && store.getAddressFullMessagePolicy() == AddressFullMessagePolicy.BLOCK) { - ErrorCondition ec = new ErrorCondition(AmqpError.RESOURCE_LIMIT_EXCEEDED, "Address is full: " + message.getAddress()); - Rejected rejected = new Rejected(); - rejected.setError(ec); - delivery.disposition(rejected); - connection.flush(); + if (store.isRejectingMessages()) { + // We drop pre-settled messages (and abort any associated Tx) + if (delivery.remotelySettled()) { + if (serverSession.getCurrentTransaction() != null) { + String amqpAddress = delivery.getLink().getTarget().getAddress(); + ActiveMQException e = new ActiveMQAMQPResourceLimitExceededException("Address is full: " + amqpAddress); + serverSession.getCurrentTransaction().markAsRollbackOnly(e); + } + } + else { + rejectMessage(delivery); + } } else { serverSend(message, delivery, receiver); } } + private void rejectMessage(Delivery delivery) { + String address = delivery.getLink().getTarget().getAddress(); + ErrorCondition ec = new ErrorCondition(AmqpError.RESOURCE_LIMIT_EXCEEDED, "Address is full: " + address); + Rejected rejected = new Rejected(); + rejected.setError(ec); + delivery.disposition(rejected); + connection.flush(); + } + private void serverSend(final ServerMessage message, final Delivery delivery, final Receiver receiver) throws Exception { try { serverSession.send(message, false); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f721866/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java index dbf6f38..c8fb994 100644 --- 
a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java @@ -91,40 +91,49 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { try { sessionSPI.commitCurrentTX(); } + catch (ActiveMQAMQPException amqpE) { + throw amqpE; + } catch (Exception e) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCommittingCoordinator(e.getMessage()); } } - delivery.settle(); } } + catch (ActiveMQAMQPException amqpE) { + delivery.disposition(createRejected(amqpE.getAmqpError(), amqpE.getMessage())); + } catch (Exception e) { log.warn(e.getMessage(), e); - Rejected rejected = new Rejected(); - ErrorCondition condition = new ErrorCondition(); - condition.setCondition(Symbol.valueOf("failed")); - condition.setDescription(e.getMessage()); - rejected.setError(condition); - delivery.disposition(rejected); + delivery.disposition(createRejected(Symbol.getSymbol("failed"), e.getMessage())); } finally { + delivery.settle(); buffer.release(); } } + private Rejected createRejected(Symbol amqpError, String message) { + Rejected rejected = new Rejected(); + ErrorCondition condition = new ErrorCondition(); + condition.setCondition(amqpError); + condition.setDescription(message); + rejected.setError(condition); + return rejected; + } + @Override public void onFlow(int credits, boolean drain) { - } @Override public void close(boolean linkRemoteClose) throws ActiveMQAMQPException { - //noop + // no op } @Override public void close(ErrorCondition condition) throws ActiveMQAMQPException { - //noop + // no op } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f721866/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java ---------------------------------------------------------------------- diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java index 566b91a..79fb115 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java @@ -128,6 +128,8 @@ public interface PagingStore extends ActiveMQComponent { boolean isFull(); + boolean isRejectingMessages(); + /** * Write lock the PagingStore. * http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f721866/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java index f57f1b8..7e6cda8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java @@ -123,6 +123,8 @@ public class PagingStoreImpl implements PagingStore { private volatile AtomicBoolean blocking = new AtomicBoolean(false); + private long rejectThreshold; + public PagingStoreImpl(final SimpleString address, final ScheduledExecutorService scheduledExecutor, final long syncTimeout, @@ -187,6 +189,8 @@ public class PagingStoreImpl implements PagingStore { addressFullMessagePolicy = addressSettings.getAddressFullMessagePolicy(); + rejectThreshold = addressSettings.getMaxSizeBytesRejectThreshold(); + if (cursorProvider != null) { cursorProvider.setCacheMaxSize(addressSettings.getPageCacheMaxSize()); } @@ -1073,6 +1077,14 @@ public class PagingStoreImpl implements PagingStore { } @Override + public boolean isRejectingMessages() { + if 
(addressFullMessagePolicy != AddressFullMessagePolicy.BLOCK) { + return false; + } + return rejectThreshold != AddressSettings.DEFAULT_ADDRESS_REJECT_THRESHOLD && getAddressSize() > rejectThreshold; + } + + @Override public Collection<Integer> getCurrentIds() throws Exception { List<Integer> ids = new ArrayList<>(); if (fileFactory != null) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f721866/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java index 642574b..f5f00f7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java @@ -76,6 +76,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable public static final int DEFAULT_QUEUE_PREFETCH = 1000; + // Default address drop threshold, applied to address settings with BLOCK policy. -1 means no threshold enabled. 
+ public static final long DEFAULT_ADDRESS_REJECT_THRESHOLD = -1; + private AddressFullMessagePolicy addressFullMessagePolicy = null; private Long maxSizeBytes = null; @@ -124,6 +127,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable private Integer managementBrowsePageSize = AddressSettings.MANAGEMENT_BROWSE_PAGE_SIZE; + private Long maxSizeBytesRejectThreshold = null; + //from amq5 //make it transient private transient Integer queuePrefetch = null; @@ -154,6 +159,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable this.autoDeleteJmsTopics = other.autoDeleteJmsTopics; this.managementBrowsePageSize = other.managementBrowsePageSize; this.queuePrefetch = other.queuePrefetch; + this.maxSizeBytesRejectThreshold = other.maxSizeBytesRejectThreshold; } public AddressSettings() { @@ -377,6 +383,15 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable return this; } + public long getMaxSizeBytesRejectThreshold() { + return (maxSizeBytesRejectThreshold == null) ? 
AddressSettings.DEFAULT_ADDRESS_REJECT_THRESHOLD : maxSizeBytesRejectThreshold; + } + + public AddressSettings setMaxSizeBytesRejectThreshold(long maxSizeBytesRejectThreshold) { + this.maxSizeBytesRejectThreshold = maxSizeBytesRejectThreshold; + return this; + } + /** * merge 2 objects in to 1 * @@ -456,6 +471,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable if (queuePrefetch == null) { queuePrefetch = merged.queuePrefetch; } + if (maxSizeBytesRejectThreshold == null) { + maxSizeBytesRejectThreshold = merged.maxSizeBytesRejectThreshold; + } } @Override @@ -521,6 +539,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable autoDeleteJmsTopics = BufferHelper.readNullableBoolean(buffer); managementBrowsePageSize = BufferHelper.readNullableInteger(buffer); + + maxSizeBytesRejectThreshold = BufferHelper.readNullableLong(buffer); } @Override @@ -549,7 +569,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable BufferHelper.sizeOfNullableBoolean(autoDeleteJmsQueues) + BufferHelper.sizeOfNullableBoolean(autoCreateJmsTopics) + BufferHelper.sizeOfNullableBoolean(autoDeleteJmsTopics) + - BufferHelper.sizeOfNullableInteger(managementBrowsePageSize); + BufferHelper.sizeOfNullableInteger(managementBrowsePageSize) + + BufferHelper.sizeOfNullableLong(maxSizeBytesRejectThreshold); } @Override @@ -601,6 +622,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable BufferHelper.writeNullableBoolean(buffer, autoDeleteJmsTopics); BufferHelper.writeNullableInteger(buffer, managementBrowsePageSize); + + BufferHelper.writeNullableLong(buffer, maxSizeBytesRejectThreshold); } /* (non-Javadoc) @@ -635,6 +658,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable result = prime * result + ((autoDeleteJmsTopics == null) ? 0 : autoDeleteJmsTopics.hashCode()); result = prime * result + ((managementBrowsePageSize == null) ? 
0 : managementBrowsePageSize.hashCode()); result = prime * result + ((queuePrefetch == null) ? 0 : queuePrefetch.hashCode()); + result = prime * result + ((maxSizeBytesRejectThreshold == null) ? 0 : maxSizeBytesRejectThreshold.hashCode()); return result; } @@ -802,6 +826,13 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable } else if (!queuePrefetch.equals(other.queuePrefetch)) return false; + + if (maxSizeBytesRejectThreshold == null) { + if (other.maxSizeBytesRejectThreshold != null) + return false; + } + else if (!maxSizeBytesRejectThreshold.equals(other.maxSizeBytesRejectThreshold)) + return false; return true; } @@ -825,6 +856,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable maxDeliveryAttempts + ", maxSizeBytes=" + maxSizeBytes + + ", maxSizeBytesRejectThreshold=" + + maxSizeBytesRejectThreshold + ", messageCounterHistoryDayLimit=" + messageCounterHistoryDayLimit + ", pageSizeBytes=" + http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f721866/artemis-server/src/main/resources/schema/artemis-configuration.xsd ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index 5ac86a0..815ef7c 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -2220,7 +2220,15 @@ <xsd:element name="max-size-bytes" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0"> <xsd:annotation> <xsd:documentation> - the maximum size (in bytes) to use in paging for an address (-1 means no limits) + the maximum size (in bytes) for an address (-1 means no limits). This is used in PAGING, BLOCK and FAIL policies. 
+ </xsd:documentation> + </xsd:annotation> + </xsd:element> + + <xsd:element name="max-size-bytes-reject-threshold" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0"> + <xsd:annotation> + <xsd:documentation> + used with the address full BLOCK policy, the maximum size (in bytes) an address can reach before messages start getting rejected. Works in combination with max-size-bytes for AMQP protocol only. Default = -1 (no limit). </xsd:documentation> </xsd:annotation> </xsd:element> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f721866/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/AddressSettingsTest.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/AddressSettingsTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/AddressSettingsTest.java index 58f7c99..202f2ba 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/AddressSettingsTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/AddressSettingsTest.java @@ -59,6 +59,8 @@ public class AddressSettingsTest extends ActiveMQTestBase { addressSettingsToMerge.setMessageCounterHistoryDayLimit(1002); addressSettingsToMerge.setRedeliveryDelay(1003); addressSettingsToMerge.setPageSizeBytes(1004); + addressSettingsToMerge.setMaxSizeBytesRejectThreshold(10 * 1024); + addressSettings.merge(addressSettingsToMerge); Assert.assertEquals(addressSettings.getDeadLetterAddress(), DLQ); Assert.assertEquals(addressSettings.getExpiryAddress(), exp); @@ -68,6 +70,7 @@ public class AddressSettingsTest extends ActiveMQTestBase { Assert.assertEquals(addressSettings.getRedeliveryDelay(), 1003); Assert.assertEquals(addressSettings.getPageSizeBytes(), 1004); Assert.assertEquals(AddressFullMessagePolicy.DROP, addressSettings.getAddressFullMessagePolicy()); + 
Assert.assertEquals(addressSettings.getMaxSizeBytesRejectThreshold(), 10 * 1024); } @Test @@ -82,6 +85,7 @@ public class AddressSettingsTest extends ActiveMQTestBase { addressSettingsToMerge.setMaxSizeBytes(1001); addressSettingsToMerge.setMessageCounterHistoryDayLimit(1002); addressSettingsToMerge.setAddressFullMessagePolicy(AddressFullMessagePolicy.DROP); + addressSettingsToMerge.setMaxSizeBytesRejectThreshold(10 * 1024); addressSettings.merge(addressSettingsToMerge); AddressSettings addressSettingsToMerge2 = new AddressSettings(); @@ -100,6 +104,7 @@ public class AddressSettingsTest extends ActiveMQTestBase { Assert.assertEquals(addressSettings.getRedeliveryDelay(), 2003); Assert.assertEquals(addressSettings.getRedeliveryMultiplier(), 2.5, 0.000001); Assert.assertEquals(AddressFullMessagePolicy.DROP, addressSettings.getAddressFullMessagePolicy()); + Assert.assertEquals(addressSettings.getMaxSizeBytesRejectThreshold(), 10 * 1024); } @Test http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f721866/docs/user-manual/en/flow-control.md ---------------------------------------------------------------------- diff --git a/docs/user-manual/en/flow-control.md b/docs/user-manual/en/flow-control.md index c1b4035..8a11966 100644 --- a/docs/user-manual/en/flow-control.md +++ b/docs/user-manual/en/flow-control.md @@ -275,25 +275,25 @@ control. #### Blocking producer window based flow control using AMQP -Apache ActiveMQ Artemis ships with out of the box with 2 protocols that support -flow control. Artemis CORE protocol and AMQP. Both protocols implement flow -control slightly differently and therefore address full BLOCK policy behaves -slightly different for clients uses each protocol respectively. - -As explained earlier in this chapter the CORE protocol uses a producer window size -flow control system. Where credits (representing bytes) are allocated to producers, -if a producer wants to send a message it should wait until it has enough bytes available -to send it. 
AMQP flow control credits are not representative of bytes but instead represent -the number of messages a producer is permitted to send (regardless of size). - -BLOCK for AMQP works mostly in the same way as the producer window size mechanism above. Artemis -will issue 100 credits to a client at a time and refresh them when the clients credits reaches 30. -The broker will stop issuing credits once an address is full. However, since AMQP credits represent -whole messages and not bytes, it would be possible for an AMQP client to significantly exceed an -address upper bound should the broker continue accepting messages until the clients credits are exhausted. -For this reason once an address has reached it's upper bound and is blocked (when using AMQP) Artemis -will start rejecting messages until the address becomes unblocked. This should be taken into consideration when writing -application code. +Apache ActiveMQ Artemis ships out of the box with 2 protocols that support flow control: the Artemis CORE protocol and +AMQP. Both protocols implement flow control slightly differently and therefore the address full BLOCK policy behaves slightly +differently for clients that use each protocol. + +As explained earlier in this chapter, the CORE protocol uses a producer window size flow control system, where credits +(representing bytes) are allocated to producers; if a producer wants to send a message it should wait until it has +enough byte credits available for it to send. AMQP flow control credits are not representative of bytes but instead +represent the number of messages a producer is permitted to send (regardless of the message size). + +BLOCK for AMQP works mostly in the same way as the producer window size mechanism above. Artemis will issue 100 credits +to a client at a time and refresh them when the client's credits reach 30. The broker will stop issuing credits once an +address is full. 
However, since AMQP credits represent whole messages and not bytes, it would be possible in some +scenarios for an AMQP client to significantly exceed an address upper bound should the broker continue accepting +messages until the client's credits are exhausted. For this reason there is an additional parameter available on address +settings that specifies an upper bound on an address size in bytes. Once this upper bound is reached, Artemis will start +rejecting AMQP messages. This limit is the max-size-bytes-reject-threshold and is by default set to -1 (or no limit). +This additional parameter allows a kind of soft and hard limit: in normal circumstances the broker will utilize the +max-size-bytes parameter using flow control to put back pressure on the client, but will protect the broker by +rejecting messages once the address size reaches the reject threshold. ### Rate limited flow control http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f721866/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java index 28e38f2..82b6aec 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java @@ -412,7 +412,7 @@ public class AmqpSession extends AmqpAbstractResource<Session> { return txContext.getTransactionId(); } - AmqpTransactionContext getTransactionContext() { + public AmqpTransactionContext getTransactionContext() { return txContext; } 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f721866/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionContext.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionContext.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionContext.java index dcf23d2..2f3e22a 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionContext.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionContext.java @@ -213,7 +213,7 @@ public class AmqpTransactionContext { //----- Internal access to context properties ----------------------------// - AmqpTransactionCoordinator getCoordinator() { + public AmqpTransactionCoordinator getCoordinator() { return coordinator; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f721866/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java index b170f82..785543d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java @@ -95,8 +95,13 @@ public class ProtonTest extends ActiveMQTestBase { private static final String password = "guest"; + private static final String brokerName = "my-broker"; + private static final long maxSizeBytes = 1 * 1024 * 1024; + + private static 
final long maxSizeBytesRejectThreshold = 2 * 1024 * 1024; + // this will ensure that all tests in this class are run twice, // once with "true" passed to the class' constructor and once with "false" @Parameterized.Parameters(name = "{0}") @@ -310,6 +315,7 @@ public class ProtonTest extends ActiveMQTestBase { Assert.assertEquals(q.getMessageCount(), 0); } + @Test public void testRollbackConsumer() throws Throwable { @@ -342,8 +348,11 @@ public class ProtonTest extends ActiveMQTestBase { public void testResourceLimitExceptionOnAddressFull() throws Exception { if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol setAddressFullBlockPolicy(); + String destinationAddress = address + 1; + fillAddress(destinationAddress); - fillAddress(address + 1); + long addressSize = server.getPagingManager().getPageStore(new SimpleString(destinationAddress)).getAddressSize(); + assertTrue(addressSize >= maxSizeBytesRejectThreshold); } @Test @@ -367,6 +376,9 @@ public class ProtonTest extends ActiveMQTestBase { } assertTrue(e instanceof ResourceAllocationException); assertTrue(e.getMessage().contains("resource-limit-exceeded")); + + long addressSize = server.getPagingManager().getPageStore(new SimpleString(destinationAddress)).getAddressSize(); + assertTrue(addressSize >= maxSizeBytesRejectThreshold); } @Test @@ -393,6 +405,9 @@ public class ProtonTest extends ActiveMQTestBase { // This should be -1. A single message is buffered in the client, and 0 credit has been allocated. 
assertTrue(sender.getSender().getCredit() == -1); + + long addressSize = server.getPagingManager().getPageStore(new SimpleString(destinationAddress)).getAddressSize(); + assertTrue(addressSize >= maxSizeBytes && addressSize <= maxSizeBytesRejectThreshold); } finally { amqpConnection.close(); @@ -446,7 +461,7 @@ public class ProtonTest extends ActiveMQTestBase { fillAddress(address + 1); AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); - AmqpConnection amqpConnection = amqpConnection = client.connect(); + AmqpConnection amqpConnection = client.connect(); try { AmqpSession session = amqpConnection.createSession(); AmqpSender sender = session.createSender(address + 1); @@ -459,6 +474,43 @@ public class ProtonTest extends ActiveMQTestBase { } } + @Test + public void testTxIsRolledBackOnRejectedPreSettledMessage() throws Throwable { + if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol + setAddressFullBlockPolicy(); + + // Create the link attach before filling the address to ensure the link is allocated credit. + AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); + AmqpConnection amqpConnection = client.connect(); + + AmqpSession session = amqpConnection.createSession(); + AmqpSender sender = session.createSender(address); + sender.setPresettle(true); + + fillAddress(address); + + final AmqpMessage message = new AmqpMessage(); + byte[] payload = new byte[50 * 1024]; + message.setBytes(payload); + + Exception expectedException = null; + try { + session.begin(); + sender.send(message); + session.commit(); + } + catch (Exception e) { + expectedException = e; + } + finally { + amqpConnection.close(); + } + + assertNotNull(expectedException); + assertTrue(expectedException.getMessage().contains("resource-limit-exceeded")); + assertTrue(expectedException.getMessage().contains("Address is full: " + address)); + } + /** * Fills an address. Careful when using this method. 
Only use when rejected messages are switched on. * @param address @@ -520,6 +572,7 @@ public class ProtonTest extends ActiveMQTestBase { timeout.await(5, TimeUnit.SECONDS); + System.out.println("Messages Sent: " + sentMessages); if (errors[0] != null) { throw errors[0]; } @@ -1313,7 +1366,8 @@ public class ProtonTest extends ActiveMQTestBase { // For BLOCK tests AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch("#"); addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK); - addressSettings.setMaxSizeBytes(1 * 1024 * 1024); + addressSettings.setMaxSizeBytes(maxSizeBytes); + addressSettings.setMaxSizeBytesRejectThreshold(maxSizeBytesRejectThreshold); server.getAddressSettingsRepository().addMatch("#", addressSettings); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f721866/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java ---------------------------------------------------------------------- diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java index 33ee0c7..6c42413 100644 --- a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java +++ b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java @@ -307,6 +307,11 @@ public class PersistMultiThreadTest extends ActiveMQTestBase { } @Override + public boolean isRejectingMessages() { + return false; + } + + @Override public void applySetting(AddressSettings addressSettings) { }
