gemmellr commented on code in PR #4840: URL: https://github.com/apache/activemq-artemis/pull/4840#discussion_r1513095440
########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java: ########## @@ -378,6 +389,16 @@ public AMQPConnectionCallback getConnectionCallback() { return connectionCallback; } + public void exception(Throwable e) { + logger.warn(e.getMessage(), e); + ErrorCondition error = new ErrorCondition(); + error.setCondition(AmqpError.INTERNAL_ERROR); + error.setDescription("Unrecoverable error: " + (e.getMessage() == null ? e.getClass().getSimpleName() : e.getMessage())); + getHandler().getConnection().setCondition(error); + getHandler().getConnection().close(); + flush(); + } Review Comment: This is being called from the 'session executor' so it isnt thread safe in its use of the connection, which should only be used on the connection thread. ########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageWriter.java: ########## @@ -144,21 +159,28 @@ private void resume() { private void tryDelivering() { // This is discounting some bytes due to Transfer payload - final int frameSize = protonSender.getSession().getConnection().getTransport().getOutboundFrameSizeLimit() - 50 - (delivery.getTag() != null ? delivery.getTag().length : 0); try { + if (this.delivery == null) { + this.delivery = serverSender.createDelivery(reference, (int) this.message.getMessageFormat()); + if (sessionSPI.invokeOutgoing(message, (ActiveMQProtonRemotingConnection) sessionSPI.getTransportConnection().getProtocolConnection()) != null) { + return; + } Review Comment: This looks like it is creating a delivery, then potentially immediately deciding not to send it. Should that not be the other way around? EDIT: Like it was originally when the code lived in writeBytes. ########## artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java: ########## @@ -126,6 +126,9 @@ public final Channel getNettyChannel() { @Override public final void setAutoRead(boolean autoRead) { channel.config().setAutoRead(autoRead); + if (autoRead) { + channel.read(); + } Review Comment: I believe Netty does this itself when setting autoRead true, if it wasnt before. This here does it _every_ time, rather than just if it changed to true. Since I think its also possible it actually reads there and then (if on the correct thread), probably dont want to do it twice or more. ########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java: ########## @@ -77,39 +73,73 @@ public AMQPLargeMessageReader open() { } @Override - public Message readBytes(Delivery delivery) throws Exception { + public boolean readBytes(Delivery delivery) throws Exception { if (closed) { throw new IllegalStateException("AMQP Large Message Reader is closed and read cannot proceed"); } + serverReceiver.connection.requireInHandler(); + final Receiver receiver = ((Receiver) delivery.getLink()); final ReadableBuffer dataBuffer = receiver.recv(); + final AMQPSessionCallback sessionSPI = serverReceiver.getSessionContext().getSessionSPI(); + if (currentMessage == null) { - final AMQPSessionCallback sessionSPI = serverReceiver.getSessionContext().getSessionSPI(); final long id = sessionSPI.getStorageManager().generateID(); currentMessage = new AMQPLargeMessage(id, delivery.getMessageFormat(), null, sessionSPI.getCoreMessageObjectPools(), sessionSPI.getStorageManager()); currentMessage.parseHeader(dataBuffer); - + logger.trace("Initializing current message {} on {}", currentMessage, this); sessionSPI.getStorageManager().largeMessageCreated(id, currentMessage); + sessionSPI.execute(() -> validateFile(currentMessage)); } - currentMessage.addBytes(dataBuffer); + serverReceiver.getConnection().disableAutoRead(); Review Comment: If it fails after this point, but before its enabled in the other thread (so earlier comment about safety seem to apply) it could stall entirely. Dont see anything handling that. ########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreMessageReader.java: ########## @@ -155,6 +149,10 @@ public Message readBytes(Delivery delivery) { coreMessage.reloadPersistence(buffer, sessionSPI.getCoreMessageObjectPools()); coreMessage.setMessageID(sessionSPI.getStorageManager().generateID()); - return coreMessage; + serverReceiver.onMessageComplete(delivery, coreMessage, deliveryAnnotations); + + close(); Review Comment: ..commented the same on another file before I got to this one :) ########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java: ########## @@ -77,39 +73,73 @@ public AMQPLargeMessageReader open() { } @Override - public Message readBytes(Delivery delivery) throws Exception { + public boolean readBytes(Delivery delivery) throws Exception { if (closed) { throw new IllegalStateException("AMQP Large Message Reader is closed and read cannot proceed"); } + serverReceiver.connection.requireInHandler(); + final Receiver receiver = ((Receiver) delivery.getLink()); final ReadableBuffer dataBuffer = receiver.recv(); + final AMQPSessionCallback sessionSPI = serverReceiver.getSessionContext().getSessionSPI(); + if (currentMessage == null) { - final AMQPSessionCallback sessionSPI = serverReceiver.getSessionContext().getSessionSPI(); final long id = sessionSPI.getStorageManager().generateID(); currentMessage = new AMQPLargeMessage(id, delivery.getMessageFormat(), null, sessionSPI.getCoreMessageObjectPools(), sessionSPI.getStorageManager()); currentMessage.parseHeader(dataBuffer); - + logger.trace("Initializing current message {} on {}", currentMessage, this); sessionSPI.getStorageManager().largeMessageCreated(id, currentMessage); + sessionSPI.execute(() -> validateFile(currentMessage)); Review Comment: I was about to comment on much the same thing (saw it looking at the code rather than the PR). Its also seems unsafe in its use of the reader if the validation should somehow fail. ########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/MessageReader.java: ########## @@ -51,16 +49,7 @@ public interface MessageReader { * * @param delivery * The delivery that has pending incoming bytes. + * @return true if the delivery was partial and more reads are required to complete the operation Review Comment: Rest of the method javadoc needs updating too, its now totally wrong ########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java: ########## @@ -77,39 +73,73 @@ public AMQPLargeMessageReader open() { } @Override - public Message readBytes(Delivery delivery) throws Exception { + public boolean readBytes(Delivery delivery) throws Exception { if (closed) { throw new IllegalStateException("AMQP Large Message Reader is closed and read cannot proceed"); } + serverReceiver.connection.requireInHandler(); + final Receiver receiver = ((Receiver) delivery.getLink()); final ReadableBuffer dataBuffer = receiver.recv(); + final AMQPSessionCallback sessionSPI = serverReceiver.getSessionContext().getSessionSPI(); + if (currentMessage == null) { - final AMQPSessionCallback sessionSPI = serverReceiver.getSessionContext().getSessionSPI(); final long id = sessionSPI.getStorageManager().generateID(); currentMessage = new AMQPLargeMessage(id, delivery.getMessageFormat(), null, sessionSPI.getCoreMessageObjectPools(), sessionSPI.getStorageManager()); currentMessage.parseHeader(dataBuffer); - + logger.trace("Initializing current message {} on {}", currentMessage, this); sessionSPI.getStorageManager().largeMessageCreated(id, currentMessage); + sessionSPI.execute(() -> validateFile(currentMessage)); } - currentMessage.addBytes(dataBuffer); + serverReceiver.getConnection().disableAutoRead(); - final AMQPLargeMessage result; + byte[] bytes = new byte[dataBuffer.remaining()]; + dataBuffer.get(bytes); - if (!delivery.isPartial()) { - currentMessage.releaseResources(serverReceiver.getConnection().isLargeMessageSync(), true); - result = currentMessage; - // We don't want a close to delete the file now, we've released the resources. - currentMessage = null; - deliveryAnnotations = result.getDeliveryAnnotations(); - } else { - result = null; + boolean partial = delivery.isPartial(); + + sessionSPI.execute(() -> addBytes(delivery, bytes, partial)); + + return partial; Review Comment: This seems weird. The bytes arent added yet, but the result is returned, which could be that its no longer partial, which is then acted upon and will treat the message as processed when it likely hasnt yet been. Also, just because auto-read is disabled, doesnt actually mean there arent more bytes that have already been read already and can still arrive from Netty (TLS can sometimes provoke that I believe). There may also be more payload for a _different message_ in proton already, which could already have been given over and will be processed when this returns. EDIT: seems like the 'handler readable' toggle is trying to tackle that. Albeit its not thread safe per earlier comments. ########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java: ########## @@ -77,39 +73,73 @@ public AMQPLargeMessageReader open() { } @Override - public Message readBytes(Delivery delivery) throws Exception { + public boolean readBytes(Delivery delivery) throws Exception { if (closed) { throw new IllegalStateException("AMQP Large Message Reader is closed and read cannot proceed"); } + serverReceiver.connection.requireInHandler(); + final Receiver receiver = ((Receiver) delivery.getLink()); final ReadableBuffer dataBuffer = receiver.recv(); + final AMQPSessionCallback sessionSPI = serverReceiver.getSessionContext().getSessionSPI(); + if (currentMessage == null) { - final AMQPSessionCallback sessionSPI = serverReceiver.getSessionContext().getSessionSPI(); final long id = sessionSPI.getStorageManager().generateID(); currentMessage = new AMQPLargeMessage(id, delivery.getMessageFormat(), null, sessionSPI.getCoreMessageObjectPools(), sessionSPI.getStorageManager()); currentMessage.parseHeader(dataBuffer); - + logger.trace("Initializing current message {} on {}", currentMessage, this); sessionSPI.getStorageManager().largeMessageCreated(id, currentMessage); + sessionSPI.execute(() -> validateFile(currentMessage)); } - currentMessage.addBytes(dataBuffer); + serverReceiver.getConnection().disableAutoRead(); - final AMQPLargeMessage result; + byte[] bytes = new byte[dataBuffer.remaining()]; + dataBuffer.get(bytes); - if (!delivery.isPartial()) { - currentMessage.releaseResources(serverReceiver.getConnection().isLargeMessageSync(), true); - result = currentMessage; - // We don't want a close to delete the file now, we've released the resources. - currentMessage = null; - deliveryAnnotations = result.getDeliveryAnnotations(); - } else { - result = null; + boolean partial = delivery.isPartial(); + + sessionSPI.execute(() -> addBytes(delivery, bytes, partial)); + + return partial; + } + + private void validateFile(AMQPLargeMessage message) { + try { + message.validateFile(); + } catch (Exception e) { + logger.warn(e.getMessage(), e); + close(); + serverReceiver.connection.exception(e); } + } - return result; + private void addBytes(Delivery delivery, byte[] bytes, boolean isPartial) { + ReadableBuffer dataBuffer = ReadableBuffer.ByteBufferReader.wrap(bytes); + try { + logger.trace("Adding {} bytes on currentMessage={}, this={}", dataBuffer.remaining(), currentMessage, this); + currentMessage.addBytes(dataBuffer); + + if (!isPartial) { + final AMQPLargeMessage message = currentMessage; + message.releaseResources(serverReceiver.getConnection().isLargeMessageSync(), true); + logger.trace("finishing {} on {}", currentMessage, this); + // We don't want a close to delete the file now, we've released the resources. + currentMessage = null; + close(); Review Comment: Yep, I commented on the same before getting to this comment. ########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageReader.java: ########## @@ -209,13 +208,15 @@ public Message readBytes(Delivery delivery) throws Exception { state = State.DONE; - return result; - } + serverReceiver.onMessageComplete(delivery, result, deliveryAnnotations); - return null; + close(); Review Comment: Doesnt feel like a read should be closing itself. Was meant to be managed by the interested party using it. ########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java: ########## @@ -94,6 +94,17 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + public void disableAutoRead() { + connectionCallback.getTransportConnection().setAutoRead(false); + handler.setReadable(false); + } + + public void enableAutoRead() { + connectionCallback.getTransportConnection().setAutoRead(true); + getHandler().setReadable(true); Review Comment: seems likely to be non-thread-safe depending on how its used...i.e if its ever governed from 2 threads its clearly not safe as the 2 seperate toggles could overlap in a non-safe manner. ########## artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java: ########## @@ -363,13 +363,17 @@ public String toString() { } @Override - public void validateFile() throws ActiveMQException { + public void validateFile() { this.ensureFileExists(true); } - public void ensureFileExists(boolean toOpen) throws ActiveMQException { + public void ensureFileExists(boolean toOpen) { synchronized (largeBody) { - largeBody.ensureFileExists(toOpen); + try { + largeBody.ensureFileExists(toOpen); + } catch (Exception e) { + logger.debug(e.getMessage(), e); + } Review Comment: Seems odd to swallow this...so ensureFileExists() no longer ensures the file exists? Meaning validateFile() isnt validating much? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org