[
https://issues.apache.org/jira/browse/ARTEMIS-2984?focusedWorklogId=548309&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-548309
]
ASF GitHub Bot logged work on ARTEMIS-2984:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 05/Feb/21 10:09
Start Date: 05/Feb/21 10:09
Worklog Time Spent: 10m
Work Description: clebertsuconic commented on a change in pull request
#3334:
URL: https://github.com/apache/activemq-artemis/pull/3334#discussion_r570526133
##########
File path:
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
##########
@@ -410,108 +410,117 @@ private void largeMessageSendBuffered(final boolean
sendBlocking,
/**
* @param sendBlocking
* @param msgI
- * @param inputStreamParameter
+ * @param inputStream
* @param credits
* @throws ActiveMQException
*/
private void largeMessageSendStreamed(final boolean sendBlocking,
final ICoreMessage msgI,
- final InputStream
inputStreamParameter,
+ final InputStream inputStream,
final ClientProducerCredits credits,
SendAcknowledgementHandler handler)
throws ActiveMQException {
- boolean lastPacket = false;
+ final DeflaterReader deflaterReader = session.isCompressLargeMessages()
? new DeflaterReader(inputStream) : null;
- InputStream input = inputStreamParameter;
+ try (InputStream input = session.isCompressLargeMessages() ?
deflaterReader : inputStream) {
- // We won't know the real size of the message since we are compressing
while reading the streaming.
- // This counter will be passed to the deflater to be updated for every
byte read
- AtomicLong messageSize = new AtomicLong();
-
- DeflaterReader deflaterReader = null;
-
- if (session.isCompressLargeMessages()) {
- msgI.putBooleanProperty(Message.HDR_LARGE_COMPRESSED, true);
- deflaterReader = new DeflaterReader(inputStreamParameter,
messageSize);
- input = deflaterReader;
- }
-
- long totalSize = 0;
-
- boolean headerSent = false;
-
- int reconnectID = sessionContext.getReconnectID();
- while (!lastPacket) {
- byte[] buff = new byte[minLargeMessageSize];
-
- int pos = 0;
-
- do {
- int numberOfBytesRead;
-
- int wanted = minLargeMessageSize - pos;
-
- try {
- numberOfBytesRead = input.read(buff, pos, wanted);
- } catch (IOException e) {
- throw ActiveMQClientMessageBundle.BUNDLE.errorReadingBody(e);
- }
-
- if (numberOfBytesRead == -1) {
- lastPacket = true;
-
- break;
- }
-
- pos += numberOfBytesRead;
+ if (session.isCompressLargeMessages()) {
+ msgI.putBooleanProperty(Message.HDR_LARGE_COMPRESSED, true);
}
- while (pos < minLargeMessageSize);
- totalSize += pos;
+ long totalBytesRead = 0;
- if (lastPacket) {
- if (!session.isCompressLargeMessages()) {
- messageSize.set(totalSize);
- }
+ boolean headerSent = false;
+
+ int reconnectID = sessionContext.getReconnectID();
+ while (true) {
+ final byte[] scratchBuffer = new byte[minLargeMessageSize];
- // This is replacing the last packet by a smaller packet
- byte[] buff2 = new byte[pos];
+ final int result = readLargeMessageChunk(input, scratchBuffer,
minLargeMessageSize);
- System.arraycopy(buff, 0, buff2, 0, pos);
+ final int bytesRead = bytesRead(result);
- buff = buff2;
+ totalBytesRead += bytesRead;
- // This is the case where the message is being converted as a
regular message
- if (!headerSent && session.isCompressLargeMessages() &&
buff2.length < minLargeMessageSize) {
- msgI.getBodyBuffer().resetReaderIndex();
- msgI.getBodyBuffer().resetWriterIndex();
- msgI.putLongProperty(Message.HDR_LARGE_BODY_SIZE,
deflaterReader.getTotalSize());
+ final boolean lastPacket = isLargeMessageLastPacket(result);
- msgI.getBodyBuffer().writeBytes(buff, 0, pos);
- sendRegularMessage(msgI.getAddressSimpleString(), msgI,
sendBlocking, credits, handler);
- return;
- } else {
+ if (lastPacket) {
+ // This is the case where the message is being converted as a
regular message
+ if (!headerSent && session.isCompressLargeMessages() &&
bytesRead < minLargeMessageSize) {
+ assert bytesRead == totalBytesRead;
+ msgI.getBodyBuffer().resetReaderIndex();
+ msgI.getBodyBuffer().resetWriterIndex();
+ msgI.putLongProperty(Message.HDR_LARGE_BODY_SIZE,
deflaterReader.getTotalSize());
+ msgI.getBodyBuffer().writeBytes(scratchBuffer, 0, bytesRead);
+ sendRegularMessage(msgI.getAddressSimpleString(), msgI,
sendBlocking, credits, handler);
+ break;
+ }
if (!headerSent) {
- headerSent = true;
sendInitialLargeMessageHeader(msgI, credits);
}
- int creditsSent = sessionContext.sendLargeMessageChunk(msgI,
messageSize.get(), sendBlocking, true, buff, reconnectID, handler);
+ final long messageBodySize = deflaterReader != null ?
deflaterReader.getTotalSize() : totalBytesRead;
+ int creditsSent = sessionContext.sendLargeMessageChunk(msgI,
messageBodySize, sendBlocking, true, trimmedBuffer(scratchBuffer, bytesRead),
reconnectID, handler);
credits.acquireCredits(creditsSent);
+ break;
}
- } else {
+ // !lastPacket
if (!headerSent) {
headerSent = true;
sendInitialLargeMessageHeader(msgI, credits);
}
- int creditsSent = sessionContext.sendLargeMessageChunk(msgI,
messageSize.get(), sendBlocking, false, buff, reconnectID, handler);
+ int creditsSent = sessionContext.sendLargeMessageChunk(msgI, 0,
sendBlocking, false, trimmedBuffer(scratchBuffer, bytesRead), reconnectID,
handler);
credits.acquireCredits(creditsSent);
}
- }
-
- try {
- input.close();
} catch (IOException e) {
throw ActiveMQClientMessageBundle.BUNDLE.errorClosingLargeMessage(e);
}
}
+
+ /**
+ * This is trimming {@code buffer} to the expected size or reusing it if
not needed.
+ */
+ private static byte[] trimmedBuffer(byte[] buffer, int expectedSize) {
+ if (buffer.length == expectedSize) {
+ return buffer;
+ }
+ byte[] trimmedBuffer = new byte[expectedSize];
+ System.arraycopy(buffer, 0, trimmedBuffer, 0, expectedSize);
+ return trimmedBuffer;
+ }
+
+ private static boolean isLargeMessageLastPacket(int readResult) {
+ return readResult <= 0;
+ }
+
+ private static int bytesRead(int readResult) {
+ return readResult > 0 ? readResult : -readResult;
+ }
+
+ /**
+ * Use {@link #isLargeMessageLastPacket(int)} and {@link #bytesRead(int)}
to decode the result of this method.
+ */
+ private static int readLargeMessageChunk(InputStream inputStream,
+ byte[] readBuffer,
+ int chunkLimit) throws
ActiveMQLargeMessageException {
+ assert chunkLimit > 0;
+ int bytesRead = 0;
+ do {
+ final int remaining = chunkLimit - bytesRead;
+ final int read;
+ try {
+ read = inputStream.read(readBuffer, bytesRead, remaining);
+ } catch (IOException e) {
+ throw ActiveMQClientMessageBundle.BUNDLE.errorReadingBody(e);
+ }
+ if (read == -1) {
+ // bytesRead can be 0 if the stream return -1 after 0-length reads
+ return -bytesRead;
+ }
+ bytesRead += read;
+ }
+ while (bytesRead < chunkLimit);
+ assert bytesRead == chunkLimit;
Review comment:
Is this assert correct? What if you are reading a large message whose size is
not a multiple of chunkLimit?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 548309)
Time Spent: 4h 40m (was: 4.5h)
> Compressed large messages can leak native resources
> ---------------------------------------------------
>
> Key: ARTEMIS-2984
> URL: https://issues.apache.org/jira/browse/ARTEMIS-2984
> Project: ActiveMQ Artemis
> Issue Type: Bug
> Reporter: Francesco Nigro
> Assignee: Francesco Nigro
> Priority: Major
> Time Spent: 4h 40m
> Remaining Estimate: 0h
>
> Compressed large messages use native resources in the form of Inflater and
> Deflater and should release them in a timely manner (instead of relying on
> finalization) to prevent OOM errors (of direct memory, to be precise).
> This issue is also a chance to simplify the large message controllers, because
> much of the existing code in the controllers (including the compressed one)
> isn't needed at runtime, but only for testing purposes; a proper fix can move
> that dead code into the tests too, so the leaky behavior no longer has to be
> maintained.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)