This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit 87eebc3d27d625c9109ef63b750962a5a9300296 Author: Clebert Suconic <[email protected]> AuthorDate: Wed Apr 15 10:32:39 2020 -0400 ARTEMIS-2712 Dealing with Aborts AMQP Large Message --- .../protocol/amqp/broker/AMQPSessionCallback.java | 5 +++++ .../amqp/proton/ProtonServerReceiverContext.java | 22 ++++++++++++++++++++++ .../proton/ProtonServerReceiverContextTest.java | 13 ++++++++++++- 3 files changed, 39 insertions(+), 1 deletion(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index f9d689e..33cf22e 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.protocol.amqp.broker; import java.util.Map; import java.util.concurrent.Executor; +import org.apache.activemq.artemis.Closeable; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -153,6 +154,10 @@ public class AMQPSessionCallback implements SessionCallback { } + public void addCloseable(Closeable closeable) { + serverSession.addCloseable(closeable); + } + public void withinContext(Runnable run) throws Exception { OperationContext context = recoverContext(); try { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java index 38aaede..f2d12f5 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java @@ -28,6 +28,7 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager; import org.apache.activemq.artemis.core.security.CheckType; import org.apache.activemq.artemis.core.security.SecurityAuth; +import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.RoutingContext; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl; @@ -140,6 +141,24 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements this.creditRunnable = createCreditRunnable(amqpCredits, minCreditRefresh, receiver, connection).setRan(); useModified = this.connection.getProtocolManager().isUseModifiedForTransientDeliveryErrors(); this.minLargeMessageSize = connection.getProtocolManager().getAmqpMinLargeMessageSize(); + + if (sessionSPI != null) { + sessionSPI.addCloseable((boolean failed) -> clearLargeMessage()); + } + } + + protected void clearLargeMessage() { + connection.runNow(() -> { + if (currentLargeMessage != null) { + try { + currentLargeMessage.deleteFile(); + } catch (Throwable error) { + ActiveMQServerLogger.LOGGER.errorDeletingLargeMessageFile(error); + } finally { + currentLargeMessage = null; + } + } + }); } @Override @@ -288,6 +307,8 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements try { if (delivery.isAborted()) { + clearLargeMessage(); + // Aborting implicitly remotely settles, so advance // receiver to the next delivery and settle locally. receiver.advance(); @@ -439,6 +460,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements public void close(ErrorCondition condition) throws ActiveMQAMQPException { receiver.setCondition(condition); close(false); + clearLargeMessage(); } public void flow() { diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java index ad6bddd..1071004 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java @@ -30,6 +30,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -50,6 +51,7 @@ import org.apache.qpid.proton.amqp.messaging.Source; import org.apache.qpid.proton.amqp.transport.DeliveryState; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Receiver; +import org.junit.Assert; import org.junit.Test; import org.mockito.stubbing.Answer; @@ -96,7 +98,14 @@ public class ProtonServerReceiverContextTest { when(mockConnContext.getProtocolManager()).thenReturn(mock(ProtonProtocolManager.class)); - ProtonServerReceiverContext rc = new ProtonServerReceiverContext(null, mockConnContext, null, mockReceiver); + AtomicInteger clearLargeMessage = new AtomicInteger(0); + ProtonServerReceiverContext rc = new ProtonServerReceiverContext(null, mockConnContext, null, mockReceiver) { + @Override + protected void clearLargeMessage() { + super.clearLargeMessage(); + clearLargeMessage.incrementAndGet(); + } + }; Delivery mockDelivery = mock(Delivery.class); when(mockDelivery.isAborted()).thenReturn(true); @@ -120,6 +129,8 @@ public class ProtonServerReceiverContextTest { verify(mockReceiver, times(1)).flow(1); } verifyNoMoreInteractions(mockReceiver); + + Assert.assertTrue(clearLargeMessage.get() > 0); } private void doOnMessageWithDeliveryException(List<Symbol> sourceSymbols,
