This is an automated email from the ASF dual-hosted git repository.
robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new f031481012 ARTEMIS-4982 Cleanup AMQP large message files for rejected
sends
f031481012 is described below
commit f03148101213b3b762a49938eb08632b43befc2b
Author: Timothy Bish <[email protected]>
AuthorDate: Mon Aug 12 12:36:47 2024 -0400
ARTEMIS-4982 Cleanup AMQP large message files for rejected sends
When an incoming AMQP large message send is rejected the broker should
delete the
large message file as part of the reject handling.
---
.../protocol/amqp/broker/AMQPSessionCallback.java | 20 ++++++
.../amqp/AmqpFlowControlFailDispositionTests.java | 53 +++++++++++++-
.../tests/integration/amqp/AmqpSecurityTest.java | 84 ++++++++++++++++++++++
3 files changed, 156 insertions(+), 1 deletion(-)
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index f367350fc9..cf1c3ac641 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -40,6 +40,7 @@ import
org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.security.CheckType;
import org.apache.activemq.artemis.core.security.SecurityAuth;
import org.apache.activemq.artemis.core.server.AddressQueryResult;
+import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.RoutingContext;
@@ -563,6 +564,16 @@ public class AMQPSessionCallback implements
SessionCallback {
// We need to transfer IO execution to a different thread
otherwise we may deadlock netty loop
sessionExecutor.execute(() -> inSessionSend(context, transaction,
message, delivery, receiver, routingContext));
}
+ } catch (Exception e) {
+ if (message.isLargeMessage()) {
+ try {
+ ((LargeServerMessage) message).deleteFile();
+ } catch (Exception e1) {
+ logger.warn("Error while deleting undelivered large AMQP
message: {}", e.getMessage());
+ }
+ }
+
+ throw e;
} finally {
resetContext(oldcontext);
}
@@ -632,6 +643,15 @@ public class AMQPSessionCallback implements
SessionCallback {
}
} catch (Exception e) {
logger.warn(e.getMessage(), e);
+
+ if (message.isLargeMessage()) {
+ try {
+ ((LargeServerMessage) message).deleteFile();
+ } catch (Exception e1) {
+ logger.warn("Error while deleting undelivered large AMQP
message: {}", e.getMessage());
+ }
+ }
+
context.deliveryFailed(delivery, receiver, e);
} finally {
resetContext(oldContext);
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlFailDispositionTests.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlFailDispositionTests.java
index b06dd1e4b4..00b3fc2809 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlFailDispositionTests.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlFailDispositionTests.java
@@ -17,7 +17,6 @@
package org.apache.activemq.artemis.tests.integration.amqp;
import static org.junit.jupiter.api.Assertions.assertTrue;
-
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@@ -38,6 +37,7 @@ import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
@@ -45,6 +45,8 @@ import java.util.Map;
@ExtendWith(ParameterizedTestExtension.class)
public class AmqpFlowControlFailDispositionTests extends JMSClientTestSupport {
+ private static final int MIN_LARGE_MESSAGE_SIZE = 16 * 1024;
+
@Parameter(index = 0)
public boolean useModified;
@@ -75,6 +77,7 @@ public class AmqpFlowControlFailDispositionTests extends
JMSClientTestSupport {
@Override
protected void configureAMQPAcceptorParameters(Map<String, Object> params) {
params.put("amqpUseModifiedForTransientDeliveryErrors", useModified);
+ params.put("amqpMinLargeMessageSize", MIN_LARGE_MESSAGE_SIZE);
}
@TestTemplate
@@ -105,4 +108,52 @@ public class AmqpFlowControlFailDispositionTests extends
JMSClientTestSupport {
connection.close();
}
}
+
+ @TestTemplate
+ @Timeout(60)
+ public void testFailedLargeMessageSendWhenNoSpaceCleansUpLargeFile() throws
Exception {
+ AmqpClient client = createAmqpClient(getBrokerAmqpConnectionURI());
+ AmqpConnection connection = client.connect();
+
+ int expectedRemainingLargeMessageFiles = 0;
+
+ try {
+ AmqpSession session = connection.createSession();
+ AmqpSender sender = session.createSender(getQueueName(), null, null,
outcomes);
+ AmqpMessage message = createAmqpLargeMessage();
+ boolean rejected = false;
+
+ for (int i = 0; i < 1000; i++) {
+ try {
+ sender.send(message);
+ expectedRemainingLargeMessageFiles++;
+ } catch (IOException e) {
+ rejected = true;
+ assertTrue(e.getMessage().contains(expectedMessage),
+ String.format("Unexpected message expected %s to
contain %s", e.getMessage(), expectedMessage));
+ break;
+ }
+ }
+
+ assertTrue(rejected, "Expected messages to be refused by broker");
+ } finally {
+ connection.close();
+ }
+
+ validateNoFilesOnLargeDir(getLargeMessagesDir(),
expectedRemainingLargeMessageFiles);
+ }
+
+ private AmqpMessage createAmqpLargeMessage() {
+ AmqpMessage message = new AmqpMessage();
+
+ byte[] payload = new byte[MIN_LARGE_MESSAGE_SIZE * 2];
+ for (int i = 0; i < payload.length; i++) {
+ payload[i] = (byte) 65;
+ }
+
+ message.setMessageAnnotation("x-opt-big-blob", new String(payload,
StandardCharsets.UTF_8));
+ message.setText("test");
+
+ return message;
+ }
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSecurityTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSecurityTest.java
index 489607712b..99dedb8710 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSecurityTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSecurityTest.java
@@ -20,6 +20,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.lang.invoke.MethodHandles;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -45,11 +47,18 @@ public class AmqpSecurityTest extends AmqpClientTestSupport
{
private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private static final int MIN_LARGE_MESSAGE_SIZE = 16384;
+
@Override
protected boolean isSecurityEnabled() {
return true;
}
+ @Override
+ protected void configureAMQPAcceptorParameters(Map<String, Object> params) {
+ params.put("amqpMinLargeMessageSize", MIN_LARGE_MESSAGE_SIZE);
+ }
+
@Test
@Timeout(60)
public void testSaslAuthWithInvalidCredentials() throws Exception {
@@ -277,4 +286,79 @@ public class AmqpSecurityTest extends
AmqpClientTestSupport {
connection.close();
}
}
+
+ @Test
+ @Timeout(30)
+ public void
testAnonymousRelayLargeMessageSendFailsWithNotAuthorizedCleansUpLargeMessageFile()
throws Exception {
+ CountDownLatch latch = new CountDownLatch(1);
+
+ AmqpClient client = createAmqpClient(guestPass, guestUser);
+ client.setValidator(new AmqpValidator() {
+
+ @Override
+ public void inspectDeliveryUpdate(Sender sender, Delivery delivery) {
+ DeliveryState state = delivery.getRemoteState();
+
+ if (!delivery.remotelySettled()) {
+ markAsInvalid("delivery is not remotely settled");
+ }
+
+ if (state instanceof Rejected) {
+ Rejected rejected = (Rejected) state;
+ if (rejected.getError() == null ||
rejected.getError().getCondition() == null) {
+ markAsInvalid("Delivery should have been Rejected with an
error condition");
+ } else {
+ ErrorCondition error = rejected.getError();
+ if
(!error.getCondition().equals(AmqpError.UNAUTHORIZED_ACCESS)) {
+ markAsInvalid("Should have been tagged with unauthorized
access error");
+ }
+ }
+ } else {
+ markAsInvalid("Delivery should have been Rejected");
+ }
+
+ latch.countDown();
+ }
+ });
+
+ final AmqpConnection connection = client.connect();
+
+ try {
+ final AmqpSession session = connection.createSession();
+ final AmqpSender sender = session.createAnonymousSender();
+ final AmqpMessage message = createAmqpLargeMessageWithNoBody();
+
+ message.setAddress(getQueueName());
+ message.setMessageId("msg" + 1);
+
+ try {
+ sender.send(message);
+ fail("Should not be able to send, message should be rejected");
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ } finally {
+ sender.close();
+ }
+
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+ connection.getStateInspector().assertValid();
+ } finally {
+ connection.close();
+ }
+
+ validateNoFilesOnLargeDir();
+ }
+
+ private AmqpMessage createAmqpLargeMessageWithNoBody() {
+ AmqpMessage message = new AmqpMessage();
+
+ byte[] payload = new byte[MIN_LARGE_MESSAGE_SIZE * 2];
+ for (int i = 0; i < payload.length; i++) {
+ payload[i] = (byte) 65;
+ }
+
+ message.setMessageAnnotation("x-opt-big-blob", new String(payload,
StandardCharsets.UTF_8));
+
+ return message;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact