tabish121 commented on code in PR #4840: URL: https://github.com/apache/activemq-artemis/pull/4840#discussion_r1511892522
########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationBaseSenderController.java: ########## @@ -89,6 +89,7 @@ public MessageWriter selectOutgoingMessageWriter(ProtonServerSenderContext sende if (message.isLargeMessage()) { selected = largeMessageWriter != null ? largeMessageWriter : (largeMessageWriter = new AMQPLargeMessageWriter(sender)); + largeMessageWriter.openContext(reference); Review Comment: Ditto, extending the open likely makes more sense if eventually all these large message handlers are going to need this, as having different opens called in different places is confusing. ########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java: ########## @@ -77,39 +74,68 @@ public AMQPLargeMessageReader open() { } @Override - public Message readBytes(Delivery delivery) throws Exception { + public boolean readBytes(Delivery delivery) throws Exception { if (closed) { throw new IllegalStateException("AMQP Large Message Reader is closed and read cannot proceed"); } + serverReceiver.connection.requireInHandler(); + + logger.trace("Reading {}", delivery); + final Receiver receiver = ((Receiver) delivery.getLink()); final ReadableBuffer dataBuffer = receiver.recv(); + final AMQPSessionCallback sessionSPI = serverReceiver.getSessionContext().getSessionSPI(); + if (currentMessage == null) { - final AMQPSessionCallback sessionSPI = serverReceiver.getSessionContext().getSessionSPI(); final long id = sessionSPI.getStorageManager().generateID(); - currentMessage = new AMQPLargeMessage(id, delivery.getMessageFormat(), null, - sessionSPI.getCoreMessageObjectPools(), - sessionSPI.getStorageManager()); + currentMessage = new AMQPLargeMessage(id, delivery.getMessageFormat(), null, sessionSPI.getCoreMessageObjectPools(), sessionSPI.getStorageManager()); currentMessage.parseHeader(dataBuffer); - +
logger.trace("Initializing current message {} on {}", currentMessage, this); sessionSPI.getStorageManager().largeMessageCreated(id, currentMessage); + sessionSPI.execute(currentMessage::validateFile); Review Comment: This is now happening in a different thread, how is the error propagated to the remote peer now that it is gone, and the method is swallowing all errors? ########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java: ########## @@ -77,39 +74,68 @@ public AMQPLargeMessageReader open() { } @Override - public Message readBytes(Delivery delivery) throws Exception { + public boolean readBytes(Delivery delivery) throws Exception { if (closed) { throw new IllegalStateException("AMQP Large Message Reader is closed and read cannot proceed"); } + serverReceiver.connection.requireInHandler(); + + logger.trace("Reading {}", delivery); + final Receiver receiver = ((Receiver) delivery.getLink()); final ReadableBuffer dataBuffer = receiver.recv(); + final AMQPSessionCallback sessionSPI = serverReceiver.getSessionContext().getSessionSPI(); + if (currentMessage == null) { - final AMQPSessionCallback sessionSPI = serverReceiver.getSessionContext().getSessionSPI(); final long id = sessionSPI.getStorageManager().generateID(); - currentMessage = new AMQPLargeMessage(id, delivery.getMessageFormat(), null, - sessionSPI.getCoreMessageObjectPools(), - sessionSPI.getStorageManager()); + currentMessage = new AMQPLargeMessage(id, delivery.getMessageFormat(), null, sessionSPI.getCoreMessageObjectPools(), sessionSPI.getStorageManager()); currentMessage.parseHeader(dataBuffer); - + logger.trace("Initializing current message {} on {}", currentMessage, this); sessionSPI.getStorageManager().largeMessageCreated(id, currentMessage); + sessionSPI.execute(currentMessage::validateFile); } - currentMessage.addBytes(dataBuffer); + logger.trace("Disable autoread on {}", this); Review Comment: There is no toString 
in this type, logging this just logs the object reference value, not super helpful for logging. Also if you need to log this doesn't it make more sense to have the disable and enable methods log it, along with connection id data. ########## artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageReaderTest.java: ########## @@ -238,18 +247,18 @@ public void testReadMessageInByteChunksFromDeliveryBuffer() throws Exception { when(receiver.recv()).thenReturn(deliveryAnnotations.duplicate()); try { - assertNull(reader.readBytes(delivery)); + readMessage = null; + reader.readBytes(delivery); + Assert.assertNull(readMessage); } catch (IllegalStateException e) { fail("Should not throw as the reader should be able to read just delivery annotations."); } - assertNotNull(reader.getDeliveryAnnotations()); + DeliveryAnnotations currentDeliveryAnnotations = reader.getDeliveryAnnotations(); Review Comment: Change appears unnecessary ########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java: ########## @@ -77,39 +74,68 @@ public AMQPLargeMessageReader open() { } @Override - public Message readBytes(Delivery delivery) throws Exception { + public boolean readBytes(Delivery delivery) throws Exception { if (closed) { throw new IllegalStateException("AMQP Large Message Reader is closed and read cannot proceed"); } + serverReceiver.connection.requireInHandler(); + + logger.trace("Reading {}", delivery); + final Receiver receiver = ((Receiver) delivery.getLink()); final ReadableBuffer dataBuffer = receiver.recv(); + final AMQPSessionCallback sessionSPI = serverReceiver.getSessionContext().getSessionSPI(); + if (currentMessage == null) { - final AMQPSessionCallback sessionSPI = serverReceiver.getSessionContext().getSessionSPI(); final long id = sessionSPI.getStorageManager().generateID(); - currentMessage = new AMQPLargeMessage(id, 
delivery.getMessageFormat(), null, - sessionSPI.getCoreMessageObjectPools(), - sessionSPI.getStorageManager()); + currentMessage = new AMQPLargeMessage(id, delivery.getMessageFormat(), null, sessionSPI.getCoreMessageObjectPools(), sessionSPI.getStorageManager()); currentMessage.parseHeader(dataBuffer); - + logger.trace("Initializing current message {} on {}", currentMessage, this); sessionSPI.getStorageManager().largeMessageCreated(id, currentMessage); + sessionSPI.execute(currentMessage::validateFile); } - currentMessage.addBytes(dataBuffer); + logger.trace("Disable autoread on {}", this); + serverReceiver.getConnection().disableAutoRead(); - final AMQPLargeMessage result; + byte[] bytes = new byte[dataBuffer.remaining()]; + dataBuffer.get(bytes); + logger.trace("Getting {}", bytes.length); - if (!delivery.isPartial()) { - currentMessage.releaseResources(serverReceiver.getConnection().isLargeMessageSync(), true); - result = currentMessage; - // We don't want a close to delete the file now, we've released the resources. - currentMessage = null; - deliveryAnnotations = result.getDeliveryAnnotations(); - } else { - result = null; - } + boolean partial = delivery.isPartial(); + + sessionSPI.execute(() -> addBytes(delivery, bytes, partial)); + + return partial; + } - return result; + private void addBytes(Delivery delivery, byte[] bytes, boolean isPartial) { + ReadableBuffer dataBuffer = ReadableBuffer.ByteBufferReader.wrap(bytes); + try { + logger.trace("Adding {} bytes on currentMessage={}, this={}", dataBuffer.remaining(), currentMessage, this); + currentMessage.addBytes(dataBuffer); + + if (!isPartial) { + final AMQPLargeMessage message = currentMessage; + message.releaseResources(serverReceiver.getConnection().isLargeMessageSync(), true); + logger.trace("finishing {} on {}", currentMessage, this); + // We don't want a close to delete the file now, we've released the resources. 
+ currentMessage = null; + close(); + serverReceiver.connection.runNow(() -> { + DeliveryAnnotations deliveryAnnotations = message.getDeliveryAnnotations(); + serverReceiver.onMessageComplete(delivery, message, deliveryAnnotations); + }); + } + } catch (Throwable e) { + logger.warn(e.getMessage(), e); + close(); + serverReceiver.connection.exception(e); + } finally { + serverReceiver.getConnection().enableAutoRead(); + } } + Review Comment: Unneeded added newline ########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreMessageReader.java: ########## @@ -155,6 +150,10 @@ public Message readBytes(Delivery delivery) { coreMessage.reloadPersistence(buffer, sessionSPI.getCoreMessageObjectPools()); coreMessage.setMessageID(sessionSPI.getStorageManager().generateID()); - return coreMessage; + serverReceiver.onMessageComplete(delivery, coreMessage, deliveryAnnotations); + + close(); + + return delivery.isPartial(); Review Comment: You already check isPartial above, can just return false. 
########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java: ########## @@ -77,39 +74,68 @@ public AMQPLargeMessageReader open() { } @Override - public Message readBytes(Delivery delivery) throws Exception { + public boolean readBytes(Delivery delivery) throws Exception { if (closed) { throw new IllegalStateException("AMQP Large Message Reader is closed and read cannot proceed"); } + serverReceiver.connection.requireInHandler(); + + logger.trace("Reading {}", delivery); + final Receiver receiver = ((Receiver) delivery.getLink()); final ReadableBuffer dataBuffer = receiver.recv(); + final AMQPSessionCallback sessionSPI = serverReceiver.getSessionContext().getSessionSPI(); + if (currentMessage == null) { - final AMQPSessionCallback sessionSPI = serverReceiver.getSessionContext().getSessionSPI(); final long id = sessionSPI.getStorageManager().generateID(); - currentMessage = new AMQPLargeMessage(id, delivery.getMessageFormat(), null, - sessionSPI.getCoreMessageObjectPools(), - sessionSPI.getStorageManager()); + currentMessage = new AMQPLargeMessage(id, delivery.getMessageFormat(), null, sessionSPI.getCoreMessageObjectPools(), sessionSPI.getStorageManager()); currentMessage.parseHeader(dataBuffer); - + logger.trace("Initializing current message {} on {}", currentMessage, this); sessionSPI.getStorageManager().largeMessageCreated(id, currentMessage); + sessionSPI.execute(currentMessage::validateFile); } - currentMessage.addBytes(dataBuffer); + logger.trace("Disable autoread on {}", this); + serverReceiver.getConnection().disableAutoRead(); - final AMQPLargeMessage result; + byte[] bytes = new byte[dataBuffer.remaining()]; + dataBuffer.get(bytes); + logger.trace("Getting {}", bytes.length); Review Comment: In debugging such value log messages don't help to orient you on what is going on ########## 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java: ########## @@ -77,39 +74,68 @@ public AMQPLargeMessageReader open() { } @Override - public Message readBytes(Delivery delivery) throws Exception { + public boolean readBytes(Delivery delivery) throws Exception { if (closed) { throw new IllegalStateException("AMQP Large Message Reader is closed and read cannot proceed"); } + serverReceiver.connection.requireInHandler(); + + logger.trace("Reading {}", delivery); + final Receiver receiver = ((Receiver) delivery.getLink()); final ReadableBuffer dataBuffer = receiver.recv(); + final AMQPSessionCallback sessionSPI = serverReceiver.getSessionContext().getSessionSPI(); + if (currentMessage == null) { - final AMQPSessionCallback sessionSPI = serverReceiver.getSessionContext().getSessionSPI(); final long id = sessionSPI.getStorageManager().generateID(); - currentMessage = new AMQPLargeMessage(id, delivery.getMessageFormat(), null, - sessionSPI.getCoreMessageObjectPools(), - sessionSPI.getStorageManager()); + currentMessage = new AMQPLargeMessage(id, delivery.getMessageFormat(), null, sessionSPI.getCoreMessageObjectPools(), sessionSPI.getStorageManager()); Review Comment: Unneeded reformatting. 
########## artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageReaderTest.java: ########## @@ -159,19 +169,18 @@ public void testReadMessageByteByByteFromDeliveryBuffer() throws Exception { when(receiver.recv()).thenReturn(deliveryAnnotations.duplicate().position(i - 1).limit(i).slice()); try { - assertNull(reader.readBytes(delivery)); + readMessage = null; + reader.readBytes(delivery); + Assert.assertNull(readMessage); } catch (IllegalStateException e) { fail("Should not throw as the reader should be able to read just delivery annotations."); } } - assertNotNull(reader.getDeliveryAnnotations()); - - final DeliveryAnnotations annotations = reader.getDeliveryAnnotations(); - - assertTrue(annotations.getValue().get(Symbol.valueOf("a")).equals("a")); - assertTrue(annotations.getValue().get(Symbol.valueOf("b")).equals("b")); - assertTrue(annotations.getValue().get(Symbol.valueOf("c")).equals("c")); + DeliveryAnnotations currentAnnotations = reader.getDeliveryAnnotations(); Review Comment: Change appears unnecessary ########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java: ########## @@ -847,6 +847,7 @@ public MessageWriter selectOutgoingMessageWriter(ProtonServerSenderContext sende if (message.isLargeMessage()) { selected = largeMessageWriter != null ? largeMessageWriter : (largeMessageWriter = new AMQPLargeMessageWriter(sender)); + largeMessageWriter.openContext(reference); Review Comment: It likely makes more sense to extend the open API in the MessageWriter to take the message reference instead of adding yet another open method that is confusing, and not documented as other methods in the API are.
########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/DefaultSenderController.java: ########## @@ -453,6 +453,7 @@ public MessageWriter selectOutgoingMessageWriter(ProtonServerSenderContext sende if (reference.getMessage() instanceof AMQPLargeMessage) { selected = largeMessageWriter; + largeMessageWriter.openContext(reference); Review Comment: Replace with updated open API ########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java: ########## @@ -94,6 +94,18 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + public void disableAutoRead() { + connectionCallback.getTransportConnection().setAutoRead(false); + handler.setReadable(false); + } + + public void enableAutoRead() { + connectionCallback.getTransportConnection().setAutoRead(true); + getHandler().setReadable(true); + flush(); + } + + Review Comment: Unneeded extra newline. 
########## artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageReaderTest.java: ########## @@ -116,21 +128,19 @@ public void testReadDeliveryAnnotationsFromDeliveryBuffer() throws Exception { reader.open(); - assertNull(reader.getDeliveryAnnotations()); - try { + readMessage = null; reader.readBytes(delivery); + Assert.assertNull(readMessage); // should not been called yet } catch (IllegalStateException e) { fail("Should not throw as the reader should be able to read just delivery annotations."); } - assertNotNull(reader.getDeliveryAnnotations()); - - final DeliveryAnnotations annotations = reader.getDeliveryAnnotations(); - - assertTrue(annotations.getValue().get(Symbol.valueOf("a")).equals("a")); - assertTrue(annotations.getValue().get(Symbol.valueOf("b")).equals("b")); - assertTrue(annotations.getValue().get(Symbol.valueOf("c")).equals("c")); + // peek current delivery annotations + DeliveryAnnotations currentAnnotations = reader.getDeliveryAnnotations(); Review Comment: Change appears unnecessary ########## tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java: ########## @@ -640,6 +640,11 @@ public String getText() throws NoSuchElementException { throw new NoSuchElementException("Message does not contain a String body"); } + public byte[] getBytes() { + Data body = (Data)getWrappedMessage().getBody(); + return body.getValue().getArray(); Review Comment: The byte array contained within is not guaranteed to be zero indexed or span the full array, you need to check and account for that if you are going to expose it. 
########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java: ########## @@ -77,39 +74,68 @@ public AMQPLargeMessageReader open() { } @Override - public Message readBytes(Delivery delivery) throws Exception { + public boolean readBytes(Delivery delivery) throws Exception { if (closed) { throw new IllegalStateException("AMQP Large Message Reader is closed and read cannot proceed"); } + serverReceiver.connection.requireInHandler(); + + logger.trace("Reading {}", delivery); + final Receiver receiver = ((Receiver) delivery.getLink()); final ReadableBuffer dataBuffer = receiver.recv(); + final AMQPSessionCallback sessionSPI = serverReceiver.getSessionContext().getSessionSPI(); + if (currentMessage == null) { - final AMQPSessionCallback sessionSPI = serverReceiver.getSessionContext().getSessionSPI(); final long id = sessionSPI.getStorageManager().generateID(); - currentMessage = new AMQPLargeMessage(id, delivery.getMessageFormat(), null, - sessionSPI.getCoreMessageObjectPools(), - sessionSPI.getStorageManager()); + currentMessage = new AMQPLargeMessage(id, delivery.getMessageFormat(), null, sessionSPI.getCoreMessageObjectPools(), sessionSPI.getStorageManager()); currentMessage.parseHeader(dataBuffer); - + logger.trace("Initializing current message {} on {}", currentMessage, this); sessionSPI.getStorageManager().largeMessageCreated(id, currentMessage); + sessionSPI.execute(currentMessage::validateFile); } - currentMessage.addBytes(dataBuffer); + logger.trace("Disable autoread on {}", this); + serverReceiver.getConnection().disableAutoRead(); - final AMQPLargeMessage result; + byte[] bytes = new byte[dataBuffer.remaining()]; + dataBuffer.get(bytes); + logger.trace("Getting {}", bytes.length); - if (!delivery.isPartial()) { - currentMessage.releaseResources(serverReceiver.getConnection().isLargeMessageSync(), true); - result = currentMessage; - // We don't want a close to delete the file now, 
we've released the resources. - currentMessage = null; - deliveryAnnotations = result.getDeliveryAnnotations(); - } else { - result = null; - } + boolean partial = delivery.isPartial(); + + sessionSPI.execute(() -> addBytes(delivery, bytes, partial)); + + return partial; + } - return result; + private void addBytes(Delivery delivery, byte[] bytes, boolean isPartial) { + ReadableBuffer dataBuffer = ReadableBuffer.ByteBufferReader.wrap(bytes); + try { + logger.trace("Adding {} bytes on currentMessage={}, this={}", dataBuffer.remaining(), currentMessage, this); + currentMessage.addBytes(dataBuffer); + + if (!isPartial) { + final AMQPLargeMessage message = currentMessage; + message.releaseResources(serverReceiver.getConnection().isLargeMessageSync(), true); + logger.trace("finishing {} on {}", currentMessage, this); + // We don't want a close to delete the file now, we've released the resources. + currentMessage = null; + close(); + serverReceiver.connection.runNow(() -> { + DeliveryAnnotations deliveryAnnotations = message.getDeliveryAnnotations(); Review Comment: Can skip temp var creation and just call message.getDeliveryAnnotations directly. ########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageWriter.java: ########## @@ -81,11 +84,20 @@ public boolean isWriting() { public void close() { if (!closed) { try { + try { + if (largeBodyReader != null) { + largeBodyReader.close(); + } + largeBodyReader = null; Review Comment: Should be nulled in a finally block, which would happen in the resetClosed method called in the main finally block if you added a null there for the large body reader as you should. 
########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageWriter.java: ########## @@ -59,6 +59,9 @@ public class AMQPLargeMessageWriter implements MessageWriter { private MessageReference reference; private AMQPLargeMessage message; + + LargeBodyReader largeBodyReader; Review Comment: Can be made private from looking through the code; if not, an accessor that checks opened state is likely a better idea ########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageWriter.java: ########## @@ -96,18 +108,36 @@ public AMQPLargeMessageWriter open() { throw new IllegalStateException("Trying to open an AMQP Large Message writer that was not closed"); } - reset(false); + resetOpen(); return this; } - private void reset(boolean closedState) { + public void openContext(MessageReference reference) { + this.reference = reference; + this.message = (AMQPLargeMessage) reference.getMessage(); + + try { + largeBodyReader = message.getLargeBodyReader(); + largeBodyReader.open(); + } catch (Exception e) { + serverSender.reportDeliveryError(this, reference, e); + } + } + + private void resetClosed() { Review Comment: added member large body reader is not nulled here as it should be ########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java: ########## @@ -77,39 +74,68 @@ public AMQPLargeMessageReader open() { } @Override - public Message readBytes(Delivery delivery) throws Exception { + public boolean readBytes(Delivery delivery) throws Exception { if (closed) { throw new IllegalStateException("AMQP Large Message Reader is closed and read cannot proceed"); } + serverReceiver.connection.requireInHandler(); + + logger.trace("Reading {}", delivery); Review Comment: This trace isn't likely to add much useful context just more log noise -- This is an automated message from
the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org