This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new fdcf640f2b ARTEMIS-5669 AMQP bridge receivers prefer modified 
dispositions by default
fdcf640f2b is described below

commit fdcf640f2b42a37ad54982579ef1e6b6b1405833
Author: Timothy Bish <tabish...@gmail.com>
AuthorDate: Thu Sep 18 14:37:35 2025 -0400

    ARTEMIS-5669 AMQP bridge receivers prefer modified dispositions by default
    
    AMQP Bridge receiver links configure themselves to a default of sending a 
modified
    disposition for address full errors to allow the remote sender to redeliver 
the message
    instead of the broker default which is to send rejected dispositions 
meaning the remote
    must discard or DLQ the message. Allows for configuration at the bridge 
policy level
    to override this and ignore the connector URI as this is an opinionated 
configuration for
    bridged resources. Also adds ability to configure the drain on address full 
and delivery
    is rejected at the bridge configuration level.
---
 .../connect/bridge/AMQPBridgeConfiguration.java    |  46 +++++
 .../amqp/connect/bridge/AMQPBridgeConstants.java   |  37 +++-
 .../bridge/AMQPBridgeFromAddressReceiver.java      |  15 ++
 .../bridge/AMQPBridgeFromQueueReceiver.java        |  15 ++
 .../bridge/AMQPBridgeReceiverConfiguration.java    |  45 +++++
 .../amqp/connect/AMQPBridgeFromAddressTest.java    | 189 +++++++++++++++++-
 .../amqp/connect/AMQPBridgeFromQueueTest.java      | 219 +++++++++++++++++++++
 .../connect/AMQPFederationQueuePolicyTest.java     |   2 -
 8 files changed, 560 insertions(+), 8 deletions(-)

diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeConfiguration.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeConfiguration.java
index b646b37346..de2800af31 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeConfiguration.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeConfiguration.java
@@ -30,6 +30,9 @@ import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridg
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_CREDITS;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_CREDITS_LOW;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_QUIESCE_TIMEOUT;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.USE_MODIFIED_FOR_TRANSIENT_DELIVERY_ERRORS;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_DRAIN_ON_TRANSIENT_DELIVERY_ERRORS;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_LINK_QUIESCE_TIMEOUT;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.IGNORE_QUEUE_FILTERS;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.ADDRESS_RECEIVER_IDLE_TIMEOUT;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.AUTO_DELETE_DURABLE_SUBSCRIPTION;
@@ -55,6 +58,7 @@ import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridg
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.DEFAULT_SEND_SETTLED;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.DISABLE_RECEIVER_DEMAND_TRACKING;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.IGNORE_QUEUE_CONSUMER_FILTERS;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.DEFAULT_USE_MODIFIED_FOR_TRANSIENT_DELIVERY_ERRORS;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -378,4 +382,46 @@ public class AMQPBridgeConfiguration {
          return DEFAULT_PREFER_SHARED_DURABLE_SUBSCRIPTIONS;
       }
    }
