[
https://issues.apache.org/jira/browse/ARTEMIS-4668?focusedWorklogId=908364&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-908364
]
ASF GitHub Bot logged work on ARTEMIS-4668:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 05/Mar/24 16:24
Start Date: 05/Mar/24 16:24
Worklog Time Spent: 10m
Work Description: tabish121 commented on code in PR #4840:
URL: https://github.com/apache/activemq-artemis/pull/4840#discussion_r1513124572
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java:
##########
@@ -77,39 +73,73 @@ public AMQPLargeMessageReader open() {
}
@Override
- public Message readBytes(Delivery delivery) throws Exception {
+ public boolean readBytes(Delivery delivery) throws Exception {
if (closed) {
throw new IllegalStateException("AMQP Large Message Reader is closed
and read cannot proceed");
}
+ serverReceiver.connection.requireInHandler();
+
final Receiver receiver = ((Receiver) delivery.getLink());
final ReadableBuffer dataBuffer = receiver.recv();
+ final AMQPSessionCallback sessionSPI =
serverReceiver.getSessionContext().getSessionSPI();
+
if (currentMessage == null) {
- final AMQPSessionCallback sessionSPI =
serverReceiver.getSessionContext().getSessionSPI();
final long id = sessionSPI.getStorageManager().generateID();
currentMessage = new AMQPLargeMessage(id,
delivery.getMessageFormat(), null,
sessionSPI.getCoreMessageObjectPools(),
sessionSPI.getStorageManager());
currentMessage.parseHeader(dataBuffer);
-
+ logger.trace("Initializing current message {} on {}", currentMessage,
this);
sessionSPI.getStorageManager().largeMessageCreated(id,
currentMessage);
+ sessionSPI.execute(() -> validateFile(currentMessage));
}
- currentMessage.addBytes(dataBuffer);
+ serverReceiver.getConnection().disableAutoRead();
- final AMQPLargeMessage result;
+ byte[] bytes = new byte[dataBuffer.remaining()];
+ dataBuffer.get(bytes);
- if (!delivery.isPartial()) {
-
currentMessage.releaseResources(serverReceiver.getConnection().isLargeMessageSync(),
true);
- result = currentMessage;
- // We don't want a close to delete the file now, we've released the
resources.
- currentMessage = null;
- deliveryAnnotations = result.getDeliveryAnnotations();
- } else {
- result = null;
+ boolean partial = delivery.isPartial();
+
+ sessionSPI.execute(() -> addBytes(delivery, bytes, partial));
+
+ return partial;
+ }
+
+ private void validateFile(AMQPLargeMessage message) {
+ try {
+ message.validateFile();
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ close();
+ serverReceiver.connection.exception(e);
}
+ }
- return result;
+ private void addBytes(Delivery delivery, byte[] bytes, boolean isPartial) {
+ ReadableBuffer dataBuffer = ReadableBuffer.ByteBufferReader.wrap(bytes);
+ try {
+ logger.trace("Adding {} bytes on currentMessage={}, this={}",
dataBuffer.remaining(), currentMessage, this);
+ currentMessage.addBytes(dataBuffer);
+
+ if (!isPartial) {
+ final AMQPLargeMessage message = currentMessage;
+
message.releaseResources(serverReceiver.getConnection().isLargeMessageSync(),
true);
+ logger.trace("finishing {} on {}", currentMessage, this);
+ // We don't want a close to delete the file now, we've released
the resources.
+ currentMessage = null;
+ close();
+ serverReceiver.connection.runNow(() ->
serverReceiver.onMessageComplete(delivery, message,
message.getDeliveryAnnotations()));
+ }
+ } catch (Throwable e) {
+ logger.warn(e.getMessage(), e);
+ close();
+ serverReceiver.connection.exception(e);
Review Comment:
The addBytes is not running on the connection thread so this call will end
up operating on the proton bits outside the connection thread leading to
possible inconsistent state when handling this.
Issue Time Tracking
-------------------
Worklog Id: (was: 908364)
Time Spent: 3.5h (was: 3h 20m)
> Move AMQP Large Message File Handling away from Netty thread
> ------------------------------------------------------------
>
> Key: ARTEMIS-4668
> URL: https://issues.apache.org/jira/browse/ARTEMIS-4668
> Project: ActiveMQ Artemis
> Issue Type: Bug
> Affects Versions: 2.32.0
> Reporter: Clebert Suconic
> Assignee: Clebert Suconic
> Priority: Major
> Fix For: 2.33.0
>
> Time Spent: 3.5h
> Remaining Estimate: 0h
>
> Operations like file.open, file.close, and file.sync should be moved away
> from the Netty Thread for AMQP Large Messages
> This task now is about moving the processing for AMQP Messages. we may in a
> near future also improve tunneled large messages. For now we will do for AMQP
> messages only.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)