This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 87eebc3d27d625c9109ef63b750962a5a9300296
Author: Clebert Suconic <[email protected]>
AuthorDate: Wed Apr 15 10:32:39 2020 -0400

    ARTEMIS-2712 Dealing with Aborts AMQP Large Message
---
 .../protocol/amqp/broker/AMQPSessionCallback.java  |  5 +++++
 .../amqp/proton/ProtonServerReceiverContext.java   | 22 ++++++++++++++++++++++
 .../proton/ProtonServerReceiverContextTest.java    | 13 ++++++++++++-
 3 files changed, 39 insertions(+), 1 deletion(-)

diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index f9d689e..33cf22e 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.protocol.amqp.broker;
 import java.util.Map;
 import java.util.concurrent.Executor;
 
+import org.apache.activemq.artemis.Closeable;
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -153,6 +154,10 @@ public class AMQPSessionCallback implements SessionCallback {
 
    }
 
+   public void addCloseable(Closeable closeable) {
+      serverSession.addCloseable(closeable);
+   }
+
    public void withinContext(Runnable run) throws Exception {
       OperationContext context = recoverContext();
       try {
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
index 38aaede..f2d12f5 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -28,6 +28,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
 import org.apache.activemq.artemis.core.security.CheckType;
 import org.apache.activemq.artemis.core.security.SecurityAuth;
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.RoutingContext;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
@@ -140,6 +141,24 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
       this.creditRunnable = createCreditRunnable(amqpCredits, minCreditRefresh, receiver, connection).setRan();
       useModified = this.connection.getProtocolManager().isUseModifiedForTransientDeliveryErrors();
       this.minLargeMessageSize = connection.getProtocolManager().getAmqpMinLargeMessageSize();
+
+      if (sessionSPI != null) {
+         sessionSPI.addCloseable((boolean failed) -> clearLargeMessage());
+      }
+   }
+
+   protected void clearLargeMessage() {
+      connection.runNow(() -> {
+         if (currentLargeMessage != null) {
+            try {
+               currentLargeMessage.deleteFile();
+            } catch (Throwable error) {
+               ActiveMQServerLogger.LOGGER.errorDeletingLargeMessageFile(error);
+            } finally {
+               currentLargeMessage = null;
+            }
+         }
+      });
    }
 
    @Override
@@ -288,6 +307,8 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
 
       try {
          if (delivery.isAborted()) {
+            clearLargeMessage();
+
             // Aborting implicitly remotely settles, so advance
             // receiver to the next delivery and settle locally.
             receiver.advance();
@@ -439,6 +460,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
    public void close(ErrorCondition condition) throws ActiveMQAMQPException {
       receiver.setCondition(condition);
       close(false);
+      clearLargeMessage();
    }
 
    public void flow() {
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java
index ad6bddd..1071004 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java
@@ -30,6 +30,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -50,6 +51,7 @@ import org.apache.qpid.proton.amqp.messaging.Source;
 import org.apache.qpid.proton.amqp.transport.DeliveryState;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Receiver;
+import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.stubbing.Answer;
 
@@ -96,7 +98,14 @@ public class ProtonServerReceiverContextTest {
 
       when(mockConnContext.getProtocolManager()).thenReturn(mock(ProtonProtocolManager.class));
 
-      ProtonServerReceiverContext rc = new ProtonServerReceiverContext(null, mockConnContext, null, mockReceiver);
+      AtomicInteger clearLargeMessage = new AtomicInteger(0);
+      ProtonServerReceiverContext rc = new ProtonServerReceiverContext(null, mockConnContext, null, mockReceiver) {
+         @Override
+         protected void clearLargeMessage() {
+            super.clearLargeMessage();
+            clearLargeMessage.incrementAndGet();
+         }
+      };
 
       Delivery mockDelivery = mock(Delivery.class);
       when(mockDelivery.isAborted()).thenReturn(true);
@@ -120,6 +129,8 @@ public class ProtonServerReceiverContextTest {
          verify(mockReceiver, times(1)).flow(1);
       }
       verifyNoMoreInteractions(mockReceiver);
+
+      Assert.assertTrue(clearLargeMessage.get() > 0);
    }
 
    private void doOnMessageWithDeliveryException(List<Symbol> sourceSymbols,

Reply via email to