+
+   /**
+    * (@return the use modified for transient delivery errors configuration}
+    */
+   public boolean isUseModifiedForTransientDeliveryErrors() {
+      final Object property = 
properties.get(USE_MODIFIED_FOR_TRANSIENT_DELIVERY_ERRORS);
+      if (property instanceof Boolean booleanValue) {
+         return booleanValue;
+      } else if (property instanceof String string) {
+         return Boolean.parseBoolean(string);
+      } else {
+         return DEFAULT_USE_MODIFIED_FOR_TRANSIENT_DELIVERY_ERRORS;
+      }
+   }
+
+   /**
+    * (@return the drain link credit on transient delivery errors 
configuration}
+    */
+   public boolean isDrainOnTransientDeliveryErrors() {
+      final Object property = 
properties.get(RECEIVER_DRAIN_ON_TRANSIENT_DELIVERY_ERRORS);
+      if (property instanceof Boolean booleanValue) {
+         return booleanValue;
+      } else if (property instanceof String string) {
+         return Boolean.parseBoolean(string);
+      } else {
+         return 
connection.getProtocolManager().isDrainOnTransientDeliveryErrors();
+      }
+   }
+
+   /**
+    * {@return the bridge receiver link quiesce timeout configuration}
+    */
+   public int getLinkQuiesceTimeout() {
+      final Object property = properties.get(RECEIVER_LINK_QUIESCE_TIMEOUT);
+      if (property instanceof Number number) {
+         return number.intValue();
+      } else if (property instanceof String string) {
+         return Integer.parseInt(string);
+      } else {
+         return connection.getProtocolManager().getLinkQuiesceTimeout();
+      }
+   }
 }
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeConstants.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeConstants.java
index 66fcbf84a0..0d060de85c 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeConstants.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeConstants.java
@@ -271,7 +271,7 @@ public final class AMQPBridgeConstants {
    /**
     * Default value for the auto delete address sender durable subscription 
binding.
     */
-   public static boolean DEFAULT_AUTO_DELETE_DURABLE_SUBSCRIPTION = false;
+   public static final boolean DEFAULT_AUTO_DELETE_DURABLE_SUBSCRIPTION = 
false;
 
    /**
     * Encodes a boolean value that indicates if AMQP bridge senders should 
configure an auto delete option
@@ -284,7 +284,7 @@ public final class AMQPBridgeConstants {
    /**
     * Default value for the auto delete address sender message count for 
durable subscription bindings.
     */
-   public static long DEFAULT_AUTO_DELETE_DURABLE_SUBSCRIPTION_MSG_COUNT = 0;
+   public static final long DEFAULT_AUTO_DELETE_DURABLE_SUBSCRIPTION_MSG_COUNT 
= 0;
 
    /**
     * Encodes a signed long value that controls the delay before auto deletion 
if using durable address
@@ -295,7 +295,7 @@ public final class AMQPBridgeConstants {
    /**
     * Default value for the auto delete address sender message count for 
durable subscription bindings.
     */
-   public static long DEFAULT_AUTO_DELETE_DURABLE_SUBSCRIPTION_DELAY = 0;
+   public static final long DEFAULT_AUTO_DELETE_DURABLE_SUBSCRIPTION_DELAY = 0;
 
    /**
     * Encodes a signed long value that controls the message count value that 
allows for address auto delete
@@ -303,4 +303,35 @@ public final class AMQPBridgeConstants {
     */
    public static final String AUTO_DELETE_DURABLE_SUBSCRIPTION_DELAY = 
"auto-delete-durable-subscription-delay";
 
+   /**
+    * Configuration property for how a bridge receiver should respond to 
delivery errors indicating that an address is
+    * full and cannot accept messages at this time. By default we want to send 
Modified outcomes with the delivery failed
+    * value set to true such that the remote will deliver the message again 
after incrementing the delivery count of the
+    * message.
+    */
+   public static final String USE_MODIFIED_FOR_TRANSIENT_DELIVERY_ERRORS = 
"amqpUseModifiedForTransientDeliveryErrors";
+
+   /**
+    * Default value for how a bridge receiver should respond to delivery 
errors indicating that an address is full
+    * and cannot accept messages at this time. By default we want to send 
Modified outcomes with the delivery failed
+    * value set to true such that the remote will deliver the message again 
after incrementing the delivery count of
+    * the message. This is an opinionated choice and the value set on the 
connector URI is not referenced by bridges
+    * as we want to maintain this behavior unless specifically set on bridge 
configuration explicitly.
+    */
+   public static final boolean 
DEFAULT_USE_MODIFIED_FOR_TRANSIENT_DELIVERY_ERRORS = true;
+
+   /**
+    * Configuration property that defines the time in milliseconds that a 
receiver will wait before considering a pending
+    * quiesce timeout to have failed and should close the link. This option 
can be used to override the value specified on
+    * the connector URI to allow bridges to operate with a different default.
+    */
+   public static final String RECEIVER_LINK_QUIESCE_TIMEOUT = 
"amqpLinkQuiesceTimeout";
+
+   /**
+    * Configuration property that defines if a bridge receiver should drain 
the link credit when a transient delivery
+    * error such as the address being full occurs. This option can be used to 
override the value specified on the
+    * connector URI to allow bridges to operate with a different default.
+    */
+   public static final String RECEIVER_DRAIN_ON_TRANSIENT_DELIVERY_ERRORS = 
"amqpDrainOnTransientDeliveryErrors";
+
 }
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeFromAddressReceiver.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeFromAddressReceiver.java
index de12f2a4bb..e879368706 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeFromAddressReceiver.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeFromAddressReceiver.java
@@ -276,6 +276,21 @@ public class AMQPBridgeFromAddressReceiver extends 
AMQPBridgeReceiver {
          }
       }
 
