This is an automated email from the ASF dual-hosted git repository. robbie pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push: new fdcf640f2b ARTEMIS-5669 AMQP bridge receivers prefer modified dispositions by default fdcf640f2b is described below commit fdcf640f2b42a37ad54982579ef1e6b6b1405833 Author: Timothy Bish <tabish...@gmail.com> AuthorDate: Thu Sep 18 14:37:35 2025 -0400 ARTEMIS-5669 AMQP bridge receivers prefer modified dispositions by default AMQP Bridge receiver links configure themselves to a default of sending a modified disposition for address full errors to allow the remote sender to redeliver the message instead of the broker default which is to send rejected dispositions meaning the remote must discard or DLQ the message. Allows for configuration at the bridge policy level to override this and ignore the connector URI as this is an opinionated configuration for bridged resources. Also adds ability to configure the drain on address full and delivery is rejected at the bridge configuration level. --- .../connect/bridge/AMQPBridgeConfiguration.java | 46 +++++ .../amqp/connect/bridge/AMQPBridgeConstants.java | 37 +++- .../bridge/AMQPBridgeFromAddressReceiver.java | 15 ++ .../bridge/AMQPBridgeFromQueueReceiver.java | 15 ++ .../bridge/AMQPBridgeReceiverConfiguration.java | 45 +++++ .../amqp/connect/AMQPBridgeFromAddressTest.java | 189 +++++++++++++++++- .../amqp/connect/AMQPBridgeFromQueueTest.java | 219 +++++++++++++++++++++ .../connect/AMQPFederationQueuePolicyTest.java | 2 - 8 files changed, 560 insertions(+), 8 deletions(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeConfiguration.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeConfiguration.java index b646b37346..de2800af31 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeConfiguration.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeConfiguration.java @@ -30,6 +30,9 @@ import static org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridg import static org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_CREDITS; import static org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_CREDITS_LOW; import static org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_QUIESCE_TIMEOUT; +import static org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.USE_MODIFIED_FOR_TRANSIENT_DELIVERY_ERRORS; +import static org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_DRAIN_ON_TRANSIENT_DELIVERY_ERRORS; +import static org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_LINK_QUIESCE_TIMEOUT; import static org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.IGNORE_QUEUE_FILTERS; import static org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.ADDRESS_RECEIVER_IDLE_TIMEOUT; import static org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.AUTO_DELETE_DURABLE_SUBSCRIPTION; @@ -55,6 +58,7 @@ import static org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridg import static org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.DEFAULT_SEND_SETTLED; import static org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.DISABLE_RECEIVER_DEMAND_TRACKING; import static org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.IGNORE_QUEUE_CONSUMER_FILTERS; +import static org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.DEFAULT_USE_MODIFIED_FOR_TRANSIENT_DELIVERY_ERRORS; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -378,4 +382,46 @@ public class AMQPBridgeConfiguration { return DEFAULT_PREFER_SHARED_DURABLE_SUBSCRIPTIONS; } } + + /** + * (@return the use modified for transient delivery errors configuration} + */ + public boolean isUseModifiedForTransientDeliveryErrors() { + final Object property = properties.get(USE_MODIFIED_FOR_TRANSIENT_DELIVERY_ERRORS); + if (property instanceof Boolean booleanValue) { + return booleanValue; + } else if (property instanceof String string) { + return Boolean.parseBoolean(string); + } else { + return DEFAULT_USE_MODIFIED_FOR_TRANSIENT_DELIVERY_ERRORS; + } + } + + /** + * (@return the drain link credit on transient delivery errors configuration} + */ + public boolean isDrainOnTransientDeliveryErrors() { + final Object property = properties.get(RECEIVER_DRAIN_ON_TRANSIENT_DELIVERY_ERRORS); + if (property instanceof Boolean booleanValue) { + return booleanValue; + } else if (property instanceof String string) { + return Boolean.parseBoolean(string); + } else { + return connection.getProtocolManager().isDrainOnTransientDeliveryErrors(); + } + } + + /** + * {@return the bridge receiver link quiesce timeout configuration} + */ + public int getLinkQuiesceTimeout() { + final Object property = properties.get(RECEIVER_LINK_QUIESCE_TIMEOUT); + if (property instanceof Number number) { + return number.intValue(); + } else if (property instanceof String string) { + return Integer.parseInt(string); + } else { + return connection.getProtocolManager().getLinkQuiesceTimeout(); + } + } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeConstants.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeConstants.java index 66fcbf84a0..0d060de85c 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeConstants.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeConstants.java @@ -271,7 +271,7 @@ public final class AMQPBridgeConstants { /** * Default value for the auto delete address sender durable subscription binding. */ - public static boolean DEFAULT_AUTO_DELETE_DURABLE_SUBSCRIPTION = false; + public static final boolean DEFAULT_AUTO_DELETE_DURABLE_SUBSCRIPTION = false; /** * Encodes a boolean value that indicates if AMQP bridge senders should configure an auto delete option @@ -284,7 +284,7 @@ public final class AMQPBridgeConstants { /** * Default value for the auto delete address sender message count for durable subscription bindings. */ - public static long DEFAULT_AUTO_DELETE_DURABLE_SUBSCRIPTION_MSG_COUNT = 0; + public static final long DEFAULT_AUTO_DELETE_DURABLE_SUBSCRIPTION_MSG_COUNT = 0; /** * Encodes a signed long value that controls the delay before auto deletion if using durable address @@ -295,7 +295,7 @@ public final class AMQPBridgeConstants { /** * Default value for the auto delete address sender message count for durable subscription bindings. */ - public static long DEFAULT_AUTO_DELETE_DURABLE_SUBSCRIPTION_DELAY = 0; + public static final long DEFAULT_AUTO_DELETE_DURABLE_SUBSCRIPTION_DELAY = 0; /** * Encodes a signed long value that controls the message count value that allows for address auto delete @@ -303,4 +303,35 @@ public final class AMQPBridgeConstants { */ public static final String AUTO_DELETE_DURABLE_SUBSCRIPTION_DELAY = "auto-delete-durable-subscription-delay"; + /** + * Configuration property for how a bridge receiver should respond to delivery errors indicating that an address is + * full and cannot accept messages at this time. By default we want to send Modified outcomes with the delivery failed + * value set to true such that the remote will deliver the message again after incrementing the delivery count of the + * message. + */ + public static final String USE_MODIFIED_FOR_TRANSIENT_DELIVERY_ERRORS = "amqpUseModifiedForTransientDeliveryErrors"; + + /** + * Default value for how a bridge receiver should respond to delivery errors indicating that an address is full + * and cannot accept messages at this time. By default we want to send Modified outcomes with the delivery failed + * value set to true such that the remote will deliver the message again after incrementing the delivery count of + * the message. This is an opinionated choice and the value set on the connector URI is not referenced by bridges + * as we want to maintain this behavior unless specifically set on bridge configuration explicitly. + */ + public static final boolean DEFAULT_USE_MODIFIED_FOR_TRANSIENT_DELIVERY_ERRORS = true; + + /** + * Configuration property that defines the time in milliseconds that a receiver will wait before considering a pending + * quiesce timeout to have failed and should close the link. This option can be used to override the value specified on + * the connector URI to allow bridges to operate with a different default. + */ + public static final String RECEIVER_LINK_QUIESCE_TIMEOUT = "amqpLinkQuiesceTimeout"; + + /** + * Configuration property that defines if a bridge receiver should drain the link credit when a transient delivery + * error such as the address being full occurs. This option can be used to override the value specified on the + * connector URI to allow bridges to operate with a different default. + */ + public static final String RECEIVER_DRAIN_ON_TRANSIENT_DELIVERY_ERRORS = "amqpDrainOnTransientDeliveryErrors"; + } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeFromAddressReceiver.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeFromAddressReceiver.java index de12f2a4bb..e879368706 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeFromAddressReceiver.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeFromAddressReceiver.java @@ -276,6 +276,21 @@ public class AMQPBridgeFromAddressReceiver extends AMQPBridgeReceiver { } } + @Override + protected boolean isUseModifiedForTransientDeliveryErrors(AMQPConnectionContext connection) { + return configuration.isUseModifiedForTransientDeliveryErrors(); + } + + @Override + protected boolean isDrainOnTransientDeliveryErrors(AMQPConnectionContext connection) { + return configuration.isDrainOnTransientDeliveryErrors(); + } + + @Override + protected int getLinkQuiesceTimeout(AMQPConnectionContext connection) { + return configuration.getLinkQuiesceTimeout(); + } + @Override protected Runnable createCreditRunnable(AMQPConnectionContext connection) { // We defer to the configuration instance as opposed to the base class version that reads diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeFromQueueReceiver.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeFromQueueReceiver.java index 85eab69100..2ecdb732fc 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeFromQueueReceiver.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeFromQueueReceiver.java @@ -235,6 +235,21 @@ public class AMQPBridgeFromQueueReceiver extends AMQPBridgeReceiver { this.localQueue = localQueue; } + @Override + protected boolean isUseModifiedForTransientDeliveryErrors(AMQPConnectionContext connection) { + return configuration.isUseModifiedForTransientDeliveryErrors(); + } + + @Override + protected boolean isDrainOnTransientDeliveryErrors(AMQPConnectionContext connection) { + return configuration.isDrainOnTransientDeliveryErrors(); + } + + @Override + protected int getLinkQuiesceTimeout(AMQPConnectionContext connection) { + return configuration.getLinkQuiesceTimeout(); + } + @Override public void close(boolean remoteLinkClose) throws ActiveMQAMQPException { super.close(remoteLinkClose); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeReceiverConfiguration.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeReceiverConfiguration.java index b7215f7c45..24e0d9ece5 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeReceiverConfiguration.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeReceiverConfiguration.java @@ -21,7 +21,10 @@ import static org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridg import static org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.IGNORE_QUEUE_FILTERS; import static org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_CREDITS; import static org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_CREDITS_LOW; +import static org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_DRAIN_ON_TRANSIENT_DELIVERY_ERRORS; +import static org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_LINK_QUIESCE_TIMEOUT; import static org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_QUIESCE_TIMEOUT; +import static org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.USE_MODIFIED_FOR_TRANSIENT_DELIVERY_ERRORS; import static org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.PULL_RECEIVER_BATCH_SIZE; import static org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.QUEUE_RECEIVER_IDLE_TIMEOUT; import static org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.LARGE_MESSAGE_THRESHOLD; @@ -210,4 +213,46 @@ public final class AMQPBridgeReceiverConfiguration extends AMQPBridgeLinkConfigu return configuration.isPreferSharedDurableSubscriptions(); } } + + /** + * (@return the use modified for transient delivery errors configuration} + */ + public boolean isUseModifiedForTransientDeliveryErrors() { + final Object property = properties.get(USE_MODIFIED_FOR_TRANSIENT_DELIVERY_ERRORS); + if (property instanceof Boolean booleanValue) { + return booleanValue; + } else if (property instanceof String string) { + return Boolean.parseBoolean(string); + } else { + return configuration.isUseModifiedForTransientDeliveryErrors(); + } + } + + /** + * (@return the drain link credit on transient delivery errors configuration} + */ + public boolean isDrainOnTransientDeliveryErrors() { + final Object property = properties.get(RECEIVER_DRAIN_ON_TRANSIENT_DELIVERY_ERRORS); + if (property instanceof Boolean booleanValue) { + return booleanValue; + } else if (property instanceof String string) { + return Boolean.parseBoolean(string); + } else { + return configuration.isDrainOnTransientDeliveryErrors(); + } + } + + /** + * {@return the federation receiver link quiesce timeout configuration} + */ + public int getLinkQuiesceTimeout() { + final Object property = properties.get(RECEIVER_LINK_QUIESCE_TIMEOUT); + if (property instanceof Number number) { + return number.intValue(); + } else if (property instanceof String string) { + return Integer.parseInt(string); + } else { + return configuration.getLinkQuiesceTimeout(); + } + } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeFromAddressTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeFromAddressTest.java index 5d0bbdbbcc..4bdcf2e2a9 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeFromAddressTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeFromAddressTest.java @@ -32,7 +32,10 @@ import static org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridg import static org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.MAX_LINK_RECOVERY_ATTEMPTS; import static org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.PREFER_SHARED_DURABLE_SUBSCRIPTIONS; import static org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.PRESETTLE_SEND_MODE; +import static org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_DRAIN_ON_TRANSIENT_DELIVERY_ERRORS; +import static org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_LINK_QUIESCE_TIMEOUT; import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.CoreMatchers.allOf; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.nullValue; @@ -71,6 +74,8 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.transformer.Transformer; +import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport; @@ -925,7 +930,7 @@ class AMQPBridgeFromAddressTest extends AmqpClientTestSupport { server.start(); server.deployDivert(divertConfig); // Current implementation requires the source address exist on the local broker before it - // will attempt to federate it from the remote. + // will attempt to bridge it from the remote. server.addAddressInfo(new AddressInfo(SimpleString.of(getTestName()), RoutingType.MULTICAST)); // Demand on the forwarding address should create a remote consumer for the forwarded address. @@ -999,7 +1004,7 @@ class AMQPBridgeFromAddressTest extends AmqpClientTestSupport { server.start(); server.deployDivert(divertConfig); // Current implementation requires the source address exist on the local broker before it - // will attempt to federate it from the remote. + // will attempt to bridge it from the remote. server.addAddressInfo(new AddressInfo(SimpleString.of(getTestName()), RoutingType.MULTICAST)); // Demand on the forwarding address should create a remote consumer for the forwarded address. @@ -1824,7 +1829,7 @@ class AMQPBridgeFromAddressTest extends AmqpClientTestSupport { @Test @Timeout(20) - public void testAddressPolicyCanOverridesZeroCreditsInBridgeConfigurationAndFederateAddress() throws Exception { + public void testAddressPolicyCanOverridesZeroCreditsInBridgeConfigurationAndBridgeAddress() throws Exception { try (ProtonTestServer peer = new ProtonTestServer()) { peer.expectSASLAnonymousConnect(); peer.expectOpen().respond(); @@ -3879,6 +3884,184 @@ class AMQPBridgeFromAddressTest extends AmqpClientTestSupport { } } + @Test + @Timeout(20) + public void testBridgeReceiverRejectsWithModifiedDeliveryFailedAsDefault() throws Exception { + try (ProtonTestServer peer = new ProtonTestServer()) { + peer.expectSASLAnonymousConnect(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.start(); + + final URI remoteURI = peer.getServerURI(); + logger.info("Test started, peer listening on: {}", remoteURI); + + final AMQPBridgeAddressPolicyElement receiveFromAddress = new AMQPBridgeAddressPolicyElement(); + receiveFromAddress.setName("address-policy"); + receiveFromAddress.addToIncludes(getTestName()); + + final AMQPBridgeBrokerConnectionElement element = new AMQPBridgeBrokerConnectionElement(); + element.setName(getTestName()); + element.addBridgeFromAddressPolicy(receiveFromAddress); + element.addProperty(ADDRESS_RECEIVER_IDLE_TIMEOUT, 10); + + final AMQPBrokerConnectConfiguration amqpConnection = + new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort()); + amqpConnection.setReconnectAttempts(0);// No reconnects + amqpConnection.addElement(element); + + final AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(getTestName()); + addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL); + addressSettings.setMaxSizeBytes(500); + server.getAddressSettingsRepository().addMatch(getTestName(), addressSettings); + server.getConfiguration().addAMQPConnection(amqpConnection); + server.start(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + final String payload = "A".repeat(2048); + + peer.expectAttach().ofReceiver() + .withName(allOf(containsString(getTestName()), + containsString("address-receiver"), + containsString(server.getNodeID().toString()))) + .respond(); + peer.expectFlow().withLinkCredit(1000); + + server.createQueue(QueueConfiguration.of(getTestName()).setRoutingType(RoutingType.MULTICAST) + .setAddress(getTestName()) + .setAutoCreated(false) + .setFilterString("color='red'")); + + Wait.assertTrue(() -> server.queueQuery(SimpleString.of(getTestName())).isExists(), 5000, 100); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + peer.expectDisposition().withState().accepted(); // This should fill the address + peer.expectFlow().withLinkCredit(998).withDrain(true); // Receiver drains credit before sending the disposition + peer.expectDisposition().withState().modified(true); // Expect modified / failed so remote doesn't drop the message + + peer.remoteTransfer().withHeader().withDurability(true).also() + .withApplicationProperties().withProperty("color", "red").also() + .withMessageAnnotations().withAnnotation("x-opt-test", "1").also() + .withBody().withString("First Message: " + payload) + .also() + .withDeliveryId(1) + .now(); + peer.remoteTransfer().withHeader().withDurability(true).also() + .withApplicationProperties().withProperty("color", "red").also() + .withMessageAnnotations().withAnnotation("x-opt-test", "2").also() + .withBody().withString("Second Message: ") + .also() + .withDeliveryId(2) + .later(10); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectClose(); + peer.remoteClose().now(); + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.close(); + } + } + + @Test + public void testDrainReceiverOnTransientErrorsConfiguredAtBridgeLevel() throws Exception { + doTestDrainReceiverOnTransientErrorsConfiguredAtBridgeLevel(true); + } + + @Test + public void testNoDrainReceiverOnTransientErrorsConfiguredAtBridgeLevel() throws Exception { + doTestDrainReceiverOnTransientErrorsConfiguredAtBridgeLevel(false); + } + + private void doTestDrainReceiverOnTransientErrorsConfiguredAtBridgeLevel(boolean drainOnFull) throws Exception { + try (ProtonTestServer peer = new ProtonTestServer()) { + peer.expectSASLAnonymousConnect(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.start(); + + final URI remoteURI = peer.getServerURI(); + logger.info("Test started, peer listening on: {}", remoteURI); + + final AMQPBridgeAddressPolicyElement receiveFromAddress = new AMQPBridgeAddressPolicyElement(); + receiveFromAddress.setName("address-policy"); + receiveFromAddress.addToIncludes(getTestName()); + + final AMQPBridgeBrokerConnectionElement element = new AMQPBridgeBrokerConnectionElement(); + element.setName(getTestName()); + element.addBridgeFromAddressPolicy(receiveFromAddress); + element.addProperty(ADDRESS_RECEIVER_IDLE_TIMEOUT, 10); + element.addProperty(RECEIVER_DRAIN_ON_TRANSIENT_DELIVERY_ERRORS, String.valueOf(drainOnFull)); + element.addProperty(RECEIVER_LINK_QUIESCE_TIMEOUT, 350); + + final AMQPBrokerConnectConfiguration amqpConnection = + new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort()); + amqpConnection.setReconnectAttempts(0);// No reconnects + amqpConnection.addElement(element); + + final AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(getTestName()); + addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL); + addressSettings.setMaxSizeBytes(1000); + server.getAddressSettingsRepository().addMatch(getTestName(), addressSettings); + server.getConfiguration().addAMQPConnection(amqpConnection); + server.start(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + final String payload = "A".repeat(2048); + + peer.expectAttach().ofReceiver() + .withName(allOf(containsString(getTestName()), + containsString("address-receiver"), + containsString(server.getNodeID().toString()))) + .respond(); + peer.expectFlow().withLinkCredit(1000); + + server.createQueue(QueueConfiguration.of("queue1").setRoutingType(RoutingType.MULTICAST) + .setAddress(getTestName()) + .setAutoCreated(false)); + server.createQueue(QueueConfiguration.of("queue2").setRoutingType(RoutingType.MULTICAST) + .setAddress(getTestName()) + .setAutoCreated(false)); + + Wait.assertTrue(() -> server.queueQuery(SimpleString.of("queue1")).isExists(), 5000, 100); + Wait.assertTrue(() -> server.queueQuery(SimpleString.of("queue2")).isExists(), 5000, 100); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectDisposition().withState().accepted(); // This should fill the address + + if (drainOnFull) { + peer.expectFlow().withDrain(true).withLinkCredit(998); + peer.expectDisposition().withState().modified(true); + peer.expectDetach().withError(notNullValue()).respond(); + } else { + peer.expectDisposition().withState().modified(true); + } + + peer.remoteTransfer().withHeader().withDurability(true).also() + .withApplicationProperties().withProperty("color", "red").also() + .withMessageAnnotations().withAnnotation("x-opt-test", "1").also() + .withBody().withString("First Message: " + payload) + .also() + .withDeliveryId(1) + .now(); + peer.remoteTransfer().withHeader().withDurability(true).also() + .withApplicationProperties().withProperty("color", "red").also() + .withMessageAnnotations().withAnnotation("x-opt-test", "2").also() + .withBody().withString("Second Message: ") + .also() + .withDeliveryId(2) + .later(5); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectClose(); + peer.remoteClose().now(); + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.close(); + } + } + public static class ApplicationPropertiesTransformer implements Transformer { private final Map<String, String> properties = new HashMap<>(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeFromQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeFromQueueTest.java index b2de17225f..9cb572a642 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeFromQueueTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeFromQueueTest.java @@ -32,11 +32,14 @@ import static org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridg import static org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_CREDITS; import static org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_CREDITS_LOW; import static org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_QUIESCE_TIMEOUT; +import static org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_LINK_QUIESCE_TIMEOUT; +import static org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_DRAIN_ON_TRANSIENT_DELIVERY_ERRORS; import static org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledMessageConstants.AMQP_TUNNELED_CORE_MESSAGE_FORMAT; import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.TUNNEL_CORE_MESSAGES; import static org.hamcrest.CoreMatchers.allOf; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.CoreMatchers.nullValue; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -69,6 +72,8 @@ import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBroker import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.transformer.Transformer; +import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport; @@ -3765,6 +3770,220 @@ public class AMQPBridgeFromQueueTest extends AmqpClientTestSupport { } } + @Test + @Timeout(20) + public void testBridgeReceiverRejectsWithModifiedDeliveryFailedAsDefault() throws Exception { + try (ProtonTestServer peer = new ProtonTestServer()) { + peer.expectSASLAnonymousConnect(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.start(); + + final URI remoteURI = peer.getServerURI(); + logger.info("Test started, peer listening on: {}", remoteURI); + + final AMQPBridgeQueuePolicyElement receiveFromQueue = new AMQPBridgeQueuePolicyElement(); + receiveFromQueue.setName("queue-policy"); + receiveFromQueue.addToIncludes(getTestName(), getTestName()); + receiveFromQueue.addProperty(RECEIVER_QUIESCE_TIMEOUT, 10_000); + receiveFromQueue.addProperty(QUEUE_RECEIVER_IDLE_TIMEOUT, 250); + + final AMQPBridgeBrokerConnectionElement element = new AMQPBridgeBrokerConnectionElement(); + element.setName(getTestName()); + element.addBridgeFromQueuePolicy(receiveFromQueue); + + final AMQPBrokerConnectConfiguration amqpConnection = + new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort()); + amqpConnection.setReconnectAttempts(0);// No reconnects + amqpConnection.addElement(element); + + final AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(getTestName()); + addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL); + addressSettings.setMaxSizeBytes(500); + server.getAddressSettingsRepository().addMatch(getTestName(), addressSettings); + server.getConfiguration().addAMQPConnection(amqpConnection); + server.start(); + server.createQueue(QueueConfiguration.of(getTestName()).setRoutingType(RoutingType.ANYCAST) + .setAddress(getTestName()) + .setAutoCreated(false)); + + Wait.assertTrue(() -> server.queueQuery(SimpleString.of(getTestName())).isExists(), 5000, 100); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + final String payload = "A".repeat(2048); + + peer.expectAttach().ofReceiver() + .withName(allOf(containsString(getTestName()), + containsString("queue-receiver"), + containsString(server.getNodeID().toString()))) + .respond(); + peer.expectFlow().withLinkCredit(1000); + + final ConnectionFactory factory = CFUtil.createConnectionFactory( + "AMQP", "tcp://localhost:" + AMQP_PORT + "?jms.prefetchPolicy.all=0"); + + try (Connection connection = factory.createConnection()) { + final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); + final Queue queue = session.createQueue(getTestName()); + final MessageConsumer consumer = session.createConsumer(queue); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + peer.expectDisposition().withState().accepted(); // This should fill the address + peer.expectFlow().withLinkCredit(998).withDrain(true) + .respond() + .withLinkCredit(0).withDeliveryCount(1000).withDrain(true); + peer.expectDisposition().withState().modified(true); // Expect modified / failed so remote doesn't drop the message + + peer.remoteTransfer().withHeader().withDurability(true).also() + .withApplicationProperties().also() + .withMessageAnnotations().withAnnotation("x-opt-test", "1").also() + .withBody().withString("First Message: " + payload) + .also() + .withDeliveryId(1) + .now(); + peer.remoteTransfer().withHeader().withDurability(true).also() + .withApplicationProperties().also() + .withMessageAnnotations().withAnnotation("x-opt-test", "2").also() + .withBody().withString("Second Message: ") + .also() + .withDeliveryId(2) + .later(10); + + // Address remains full so no new credit is issued and a clean detach occurs next. + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + peer.expectDetach().respond(); + + consumer.close(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + } + + peer.expectClose(); + peer.remoteClose().now(); + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.close(); + } + } + + @Test + public void testDrainReceiverOnTransientErrorsConfiguredAtBridgeLevel() throws Exception { + doTestDrainReceiverOnTransientErrorsConfiguredAtBridgeLevel(true); + } + + @Test + public void testNoDrainReceiverOnTransientErrorsConfiguredAtBridgeLevel() throws Exception { + doTestDrainReceiverOnTransientErrorsConfiguredAtBridgeLevel(false); + } + + private void doTestDrainReceiverOnTransientErrorsConfiguredAtBridgeLevel(boolean drainOnFull) throws Exception { + try (ProtonTestServer peer = new ProtonTestServer()) { + peer.expectSASLAnonymousConnect(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.start(); + + final URI remoteURI = peer.getServerURI(); + logger.info("Test started, peer listening on: {}", remoteURI); + + final AMQPBridgeQueuePolicyElement receiveFromQueue = new AMQPBridgeQueuePolicyElement(); + receiveFromQueue.setName("queue-policy"); + receiveFromQueue.addToIncludes(getTestName(), getTestName()); + + final AMQPBridgeBrokerConnectionElement element = new AMQPBridgeBrokerConnectionElement(); + element.setName(getTestName()); + element.addBridgeFromQueuePolicy(receiveFromQueue); + element.addProperty(RECEIVER_DRAIN_ON_TRANSIENT_DELIVERY_ERRORS, String.valueOf(drainOnFull)); + element.addProperty(RECEIVER_LINK_QUIESCE_TIMEOUT, 350); + element.addProperty(QUEUE_RECEIVER_IDLE_TIMEOUT, 0); + + final AMQPBrokerConnectConfiguration amqpConnection = + new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort()); + amqpConnection.setReconnectAttempts(0);// No reconnects + amqpConnection.addElement(element); + + final AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(getTestName()); + addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL); + addressSettings.setMaxSizeBytes(500); + server.getAddressSettingsRepository().addMatch(getTestName(), addressSettings); + server.getConfiguration().addAMQPConnection(amqpConnection); + server.start(); + server.createQueue(QueueConfiguration.of(getTestName()).setRoutingType(RoutingType.ANYCAST) + .setAddress(getTestName()) + .setAutoCreated(false)); + + Wait.assertTrue(() -> server.queueQuery(SimpleString.of(getTestName())).isExists(), 5000, 100); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + final String payload = "A".repeat(2048); + + peer.expectAttach().ofReceiver() + .withName(allOf(containsString(getTestName()), + containsString("queue-receiver"), + containsString(server.getNodeID().toString()))) + .respondInKind(); + peer.expectFlow().withLinkCredit(1000); + + final ConnectionFactory factory = CFUtil.createConnectionFactory( + "AMQP", "tcp://localhost:" + AMQP_PORT + "?jms.prefetchPolicy.all=0"); + + try (Connection connection = factory.createConnection()) { + final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); + final Queue queue = session.createQueue(getTestName()); + final MessageConsumer consumer = session.createConsumer(queue); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectDisposition().withState().accepted(); // This should fill the address + + if (drainOnFull) { + peer.expectFlow().withDrain(true).withLinkCredit(998); + peer.expectDisposition().withState().modified(true); + peer.expectDetach().withError(notNullValue()).respond(); + } else { + peer.expectDisposition().withState().modified(true); + } + + peer.remoteTransfer().withHeader().withDurability(true).also() + .withApplicationProperties().withProperty("color", "red").also() + .withMessageAnnotations().withAnnotation("x-opt-test", "1").also() + .withBody().withString("First Message: " + payload) + .also() + .withDeliveryId(1) + .now(); + peer.remoteTransfer().withHeader().withDurability(true).also() + .withApplicationProperties().withProperty("color", "red").also() + .withMessageAnnotations().withAnnotation("x-opt-test", "2").also() + .withBody().withString("Second Message: ") + .also() + .withDeliveryId(2) + .later(5); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + if (!drainOnFull) { + peer.expectFlow().withDrain(true).withLinkCredit(998) + .respond() + .withLinkCredit(0).withDeliveryCount(1000).withDrain(true); + peer.expectDetach().respond(); + } + + consumer.close(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + } + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectClose(); + peer.remoteClose().now(); + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.close(); + } + } + public static class ApplicationPropertiesTransformer implements Transformer { private final Map<String, String> properties = new HashMap<>(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java index 5b217fd31d..bdaf6cd706 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java @@ -90,7 +90,6 @@ import org.slf4j.LoggerFactory; import static org.apache.activemq.artemis.core.config.WildcardConfiguration.DEFAULT_WILDCARD_CONFIGURATION; import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConfiguration.DEFAULT_PULL_CREDIT_BATCH_SIZE; -import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_RECEIVER_IDLE_TIMEOUT; import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADD_QUEUE_POLICY; import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.EVENT_TYPE; import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_CONFIGURATION; @@ -5458,7 +5457,6 @@ public class AMQPFederationQueuePolicyTest extends AmqpClientTestSupport { final AMQPFederatedBrokerConnectionElement element = new AMQPFederatedBrokerConnectionElement(); element.setName(getTestName()); element.addLocalQueuePolicy(receiveFromQueue); - element.addProperty(ADDRESS_RECEIVER_IDLE_TIMEOUT, 0); element.addProperty(RECEIVER_DRAIN_ON_TRANSIENT_DELIVERY_ERRORS, String.valueOf(drainOnFull)); element.addProperty(RECEIVER_LINK_QUIESCE_TIMEOUT, 350); element.addProperty(QUEUE_RECEIVER_IDLE_TIMEOUT, 0); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@activemq.apache.org For additional commands, e-mail: commits-h...@activemq.apache.org For further information, visit: https://activemq.apache.org/contact