This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new f031481012 ARTEMIS-4982 Cleanup AMQP large message files for rejected 
sends
f031481012 is described below

commit f03148101213b3b762a49938eb08632b43befc2b
Author: Timothy Bish <[email protected]>
AuthorDate: Mon Aug 12 12:36:47 2024 -0400

    ARTEMIS-4982 Cleanup AMQP large message files for rejected sends
    
    When an incoming AMQP large message send is rejected the broker should 
delete the
    large message file as part of the reject handling.
---
 .../protocol/amqp/broker/AMQPSessionCallback.java  | 20 ++++++
 .../amqp/AmqpFlowControlFailDispositionTests.java  | 53 +++++++++++++-
 .../tests/integration/amqp/AmqpSecurityTest.java   | 84 ++++++++++++++++++++++
 3 files changed, 156 insertions(+), 1 deletion(-)

diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index f367350fc9..cf1c3ac641 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -40,6 +40,7 @@ import 
org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.security.CheckType;
 import org.apache.activemq.artemis.core.security.SecurityAuth;
 import org.apache.activemq.artemis.core.server.AddressQueryResult;
+import org.apache.activemq.artemis.core.server.LargeServerMessage;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
 import org.apache.activemq.artemis.core.server.RoutingContext;
@@ -563,6 +564,16 @@ public class AMQPSessionCallback implements 
SessionCallback {
             // We need to transfer IO execution to a different thread 
otherwise we may deadlock netty loop
             sessionExecutor.execute(() -> inSessionSend(context, transaction, 
message, delivery, receiver, routingContext));
          }