+      @Override
+      protected boolean 
isUseModifiedForTransientDeliveryErrors(AMQPConnectionContext connection) {
+         return configuration.isUseModifiedForTransientDeliveryErrors();
+      }
+
+      @Override
+      protected boolean isDrainOnTransientDeliveryErrors(AMQPConnectionContext 
connection) {
+         return configuration.isDrainOnTransientDeliveryErrors();
+      }
+
+      @Override
+      protected int getLinkQuiesceTimeout(AMQPConnectionContext connection) {
+         return configuration.getLinkQuiesceTimeout();
+      }
+
       @Override
       protected Runnable createCreditRunnable(AMQPConnectionContext 
connection) {
          // We defer to the configuration instance as opposed to the base 
class version that reads
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeFromQueueReceiver.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeFromQueueReceiver.java
index 85eab69100..2ecdb732fc 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeFromQueueReceiver.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeFromQueueReceiver.java
@@ -235,6 +235,21 @@ public class AMQPBridgeFromQueueReceiver extends 
AMQPBridgeReceiver {
          this.localQueue = localQueue;
       }
 
+      @Override
+      protected boolean 
isUseModifiedForTransientDeliveryErrors(AMQPConnectionContext connection) {
+         return configuration.isUseModifiedForTransientDeliveryErrors();
+      }
+
+      @Override
+      protected boolean isDrainOnTransientDeliveryErrors(AMQPConnectionContext 
connection) {
+         return configuration.isDrainOnTransientDeliveryErrors();
+      }
+
+      @Override
+      protected int getLinkQuiesceTimeout(AMQPConnectionContext connection) {
+         return configuration.getLinkQuiesceTimeout();
+      }
+
       @Override
       public void close(boolean remoteLinkClose) throws ActiveMQAMQPException {
          super.close(remoteLinkClose);
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeReceiverConfiguration.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeReceiverConfiguration.java
index b7215f7c45..24e0d9ece5 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeReceiverConfiguration.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeReceiverConfiguration.java
@@ -21,7 +21,10 @@ import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridg
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.IGNORE_QUEUE_FILTERS;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_CREDITS;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_CREDITS_LOW;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_DRAIN_ON_TRANSIENT_DELIVERY_ERRORS;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_LINK_QUIESCE_TIMEOUT;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_QUIESCE_TIMEOUT;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.USE_MODIFIED_FOR_TRANSIENT_DELIVERY_ERRORS;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.PULL_RECEIVER_BATCH_SIZE;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.QUEUE_RECEIVER_IDLE_TIMEOUT;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.LARGE_MESSAGE_THRESHOLD;
@@ -210,4 +213,46 @@ public final class AMQPBridgeReceiverConfiguration extends 
AMQPBridgeLinkConfigu
          return configuration.isPreferSharedDurableSubscriptions();
       }
    }
+
+   /**
+    * (@return the use modified for transient delivery errors configuration}
+    */
+   public boolean isUseModifiedForTransientDeliveryErrors() {
+      final Object property = 
properties.get(USE_MODIFIED_FOR_TRANSIENT_DELIVERY_ERRORS);
+      if (property instanceof Boolean booleanValue) {
+         return booleanValue;
+      } else if (property instanceof String string) {
+         return Boolean.parseBoolean(string);
+      } else {
+         return configuration.isUseModifiedForTransientDeliveryErrors();
+      }
+   }
+
+   /**
+    * (@return the drain link credit on transient delivery errors 
configuration}
+    */
+   public boolean isDrainOnTransientDeliveryErrors() {
+      final Object property = 
properties.get(RECEIVER_DRAIN_ON_TRANSIENT_DELIVERY_ERRORS);
+      if (property instanceof Boolean booleanValue) {
+         return booleanValue;
+      } else if (property instanceof String string) {
+         return Boolean.parseBoolean(string);
+      } else {
+         return configuration.isDrainOnTransientDeliveryErrors();
+      }
+   }
+
+   /**
+    * {@return the federation receiver link quiesce timeout configuration}
+    */
+   public int getLinkQuiesceTimeout() {
+      final Object property = properties.get(RECEIVER_LINK_QUIESCE_TIMEOUT);
+      if (property instanceof Number number) {
+         return number.intValue();
+      } else if (property instanceof String string) {
+         return Integer.parseInt(string);
+      } else {
+         return configuration.getLinkQuiesceTimeout();
+      }
+   }
 }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeFromAddressTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeFromAddressTest.java
