tabish121 commented on code in PR #4840:
URL: https://github.com/apache/activemq-artemis/pull/4840#discussion_r1511892522


##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationBaseSenderController.java:
##########
@@ -89,6 +89,7 @@ public MessageWriter 
selectOutgoingMessageWriter(ProtonServerSenderContext sende
          if (message.isLargeMessage()) {
             selected = largeMessageWriter != null ? largeMessageWriter :
                (largeMessageWriter = new AMQPLargeMessageWriter(sender));
+            largeMessageWriter.openContext(reference);

Review Comment:
   Ditto, extending the open likely makes more sense if eventually all these 
large message handlers are going to need this on having different opens called 
in different places is confusing.



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java:
##########
@@ -77,39 +74,68 @@ public AMQPLargeMessageReader open() {
    }
 
    @Override
-   public Message readBytes(Delivery delivery) throws Exception {
+   public boolean readBytes(Delivery delivery) throws Exception {
       if (closed) {
          throw new IllegalStateException("AMQP Large Message Reader is closed 
and read cannot proceed");
       }
 
+      serverReceiver.connection.requireInHandler();
+
+      logger.trace("Reading {}", delivery);
+
       final Receiver receiver = ((Receiver) delivery.getLink());
       final ReadableBuffer dataBuffer = receiver.recv();
 
+      final AMQPSessionCallback sessionSPI = 
serverReceiver.getSessionContext().getSessionSPI();
+
       if (currentMessage == null) {
-         final AMQPSessionCallback sessionSPI = 
serverReceiver.getSessionContext().getSessionSPI();
          final long id = sessionSPI.getStorageManager().generateID();
-         currentMessage = new AMQPLargeMessage(id, 
delivery.getMessageFormat(), null,
-                                               
sessionSPI.getCoreMessageObjectPools(),
-                                               sessionSPI.getStorageManager());
+         currentMessage = new AMQPLargeMessage(id, 
delivery.getMessageFormat(), null, sessionSPI.getCoreMessageObjectPools(), 
sessionSPI.getStorageManager());
          currentMessage.parseHeader(dataBuffer);
-
+         logger.trace("Initializing current message {} on {}", currentMessage, 
this);
          sessionSPI.getStorageManager().largeMessageCreated(id, 
currentMessage);
+         sessionSPI.execute(currentMessage::validateFile);

Review Comment:
   This is now happening in a different thread, how is the error propagated to 
the remote peer now that it is gone, and the method is swallowing all errors?



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java:
##########
@@ -77,39 +74,68 @@ public AMQPLargeMessageReader open() {
    }
 
    @Override
-   public Message readBytes(Delivery delivery) throws Exception {
+   public boolean readBytes(Delivery delivery) throws Exception {
       if (closed) {
          throw new IllegalStateException("AMQP Large Message Reader is closed 
and read cannot proceed");
       }
 
+      serverReceiver.connection.requireInHandler();
+
+      logger.trace("Reading {}", delivery);
+
       final Receiver receiver = ((Receiver) delivery.getLink());
       final ReadableBuffer dataBuffer = receiver.recv();
 
+      final AMQPSessionCallback sessionSPI = 
serverReceiver.getSessionContext().getSessionSPI();
+
       if (currentMessage == null) {
-         final AMQPSessionCallback sessionSPI = 
serverReceiver.getSessionContext().getSessionSPI();
          final long id = sessionSPI.getStorageManager().generateID();
-         currentMessage = new AMQPLargeMessage(id, 
delivery.getMessageFormat(), null,
-                                               
sessionSPI.getCoreMessageObjectPools(),
-                                               sessionSPI.getStorageManager());
+         currentMessage = new AMQPLargeMessage(id, 
delivery.getMessageFormat(), null, sessionSPI.getCoreMessageObjectPools(), 
sessionSPI.getStorageManager());
          currentMessage.parseHeader(dataBuffer);
-
+         logger.trace("Initializing current message {} on {}", currentMessage, 
this);
          sessionSPI.getStorageManager().largeMessageCreated(id, 
currentMessage);
+         sessionSPI.execute(currentMessage::validateFile);
       }
 
-      currentMessage.addBytes(dataBuffer);
+      logger.trace("Disable autoread on {}", this);

Review Comment:
   There is no toString in this type, logging this just logs the object 
reference value, not super helpful for logging.  Also if you need to log this 
doesn't it make more sense to have the disable and enable methods log it, along 
with connection id data.  



##########
artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageReaderTest.java:
##########
@@ -238,18 +247,18 @@ public void 
testReadMessageInByteChunksFromDeliveryBuffer() throws Exception {
       when(receiver.recv()).thenReturn(deliveryAnnotations.duplicate());
 
       try {
-         assertNull(reader.readBytes(delivery));
+         readMessage = null;
+         reader.readBytes(delivery);
+         Assert.assertNull(readMessage);
       } catch (IllegalStateException e) {
          fail("Should not throw as the reader should be able to read just 
delivery annotations.");
       }
 
-      assertNotNull(reader.getDeliveryAnnotations());
+      DeliveryAnnotations currentDeliveryAnnotations = 
reader.getDeliveryAnnotations();

Review Comment:
   Change appears unnecessary



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java:
##########
@@ -77,39 +74,68 @@ public AMQPLargeMessageReader open() {
    }
 
    @Override
-   public Message readBytes(Delivery delivery) throws Exception {
+   public boolean readBytes(Delivery delivery) throws Exception {
       if (closed) {
          throw new IllegalStateException("AMQP Large Message Reader is closed 
and read cannot proceed");
       }
 
+      serverReceiver.connection.requireInHandler();
+
+      logger.trace("Reading {}", delivery);
+
       final Receiver receiver = ((Receiver) delivery.getLink());
       final ReadableBuffer dataBuffer = receiver.recv();
 
+      final AMQPSessionCallback sessionSPI = 
serverReceiver.getSessionContext().getSessionSPI();
+
       if (currentMessage == null) {
-         final AMQPSessionCallback sessionSPI = 
serverReceiver.getSessionContext().getSessionSPI();
          final long id = sessionSPI.getStorageManager().generateID();
-         currentMessage = new AMQPLargeMessage(id, 
delivery.getMessageFormat(), null,
-                                               
sessionSPI.getCoreMessageObjectPools(),
-                                               sessionSPI.getStorageManager());
+         currentMessage = new AMQPLargeMessage(id, 
delivery.getMessageFormat(), null, sessionSPI.getCoreMessageObjectPools(), 
sessionSPI.getStorageManager());
          currentMessage.parseHeader(dataBuffer);
-
+         logger.trace("Initializing current message {} on {}", currentMessage, 
this);
          sessionSPI.getStorageManager().largeMessageCreated(id, 
currentMessage);
+         sessionSPI.execute(currentMessage::validateFile);
       }
 
-      currentMessage.addBytes(dataBuffer);
+      logger.trace("Disable autoread on {}", this);
+      serverReceiver.getConnection().disableAutoRead();
 
-      final AMQPLargeMessage result;
+      byte[] bytes = new byte[dataBuffer.remaining()];
+      dataBuffer.get(bytes);
+      logger.trace("Getting {}", bytes.length);
 
-      if (!delivery.isPartial()) {
-         
currentMessage.releaseResources(serverReceiver.getConnection().isLargeMessageSync(),
 true);
-         result = currentMessage;
-         // We don't want a close to delete the file now, we've released the 
resources.
-         currentMessage = null;
-         deliveryAnnotations = result.getDeliveryAnnotations();
-      } else {
-         result = null;
-      }
+      boolean partial = delivery.isPartial();
+
+      sessionSPI.execute(() -> addBytes(delivery, bytes, partial));
+
+      return partial;
+   }
 
-      return result;
+   private void addBytes(Delivery delivery, byte[] bytes, boolean isPartial) {
+      ReadableBuffer dataBuffer = ReadableBuffer.ByteBufferReader.wrap(bytes);
+      try {
+         logger.trace("Adding {} bytes on currentMessage={}, this={}", 
dataBuffer.remaining(), currentMessage, this);
+         currentMessage.addBytes(dataBuffer);
+
+         if (!isPartial) {
+            final AMQPLargeMessage message = currentMessage;
+            
message.releaseResources(serverReceiver.getConnection().isLargeMessageSync(), 
true);
+            logger.trace("finishing {} on {}", currentMessage, this);
+            // We don't want a close to delete the file now, we've released 
the resources.
+            currentMessage = null;
+            close();
+            serverReceiver.connection.runNow(() -> {
+               DeliveryAnnotations deliveryAnnotations = 
message.getDeliveryAnnotations();
+               serverReceiver.onMessageComplete(delivery, message, 
deliveryAnnotations);
+            });
+         }
+      } catch (Throwable e) {
+         logger.warn(e.getMessage(), e);
+         close();
+         serverReceiver.connection.exception(e);
+      } finally {
+         serverReceiver.getConnection().enableAutoRead();
+      }
    }
+

Review Comment:
   Unneeded added newline



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreMessageReader.java:
##########
@@ -155,6 +150,10 @@ public Message readBytes(Delivery delivery) {
       coreMessage.reloadPersistence(buffer, 
sessionSPI.getCoreMessageObjectPools());
       coreMessage.setMessageID(sessionSPI.getStorageManager().generateID());
 
-      return coreMessage;
+      serverReceiver.onMessageComplete(delivery, coreMessage, 
deliveryAnnotations);
+
+      close();
+
+      return delivery.isPartial();

Review Comment:
   You already check isPartial above, can just return false. 



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java:
##########
@@ -77,39 +74,68 @@ public AMQPLargeMessageReader open() {
    }
 
    @Override
-   public Message readBytes(Delivery delivery) throws Exception {
+   public boolean readBytes(Delivery delivery) throws Exception {
       if (closed) {
          throw new IllegalStateException("AMQP Large Message Reader is closed 
and read cannot proceed");
       }
 
+      serverReceiver.connection.requireInHandler();
+
+      logger.trace("Reading {}", delivery);
+
       final Receiver receiver = ((Receiver) delivery.getLink());
       final ReadableBuffer dataBuffer = receiver.recv();
 
+      final AMQPSessionCallback sessionSPI = 
serverReceiver.getSessionContext().getSessionSPI();
+
       if (currentMessage == null) {
-         final AMQPSessionCallback sessionSPI = 
serverReceiver.getSessionContext().getSessionSPI();
          final long id = sessionSPI.getStorageManager().generateID();
-         currentMessage = new AMQPLargeMessage(id, 
delivery.getMessageFormat(), null,
-                                               
sessionSPI.getCoreMessageObjectPools(),
-                                               sessionSPI.getStorageManager());
+         currentMessage = new AMQPLargeMessage(id, 
delivery.getMessageFormat(), null, sessionSPI.getCoreMessageObjectPools(), 
sessionSPI.getStorageManager());
          currentMessage.parseHeader(dataBuffer);
-
+         logger.trace("Initializing current message {} on {}", currentMessage, 
this);
          sessionSPI.getStorageManager().largeMessageCreated(id, 
currentMessage);
+         sessionSPI.execute(currentMessage::validateFile);
       }
 
-      currentMessage.addBytes(dataBuffer);
+      logger.trace("Disable autoread on {}", this);
+      serverReceiver.getConnection().disableAutoRead();
 
-      final AMQPLargeMessage result;
+      byte[] bytes = new byte[dataBuffer.remaining()];
+      dataBuffer.get(bytes);
+      logger.trace("Getting {}", bytes.length);

Review Comment:
   In debugging such value log messages don't help to orient you on what is 
going on



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java:
##########
@@ -77,39 +74,68 @@ public AMQPLargeMessageReader open() {
    }
 
    @Override
-   public Message readBytes(Delivery delivery) throws Exception {
+   public boolean readBytes(Delivery delivery) throws Exception {
       if (closed) {
          throw new IllegalStateException("AMQP Large Message Reader is closed 
and read cannot proceed");
       }
 
+      serverReceiver.connection.requireInHandler();
+
+      logger.trace("Reading {}", delivery);
+
       final Receiver receiver = ((Receiver) delivery.getLink());
       final ReadableBuffer dataBuffer = receiver.recv();
 
+      final AMQPSessionCallback sessionSPI = 
serverReceiver.getSessionContext().getSessionSPI();
+
       if (currentMessage == null) {
-         final AMQPSessionCallback sessionSPI = 
serverReceiver.getSessionContext().getSessionSPI();
          final long id = sessionSPI.getStorageManager().generateID();
-         currentMessage = new AMQPLargeMessage(id, 
delivery.getMessageFormat(), null,
-                                               
sessionSPI.getCoreMessageObjectPools(),
-                                               sessionSPI.getStorageManager());
+         currentMessage = new AMQPLargeMessage(id, 
delivery.getMessageFormat(), null, sessionSPI.getCoreMessageObjectPools(), 
sessionSPI.getStorageManager());

Review Comment:
   Unneeded reformatting.



##########
artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageReaderTest.java:
##########
@@ -159,19 +169,18 @@ public void testReadMessageByteByByteFromDeliveryBuffer() 
throws Exception {
          
when(receiver.recv()).thenReturn(deliveryAnnotations.duplicate().position(i - 
1).limit(i).slice());
 
          try {
-            assertNull(reader.readBytes(delivery));
+            readMessage = null;
+            reader.readBytes(delivery);
+            Assert.assertNull(readMessage);
          } catch (IllegalStateException e) {
             fail("Should not throw as the reader should be able to read just 
delivery annotations.");
          }
       }
 
-      assertNotNull(reader.getDeliveryAnnotations());
-
-      final DeliveryAnnotations annotations = reader.getDeliveryAnnotations();
-
-      assertTrue(annotations.getValue().get(Symbol.valueOf("a")).equals("a"));
-      assertTrue(annotations.getValue().get(Symbol.valueOf("b")).equals("b"));
-      assertTrue(annotations.getValue().get(Symbol.valueOf("c")).equals("c"));
+      DeliveryAnnotations currentAnnotations = reader.getDeliveryAnnotations();

Review Comment:
   Change appears unnecessary 



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java:
##########
@@ -847,6 +847,7 @@ public MessageWriter 
selectOutgoingMessageWriter(ProtonServerSenderContext sende
             if (message.isLargeMessage()) {
                selected = largeMessageWriter != null ? largeMessageWriter :
                   (largeMessageWriter = new AMQPLargeMessageWriter(sender));
+               largeMessageWriter.openContext(reference);

Review Comment:
   It likely makes more sense to extend the open API in the MessageWriter to 
take the message reference instead of adding yet another open method that is 
confusing, and not documented as other methods in the API where.  



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/DefaultSenderController.java:
##########
@@ -453,6 +453,7 @@ public MessageWriter 
selectOutgoingMessageWriter(ProtonServerSenderContext sende
 
       if (reference.getMessage() instanceof AMQPLargeMessage) {
          selected = largeMessageWriter;
+         largeMessageWriter.openContext(reference);

Review Comment:
   Replace with updated open API



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java:
##########
@@ -94,6 +94,18 @@ public class AMQPConnectionContext extends 
ProtonInitializable implements EventH
 
    private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
+   public void disableAutoRead() {
+      connectionCallback.getTransportConnection().setAutoRead(false);
+      handler.setReadable(false);
+   }
+
+   public void enableAutoRead() {
+      connectionCallback.getTransportConnection().setAutoRead(true);
+      getHandler().setReadable(true);
+      flush();
+   }
+
+

Review Comment:
   Unneeded extra newline.



##########
artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageReaderTest.java:
##########
@@ -116,21 +128,19 @@ public void 
testReadDeliveryAnnotationsFromDeliveryBuffer() throws Exception {
 
       reader.open();
 
-      assertNull(reader.getDeliveryAnnotations());
-
       try {
+         readMessage = null;
          reader.readBytes(delivery);
+         Assert.assertNull(readMessage); // should not been called yet
       } catch (IllegalStateException e) {
          fail("Should not throw as the reader should be able to read just 
delivery annotations.");
       }
 
-      assertNotNull(reader.getDeliveryAnnotations());
-
-      final DeliveryAnnotations annotations = reader.getDeliveryAnnotations();
-
-      assertTrue(annotations.getValue().get(Symbol.valueOf("a")).equals("a"));
-      assertTrue(annotations.getValue().get(Symbol.valueOf("b")).equals("b"));
-      assertTrue(annotations.getValue().get(Symbol.valueOf("c")).equals("c"));
+      // peek current delivery annotations
+      DeliveryAnnotations currentAnnotations = reader.getDeliveryAnnotations();

Review Comment:
   Change appears unnecessary 



##########
tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java:
##########
@@ -640,6 +640,11 @@ public String getText() throws NoSuchElementException {
       throw new NoSuchElementException("Message does not contain a String 
body");
    }
 
+   public byte[] getBytes() {
+      Data body = (Data)getWrappedMessage().getBody();
+      return body.getValue().getArray();

Review Comment:
   The byte array contained within is not guaranteed to be zero indexed or span 
the full array, you need to check and account for that if you are going to 
expose it.



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java:
##########
@@ -77,39 +74,68 @@ public AMQPLargeMessageReader open() {
    }
 
    @Override
-   public Message readBytes(Delivery delivery) throws Exception {
+   public boolean readBytes(Delivery delivery) throws Exception {
       if (closed) {
          throw new IllegalStateException("AMQP Large Message Reader is closed 
and read cannot proceed");
       }
 
+      serverReceiver.connection.requireInHandler();
+
+      logger.trace("Reading {}", delivery);
+
       final Receiver receiver = ((Receiver) delivery.getLink());
       final ReadableBuffer dataBuffer = receiver.recv();
 
+      final AMQPSessionCallback sessionSPI = 
serverReceiver.getSessionContext().getSessionSPI();
+
       if (currentMessage == null) {
-         final AMQPSessionCallback sessionSPI = 
serverReceiver.getSessionContext().getSessionSPI();
          final long id = sessionSPI.getStorageManager().generateID();
-         currentMessage = new AMQPLargeMessage(id, 
delivery.getMessageFormat(), null,
-                                               
sessionSPI.getCoreMessageObjectPools(),
-                                               sessionSPI.getStorageManager());
+         currentMessage = new AMQPLargeMessage(id, 
delivery.getMessageFormat(), null, sessionSPI.getCoreMessageObjectPools(), 
sessionSPI.getStorageManager());
          currentMessage.parseHeader(dataBuffer);
-
+         logger.trace("Initializing current message {} on {}", currentMessage, 
this);
          sessionSPI.getStorageManager().largeMessageCreated(id, 
currentMessage);
+         sessionSPI.execute(currentMessage::validateFile);
       }
 
-      currentMessage.addBytes(dataBuffer);
+      logger.trace("Disable autoread on {}", this);
+      serverReceiver.getConnection().disableAutoRead();
 
-      final AMQPLargeMessage result;
+      byte[] bytes = new byte[dataBuffer.remaining()];
+      dataBuffer.get(bytes);
+      logger.trace("Getting {}", bytes.length);
 
-      if (!delivery.isPartial()) {
-         
currentMessage.releaseResources(serverReceiver.getConnection().isLargeMessageSync(),
 true);
-         result = currentMessage;
-         // We don't want a close to delete the file now, we've released the 
resources.
-         currentMessage = null;
-         deliveryAnnotations = result.getDeliveryAnnotations();
-      } else {
-         result = null;
-      }
+      boolean partial = delivery.isPartial();
+
+      sessionSPI.execute(() -> addBytes(delivery, bytes, partial));
+
+      return partial;
+   }
 
-      return result;
+   private void addBytes(Delivery delivery, byte[] bytes, boolean isPartial) {
+      ReadableBuffer dataBuffer = ReadableBuffer.ByteBufferReader.wrap(bytes);
+      try {
+         logger.trace("Adding {} bytes on currentMessage={}, this={}", 
dataBuffer.remaining(), currentMessage, this);
+         currentMessage.addBytes(dataBuffer);
+
+         if (!isPartial) {
+            final AMQPLargeMessage message = currentMessage;
+            
message.releaseResources(serverReceiver.getConnection().isLargeMessageSync(), 
true);
+            logger.trace("finishing {} on {}", currentMessage, this);
+            // We don't want a close to delete the file now, we've released 
the resources.
+            currentMessage = null;
+            close();
+            serverReceiver.connection.runNow(() -> {
+               DeliveryAnnotations deliveryAnnotations = 
message.getDeliveryAnnotations();

Review Comment:
   Can skip temp var creation and just call message.getDeliveryAnnotations 
directly.



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageWriter.java:
##########
@@ -81,11 +84,20 @@ public boolean isWriting() {
    public void close() {
       if (!closed) {
          try {
+            try {
+               if (largeBodyReader != null) {
+                  largeBodyReader.close();
+               }
+               largeBodyReader = null;

Review Comment:
   Should be nulled in a finally block, which would happen in the resetClosed 
method called in the main finally block if you added a null there for the large 
body reader as you should.



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageWriter.java:
##########
@@ -59,6 +59,9 @@ public class AMQPLargeMessageWriter implements MessageWriter {
 
    private MessageReference reference;
    private AMQPLargeMessage message;
+
+   LargeBodyReader largeBodyReader;

Review Comment:
   Can be made private from looking though the code, if not an accessor that 
checks opened state is likely a better idea



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageWriter.java:
##########
@@ -96,18 +108,36 @@ public AMQPLargeMessageWriter open() {
          throw new IllegalStateException("Trying to open an AMQP Large Message 
writer that was not closed");
       }
 
-      reset(false);
+      resetOpen();
 
       return this;
    }
 
-   private void reset(boolean closedState) {
+   public void openContext(MessageReference reference) {
+      this.reference = reference;
+      this.message = (AMQPLargeMessage) reference.getMessage();
+
+      try {
+         largeBodyReader = message.getLargeBodyReader();
+         largeBodyReader.open();
+      } catch (Exception e) {
+         serverSender.reportDeliveryError(this, reference, e);
+      }
+   }
+
+   private void resetClosed() {

Review Comment:
   added member large body reader is no null as it should be here



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java:
##########
@@ -77,39 +74,68 @@ public AMQPLargeMessageReader open() {
    }
 
    @Override
-   public Message readBytes(Delivery delivery) throws Exception {
+   public boolean readBytes(Delivery delivery) throws Exception {
       if (closed) {
          throw new IllegalStateException("AMQP Large Message Reader is closed 
and read cannot proceed");
       }
 
+      serverReceiver.connection.requireInHandler();
+
+      logger.trace("Reading {}", delivery);

Review Comment:
   This trace isn't likely to add much useful context just more log noise



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to