+      } catch (Exception e) {
+         if (message.isLargeMessage()) {
+            try {
+               ((LargeServerMessage) message).deleteFile();
+            } catch (Exception e1) {
+               logger.warn("Error while deleting undelivered large AMQP 
message: {}", e.getMessage());
+            }
+         }
+
+         throw e;
       } finally {
          resetContext(oldcontext);
       }
@@ -632,6 +643,15 @@ public class AMQPSessionCallback implements 
SessionCallback {
          }
       } catch (Exception e) {
          logger.warn(e.getMessage(), e);
+
+         if (message.isLargeMessage()) {
+            try {
+               ((LargeServerMessage) message).deleteFile();
+            } catch (Exception e1) {
+               logger.warn("Error while deleting undelivered large AMQP 
message: {}", e.getMessage());
+            }
+         }
+
          context.deliveryFailed(delivery, receiver, e);
       } finally {
          resetContext(oldContext);
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlFailDispositionTests.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlFailDispositionTests.java
index b06dd1e4b4..00b3fc2809 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlFailDispositionTests.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlFailDispositionTests.java
@@ -17,7 +17,6 @@
 package org.apache.activemq.artemis.tests.integration.amqp;
 
 import static org.junit.jupiter.api.Assertions.assertTrue;
-
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@@ -38,6 +37,7 @@ import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Map;
@@ -45,6 +45,8 @@ import java.util.Map;
 @ExtendWith(ParameterizedTestExtension.class)
 public class AmqpFlowControlFailDispositionTests extends JMSClientTestSupport {
 
+   private static final int MIN_LARGE_MESSAGE_SIZE = 16 * 1024;
+
    @Parameter(index = 0)
    public boolean useModified;
 
@@ -75,6 +77,7 @@ public class AmqpFlowControlFailDispositionTests extends 
JMSClientTestSupport {
    @Override
    protected void configureAMQPAcceptorParameters(Map<String, Object> params) {
       params.put("amqpUseModifiedForTransientDeliveryErrors", useModified);
+      params.put("amqpMinLargeMessageSize", MIN_LARGE_MESSAGE_SIZE);
    }
 
    @TestTemplate
@@ -105,4 +108,52 @@ public class AmqpFlowControlFailDispositionTests extends 
JMSClientTestSupport {
          connection.close();
       }
    }
+
+   @TestTemplate
+   @Timeout(60)
+   public void testFailedLargeMessageSendWhenNoSpaceCleansUpLargeFile() throws 
Exception {
+      AmqpClient client = createAmqpClient(getBrokerAmqpConnectionURI());
+      AmqpConnection connection = client.connect();
+
+      int expectedRemainingLargeMessageFiles = 0;
+
+      try {
+         AmqpSession session = connection.createSession();
+         AmqpSender sender = session.createSender(getQueueName(), null, null, 
outcomes);
+         AmqpMessage message = createAmqpLargeMessage();
+         boolean rejected = false;
+
+         for (int i = 0; i < 1000; i++) {
+            try {
+               sender.send(message);
+               expectedRemainingLargeMessageFiles++;
+            } catch (IOException e) {
+               rejected = true;
+               assertTrue(e.getMessage().contains(expectedMessage),
+                          String.format("Unexpected message expected %s to 
contain %s", e.getMessage(), expectedMessage));
+               break;
+            }
+         }
+
+         assertTrue(rejected, "Expected messages to be refused by broker");
+      } finally {
+         connection.close();
+      }
+
+      validateNoFilesOnLargeDir(getLargeMessagesDir(), 
expectedRemainingLargeMessageFiles);
+   }
+
+   private AmqpMessage createAmqpLargeMessage() {
+      AmqpMessage message = new AmqpMessage();
+
+      byte[] payload = new byte[MIN_LARGE_MESSAGE_SIZE * 2];
+      for (int i = 0; i < payload.length; i++) {
+         payload[i] = (byte) 65;
+      }
+
+      message.setMessageAnnotation("x-opt-big-blob", new String(payload, 
StandardCharsets.UTF_8));
+      message.setText("test");
+
+      return message;
+   }
 }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSecurityTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSecurityTest.java
index 489607712b..99dedb8710 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSecurityTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSecurityTest.java
@@ -20,6 +20,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
 import java.lang.invoke.MethodHandles;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -45,11 +47,18 @@ public class AmqpSecurityTest extends AmqpClientTestSupport 
{
 
    private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
+   private static final int MIN_LARGE_MESSAGE_SIZE = 16384;
+
    @Override
    protected boolean isSecurityEnabled() {
       return true;
    }
 
+   @Override
+   protected void configureAMQPAcceptorParameters(Map<String, Object> params) {
+      params.put("amqpMinLargeMessageSize", MIN_LARGE_MESSAGE_SIZE);
+   }
+
    @Test
    @Timeout(60)
    public void testSaslAuthWithInvalidCredentials() throws Exception {
@@ -277,4 +286,79 @@ public class AmqpSecurityTest extends 
AmqpClientTestSupport {
          connection.close();
       }
    }
+
+   @Test
+   @Timeout(30)
+   public void 
testAnonymousRelayLargeMessageSendFailsWithNotAuthorizedCleansUpLargeMessageFile()
 throws Exception {
+      CountDownLatch latch = new CountDownLatch(1);
+
+      AmqpClient client = createAmqpClient(guestPass, guestUser);
+      client.setValidator(new AmqpValidator() {
+
+         @Override
+         public void inspectDeliveryUpdate(Sender sender, Delivery delivery) {
+            DeliveryState state = delivery.getRemoteState();
+
+            if (!delivery.remotelySettled()) {
+               markAsInvalid("delivery is not remotely settled");
+            }
+
+            if (state instanceof Rejected) {
+               Rejected rejected = (Rejected) state;
+               if (rejected.getError() == null || 
rejected.getError().getCondition() == null) {
+                  markAsInvalid("Delivery should have been Rejected with an 
error condition");
+               } else {
+                  ErrorCondition error = rejected.getError();
+                  if 
(!error.getCondition().equals(AmqpError.UNAUTHORIZED_ACCESS)) {
+                     markAsInvalid("Should have been tagged with unauthorized 
access error");
+                  }
+               }
+            } else {
+               markAsInvalid("Delivery should have been Rejected");
+            }
+
+            latch.countDown();
+         }
+      });
+
+      final AmqpConnection connection = client.connect();
+
+      try {
+         final AmqpSession session = connection.createSession();
+         final AmqpSender sender = session.createAnonymousSender();
+         final AmqpMessage message = createAmqpLargeMessageWithNoBody();
+
+         message.setAddress(getQueueName());
+         message.setMessageId("msg" + 1);
+
+         try {
+            sender.send(message);
+            fail("Should not be able to send, message should be rejected");
+         } catch (Exception ex) {
+            ex.printStackTrace();
+         } finally {
+            sender.close();
+         }
+
+         assertTrue(latch.await(5, TimeUnit.SECONDS));
+         connection.getStateInspector().assertValid();
+      } finally {
+         connection.close();
+      }
+
+      validateNoFilesOnLargeDir();
+   }
+
+   private AmqpMessage createAmqpLargeMessageWithNoBody() {
+      AmqpMessage message = new AmqpMessage();
+
+      byte[] payload = new byte[MIN_LARGE_MESSAGE_SIZE * 2];
+      for (int i = 0; i < payload.length; i++) {
+         payload[i] = (byte) 65;
+      }
+
+      message.setMessageAnnotation("x-opt-big-blob", new String(payload, 
StandardCharsets.UTF_8));
+
+      return message;
+   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to