index 5d0bbdbbcc..4bdcf2e2a9 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeFromAddressTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeFromAddressTest.java
@@ -32,7 +32,10 @@ import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridg
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.MAX_LINK_RECOVERY_ATTEMPTS;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.PREFER_SHARED_DURABLE_SUBSCRIPTIONS;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.PRESETTLE_SEND_MODE;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_DRAIN_ON_TRANSIENT_DELIVERY_ERRORS;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_LINK_QUIESCE_TIMEOUT;
 import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.CoreMatchers.allOf;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.nullValue;
@@ -71,6 +74,8 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import 
org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.server.transformer.Transformer;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
 import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
 import 
org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
@@ -925,7 +930,7 @@ class AMQPBridgeFromAddressTest extends 
AmqpClientTestSupport {
          server.start();
          server.deployDivert(divertConfig);
          // Current implementation requires the source address exist on the 
local broker before it
-         // will attempt to federate it from the remote.
+         // will attempt to bridge it from the remote.
          server.addAddressInfo(new AddressInfo(SimpleString.of(getTestName()), 
RoutingType.MULTICAST));
 
          // Demand on the forwarding address should create a remote consumer 
for the forwarded address.
@@ -999,7 +1004,7 @@ class AMQPBridgeFromAddressTest extends 
AmqpClientTestSupport {
          server.start();
          server.deployDivert(divertConfig);
          // Current implementation requires the source address exist on the 
local broker before it
-         // will attempt to federate it from the remote.
+         // will attempt to bridge it from the remote.
          server.addAddressInfo(new AddressInfo(SimpleString.of(getTestName()), 
RoutingType.MULTICAST));
 
          // Demand on the forwarding address should create a remote consumer 
for the forwarded address.
@@ -1824,7 +1829,7 @@ class AMQPBridgeFromAddressTest extends 
AmqpClientTestSupport {
 
    @Test
    @Timeout(20)
-   public void 
testAddressPolicyCanOverridesZeroCreditsInBridgeConfigurationAndFederateAddress()
 throws Exception {
+   public void 
testAddressPolicyCanOverridesZeroCreditsInBridgeConfigurationAndBridgeAddress() 
throws Exception {
       try (ProtonTestServer peer = new ProtonTestServer()) {
          peer.expectSASLAnonymousConnect();
          peer.expectOpen().respond();
@@ -3879,6 +3884,184 @@ class AMQPBridgeFromAddressTest extends 
AmqpClientTestSupport {
       }
    }
 
+   @Test
+   @Timeout(20)
+   public void testBridgeReceiverRejectsWithModifiedDeliveryFailedAsDefault() 
throws Exception {
+      try (ProtonTestServer peer = new ProtonTestServer()) {
+         peer.expectSASLAnonymousConnect();
+         peer.expectOpen().respond();
+         peer.expectBegin().respond();
+         peer.start();
+
+         final URI remoteURI = peer.getServerURI();
+         logger.info("Test started, peer listening on: {}", remoteURI);
+
+         final AMQPBridgeAddressPolicyElement receiveFromAddress = new 
AMQPBridgeAddressPolicyElement();
+         receiveFromAddress.setName("address-policy");
+         receiveFromAddress.addToIncludes(getTestName());
+
+         final AMQPBridgeBrokerConnectionElement element = new 
AMQPBridgeBrokerConnectionElement();
+         element.setName(getTestName());
+         element.addBridgeFromAddressPolicy(receiveFromAddress);
+         element.addProperty(ADDRESS_RECEIVER_IDLE_TIMEOUT, 10);
+
+         final AMQPBrokerConnectConfiguration amqpConnection =
+            new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + 
remoteURI.getHost() + ":" + remoteURI.getPort());
+         amqpConnection.setReconnectAttempts(0);// No reconnects
+         amqpConnection.addElement(element);
+
+         final AddressSettings addressSettings = 
server.getAddressSettingsRepository().getMatch(getTestName());
+         
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
+         addressSettings.setMaxSizeBytes(500);
+         server.getAddressSettingsRepository().addMatch(getTestName(), 
addressSettings);
+         server.getConfiguration().addAMQPConnection(amqpConnection);
+         server.start();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         final String payload = "A".repeat(2048);
+
+         peer.expectAttach().ofReceiver()
+                            .withName(allOf(containsString(getTestName()),
+                                            containsString("address-receiver"),
+                                            
containsString(server.getNodeID().toString())))
+                            .respond();
+         peer.expectFlow().withLinkCredit(1000);
+
+         
server.createQueue(QueueConfiguration.of(getTestName()).setRoutingType(RoutingType.MULTICAST)
+                                                                
.setAddress(getTestName())
+                                                                
.setAutoCreated(false)
+                                                                
.setFilterString("color='red'"));
+
+         Wait.assertTrue(() -> 
server.queueQuery(SimpleString.of(getTestName())).isExists(), 5000, 100);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         peer.expectDisposition().withState().accepted();       // This should 
fill the address
+         peer.expectFlow().withLinkCredit(998).withDrain(true); // Receiver 
drains credit before sending the disposition
+         peer.expectDisposition().withState().modified(true);   // Expect 
modified / failed so remote doesn't drop the message
+
+         peer.remoteTransfer().withHeader().withDurability(true).also()
+                              
.withApplicationProperties().withProperty("color", "red").also()
+                              
.withMessageAnnotations().withAnnotation("x-opt-test", "1").also()
+                              .withBody().withString("First Message: " + 
payload)
+                              .also()
+                              .withDeliveryId(1)
+                              .now();
+         peer.remoteTransfer().withHeader().withDurability(true).also()
+                              
.withApplicationProperties().withProperty("color", "red").also()
+                              
.withMessageAnnotations().withAnnotation("x-opt-test", "2").also()
+                              .withBody().withString("Second Message: ")
+                              .also()
+                              .withDeliveryId(2)
+                              .later(10);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectClose();
+         peer.remoteClose().now();
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.close();
+      }
+   }
+
+   @Test
+   public void testDrainReceiverOnTransientErrorsConfiguredAtBridgeLevel() 
throws Exception {
+      doTestDrainReceiverOnTransientErrorsConfiguredAtBridgeLevel(true);
+   }
+
+   @Test
+   public void testNoDrainReceiverOnTransientErrorsConfiguredAtBridgeLevel() 
throws Exception {
+      doTestDrainReceiverOnTransientErrorsConfiguredAtBridgeLevel(false);
+   }
+
+   private void 
doTestDrainReceiverOnTransientErrorsConfiguredAtBridgeLevel(boolean 
drainOnFull) throws Exception {
+      try (ProtonTestServer peer = new ProtonTestServer()) {
+         peer.expectSASLAnonymousConnect();
+         peer.expectOpen().respond();
+         peer.expectBegin().respond();
+         peer.start();
+
+         final URI remoteURI = peer.getServerURI();
+         logger.info("Test started, peer listening on: {}", remoteURI);
+
+         final AMQPBridgeAddressPolicyElement receiveFromAddress = new 
AMQPBridgeAddressPolicyElement();
+         receiveFromAddress.setName("address-policy");
+         receiveFromAddress.addToIncludes(getTestName());
+
+         final AMQPBridgeBrokerConnectionElement element = new 
AMQPBridgeBrokerConnectionElement();
+         element.setName(getTestName());
+         element.addBridgeFromAddressPolicy(receiveFromAddress);
+         element.addProperty(ADDRESS_RECEIVER_IDLE_TIMEOUT, 10);
+         element.addProperty(RECEIVER_DRAIN_ON_TRANSIENT_DELIVERY_ERRORS, 
String.valueOf(drainOnFull));
+         element.addProperty(RECEIVER_LINK_QUIESCE_TIMEOUT, 350);
+
+         final AMQPBrokerConnectConfiguration amqpConnection =
+            new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + 
remoteURI.getHost() + ":" + remoteURI.getPort());
+         amqpConnection.setReconnectAttempts(0);// No reconnects
+         amqpConnection.addElement(element);
+
+         final AddressSettings addressSettings = 
server.getAddressSettingsRepository().getMatch(getTestName());
+         
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
+         addressSettings.setMaxSizeBytes(1000);
+         server.getAddressSettingsRepository().addMatch(getTestName(), 
addressSettings);
+         server.getConfiguration().addAMQPConnection(amqpConnection);
+         server.start();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         final String payload = "A".repeat(2048);
+
+         peer.expectAttach().ofReceiver()
+                            .withName(allOf(containsString(getTestName()),
+                                            containsString("address-receiver"),
+                                            
containsString(server.getNodeID().toString())))
+                            .respond();
+         peer.expectFlow().withLinkCredit(1000);
+
+         
server.createQueue(QueueConfiguration.of("queue1").setRoutingType(RoutingType.MULTICAST)
+                                                           
.setAddress(getTestName())
+                                                           
.setAutoCreated(false));
+         
server.createQueue(QueueConfiguration.of("queue2").setRoutingType(RoutingType.MULTICAST)
+                                                           
.setAddress(getTestName())
+                                                           
.setAutoCreated(false));
+
+         Wait.assertTrue(() -> 
server.queueQuery(SimpleString.of("queue1")).isExists(), 5000, 100);
+         Wait.assertTrue(() -> 
server.queueQuery(SimpleString.of("queue2")).isExists(), 5000, 100);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectDisposition().withState().accepted(); // This should fill 
the address
+
+         if (drainOnFull) {
+            peer.expectFlow().withDrain(true).withLinkCredit(998);
+            peer.expectDisposition().withState().modified(true);
+            peer.expectDetach().withError(notNullValue()).respond();
+         } else {
+            peer.expectDisposition().withState().modified(true);
+         }
+
+         peer.remoteTransfer().withHeader().withDurability(true).also()
+                              
.withApplicationProperties().withProperty("color", "red").also()
+                              
.withMessageAnnotations().withAnnotation("x-opt-test", "1").also()
+                              .withBody().withString("First Message: " + 
payload)
+                              .also()
+                              .withDeliveryId(1)
+                              .now();
+         peer.remoteTransfer().withHeader().withDurability(true).also()
+                              
.withApplicationProperties().withProperty("color", "red").also()
+                              
.withMessageAnnotations().withAnnotation("x-opt-test", "2").also()
+                              .withBody().withString("Second Message: ")
+                              .also()
+                              .withDeliveryId(2)
+                              .later(5);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectClose();
+         peer.remoteClose().now();
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.close();
+      }
+   }
+
    public static class ApplicationPropertiesTransformer implements Transformer 
{
 
       private final Map<String, String> properties = new HashMap<>();
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeFromQueueTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeFromQueueTest.java
index b2de17225f..9cb572a642 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeFromQueueTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeFromQueueTest.java
@@ -32,11 +32,14 @@ import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridg
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_CREDITS;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_CREDITS_LOW;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_QUIESCE_TIMEOUT;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_LINK_QUIESCE_TIMEOUT;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.bridge.AMQPBridgeConstants.RECEIVER_DRAIN_ON_TRANSIENT_DELIVERY_ERRORS;
 import static 
org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledMessageConstants.AMQP_TUNNELED_CORE_MESSAGE_FORMAT;
 import static 
org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.TUNNEL_CORE_MESSAGES;
 import static org.hamcrest.CoreMatchers.allOf;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -69,6 +72,8 @@ import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBroker
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.server.transformer.Transformer;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
 import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
 import 
org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
@@ -3765,6 +3770,220 @@ public class AMQPBridgeFromQueueTest extends 
AmqpClientTestSupport {
       }
    }
 
+   @Test
+   @Timeout(20)
+   public void testBridgeReceiverRejectsWithModifiedDeliveryFailedAsDefault() 
throws Exception {
+      try (ProtonTestServer peer = new ProtonTestServer()) {
+         peer.expectSASLAnonymousConnect();
+         peer.expectOpen().respond();
+         peer.expectBegin().respond();
+         peer.start();
+
+         final URI remoteURI = peer.getServerURI();
+         logger.info("Test started, peer listening on: {}", remoteURI);
+
+         final AMQPBridgeQueuePolicyElement receiveFromQueue = new 
AMQPBridgeQueuePolicyElement();
+         receiveFromQueue.setName("queue-policy");
+         receiveFromQueue.addToIncludes(getTestName(), getTestName());
+         receiveFromQueue.addProperty(RECEIVER_QUIESCE_TIMEOUT, 10_000);
+         receiveFromQueue.addProperty(QUEUE_RECEIVER_IDLE_TIMEOUT, 250);
+
+         final AMQPBridgeBrokerConnectionElement element = new 
AMQPBridgeBrokerConnectionElement();
+         element.setName(getTestName());
+         element.addBridgeFromQueuePolicy(receiveFromQueue);
+
+         final AMQPBrokerConnectConfiguration amqpConnection =
+            new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + 
remoteURI.getHost() + ":" + remoteURI.getPort());
+         amqpConnection.setReconnectAttempts(0);// No reconnects
+         amqpConnection.addElement(element);
+
+         final AddressSettings addressSettings = 
server.getAddressSettingsRepository().getMatch(getTestName());
+         
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
+         addressSettings.setMaxSizeBytes(500);
+         server.getAddressSettingsRepository().addMatch(getTestName(), 
addressSettings);
+         server.getConfiguration().addAMQPConnection(amqpConnection);
+         server.start();
+         
server.createQueue(QueueConfiguration.of(getTestName()).setRoutingType(RoutingType.ANYCAST)
+                                                                
.setAddress(getTestName())
+                                                                
.setAutoCreated(false));
+
+         Wait.assertTrue(() -> 
server.queueQuery(SimpleString.of(getTestName())).isExists(), 5000, 100);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         final String payload = "A".repeat(2048);
+
+         peer.expectAttach().ofReceiver()
+                            .withName(allOf(containsString(getTestName()),
+                                            containsString("queue-receiver"),
+                                            
containsString(server.getNodeID().toString())))
+                            .respond();
+         peer.expectFlow().withLinkCredit(1000);
+
+         final ConnectionFactory factory = CFUtil.createConnectionFactory(
+            "AMQP", "tcp://localhost:" + AMQP_PORT + 
"?jms.prefetchPolicy.all=0");
+
+         try (Connection connection = factory.createConnection()) {
+            final Session session = 
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+            final Queue queue = session.createQueue(getTestName());
+            final MessageConsumer consumer = session.createConsumer(queue);
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+            peer.expectDisposition().withState().accepted();     // This 
should fill the address
+            peer.expectFlow().withLinkCredit(998).withDrain(true)
+                             .respond()
+                             
.withLinkCredit(0).withDeliveryCount(1000).withDrain(true);
+            peer.expectDisposition().withState().modified(true); // Expect 
modified / failed so remote doesn't drop the message
+
+            peer.remoteTransfer().withHeader().withDurability(true).also()
+                                 .withApplicationProperties().also()
+                                 
.withMessageAnnotations().withAnnotation("x-opt-test", "1").also()
+                                 .withBody().withString("First Message: " + 
payload)
+                                 .also()
+                                 .withDeliveryId(1)
+                                 .now();
+            peer.remoteTransfer().withHeader().withDurability(true).also()
+                                 .withApplicationProperties().also()
+                                 
.withMessageAnnotations().withAnnotation("x-opt-test", "2").also()
+                                 .withBody().withString("Second Message: ")
+                                 .also()
+                                 .withDeliveryId(2)
+                                 .later(10);
+
+            // Address remains full so no new credit is issued and a clean 
detach occurs next.
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+            peer.expectDetach().respond();
+
+            consumer.close();
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         }
+
+         peer.expectClose();
+         peer.remoteClose().now();
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.close();
+      }
+   }
+
+   @Test
+   public void testDrainReceiverOnTransientErrorsConfiguredAtBridgeLevel() 
throws Exception {
+      doTestDrainReceiverOnTransientErrorsConfiguredAtBridgeLevel(true);
+   }
+
+   @Test
+   public void testNoDrainReceiverOnTransientErrorsConfiguredAtBridgeLevel() 
throws Exception {
+      doTestDrainReceiverOnTransientErrorsConfiguredAtBridgeLevel(false);
+   }
+
+   private void 
doTestDrainReceiverOnTransientErrorsConfiguredAtBridgeLevel(boolean 
drainOnFull) throws Exception {
+      try (ProtonTestServer peer = new ProtonTestServer()) {
+         peer.expectSASLAnonymousConnect();
+         peer.expectOpen().respond();
+         peer.expectBegin().respond();
+         peer.start();
+
+         final URI remoteURI = peer.getServerURI();
+         logger.info("Test started, peer listening on: {}", remoteURI);
+
+         final AMQPBridgeQueuePolicyElement receiveFromQueue = new 
AMQPBridgeQueuePolicyElement();
+         receiveFromQueue.setName("queue-policy");
+         receiveFromQueue.addToIncludes(getTestName(), getTestName());
+
+         final AMQPBridgeBrokerConnectionElement element = new 
AMQPBridgeBrokerConnectionElement();
+         element.setName(getTestName());
+         element.addBridgeFromQueuePolicy(receiveFromQueue);
+         element.addProperty(RECEIVER_DRAIN_ON_TRANSIENT_DELIVERY_ERRORS, 
String.valueOf(drainOnFull));
+         element.addProperty(RECEIVER_LINK_QUIESCE_TIMEOUT, 350);
+         element.addProperty(QUEUE_RECEIVER_IDLE_TIMEOUT, 0);
+
+         final AMQPBrokerConnectConfiguration amqpConnection =
+            new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + 
remoteURI.getHost() + ":" + remoteURI.getPort());
+         amqpConnection.setReconnectAttempts(0);// No reconnects
+         amqpConnection.addElement(element);
+
+         final AddressSettings addressSettings = 
server.getAddressSettingsRepository().getMatch(getTestName());
+         
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
+         addressSettings.setMaxSizeBytes(500);
+         server.getAddressSettingsRepository().addMatch(getTestName(), 
addressSettings);
+         server.getConfiguration().addAMQPConnection(amqpConnection);
+         server.start();
+         
server.createQueue(QueueConfiguration.of(getTestName()).setRoutingType(RoutingType.ANYCAST)
+                                                                
.setAddress(getTestName())
+                                                                
.setAutoCreated(false));
+
+         Wait.assertTrue(() -> 
server.queueQuery(SimpleString.of(getTestName())).isExists(), 5000, 100);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         final String payload = "A".repeat(2048);
+
+         peer.expectAttach().ofReceiver()
+                            .withName(allOf(containsString(getTestName()),
+                                            containsString("queue-receiver"),
+                                            
containsString(server.getNodeID().toString())))
+                            .respondInKind();
+         peer.expectFlow().withLinkCredit(1000);
+
+         final ConnectionFactory factory = CFUtil.createConnectionFactory(
+            "AMQP", "tcp://localhost:" + AMQP_PORT + 
"?jms.prefetchPolicy.all=0");
+
+         try (Connection connection = factory.createConnection()) {
+            final Session session = 
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+            final Queue queue = session.createQueue(getTestName());
+            final MessageConsumer consumer = session.createConsumer(queue);
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+            peer.expectDisposition().withState().accepted(); // This should 
fill the address
+
+            if (drainOnFull) {
+               peer.expectFlow().withDrain(true).withLinkCredit(998);
+               peer.expectDisposition().withState().modified(true);
+               peer.expectDetach().withError(notNullValue()).respond();
+            } else {
+               peer.expectDisposition().withState().modified(true);
+            }
+
+            peer.remoteTransfer().withHeader().withDurability(true).also()
+                                 
.withApplicationProperties().withProperty("color", "red").also()
+                                 
.withMessageAnnotations().withAnnotation("x-opt-test", "1").also()
+                                 .withBody().withString("First Message: " + 
payload)
+                                 .also()
+                                 .withDeliveryId(1)
+                                 .now();
+            peer.remoteTransfer().withHeader().withDurability(true).also()
+                                 
.withApplicationProperties().withProperty("color", "red").also()
+                                 
.withMessageAnnotations().withAnnotation("x-opt-test", "2").also()
+                                 .withBody().withString("Second Message: ")
+                                 .also()
+                                 .withDeliveryId(2)
+                                 .later(5);
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+            if (!drainOnFull) {
+               peer.expectFlow().withDrain(true).withLinkCredit(998)
+                                .respond()
+                                
.withLinkCredit(0).withDeliveryCount(1000).withDrain(true);
+               peer.expectDetach().respond();
+            }
+
+            consumer.close();
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         }
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectClose();
+         peer.remoteClose().now();
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.close();
+      }
+   }
+
    public static class ApplicationPropertiesTransformer implements Transformer 
{
 
       private final Map<String, String> properties = new HashMap<>();
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java
index 5b217fd31d..bdaf6cd706 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java
@@ -90,7 +90,6 @@ import org.slf4j.LoggerFactory;
 
 import static 
org.apache.activemq.artemis.core.config.WildcardConfiguration.DEFAULT_WILDCARD_CONFIGURATION;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConfiguration.DEFAULT_PULL_CREDIT_BATCH_SIZE;
-import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_RECEIVER_IDLE_TIMEOUT;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADD_QUEUE_POLICY;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.EVENT_TYPE;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_CONFIGURATION;
@@ -5458,7 +5457,6 @@ public class AMQPFederationQueuePolicyTest extends 
AmqpClientTestSupport {
          final AMQPFederatedBrokerConnectionElement element = new 
AMQPFederatedBrokerConnectionElement();
          element.setName(getTestName());
          element.addLocalQueuePolicy(receiveFromQueue);
-         element.addProperty(ADDRESS_RECEIVER_IDLE_TIMEOUT, 0);
          element.addProperty(RECEIVER_DRAIN_ON_TRANSIENT_DELIVERY_ERRORS, 
String.valueOf(drainOnFull));
          element.addProperty(RECEIVER_LINK_QUIESCE_TIMEOUT, 350);
          element.addProperty(QUEUE_RECEIVER_IDLE_TIMEOUT, 0);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@activemq.apache.org
For additional commands, e-mail: commits-h...@activemq.apache.org
For further information, visit: https://activemq.apache.org/contact



Reply via email to