This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch 2.27.x in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit 58f201bbe3c0902d509f85b7f5d15f6425f324fb Author: Clebert Suconic <[email protected]> AuthorDate: Thu Nov 10 06:27:10 2022 -0500 ARTEMIS-4083 ClientLargeMessage Streaming not closing inputStream if compressed (cherry picked from commit 9528e4586914808cc9568533f16c12310e743ae1) --- .../core/client/impl/ClientProducerImpl.java | 110 +++++++++++---------- .../activemq/artemis/utils/DeflaterReader.java | 3 +- .../tests/integration/client/LargeMessageTest.java | 58 +++++++++++ 3 files changed, 116 insertions(+), 55 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java index a4e4a7eb99..bdb3a02223 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java @@ -434,79 +434,81 @@ public class ClientProducerImpl implements ClientProducerInternal { boolean headerSent = false; - int reconnectID = sessionContext.getReconnectID(); - while (!lastPacket) { - byte[] buff = new byte[minLargeMessageSize]; - - int pos = 0; - - do { - int numberOfBytesRead; - - int wanted = minLargeMessageSize - pos; + try { + int reconnectID = sessionContext.getReconnectID(); + while (!lastPacket) { + byte[] buff = new byte[minLargeMessageSize]; - try { - numberOfBytesRead = input.read(buff, pos, wanted); - } catch (IOException e) { - throw ActiveMQClientMessageBundle.BUNDLE.errorReadingBody(e); - } + int pos = 0; - if (numberOfBytesRead == -1) { - lastPacket = true; + do { + int numberOfBytesRead; - break; - } + int wanted = minLargeMessageSize - pos; - pos += numberOfBytesRead; - } - while (pos < minLargeMessageSize); + try { + numberOfBytesRead = input.read(buff, pos, wanted); + } catch (IOException e) { + throw 
ActiveMQClientMessageBundle.BUNDLE.errorReadingBody(e); + } - totalSize += pos; + if (numberOfBytesRead == -1) { + lastPacket = true; - if (lastPacket) { - if (!session.isCompressLargeMessages()) { - messageSize.set(totalSize); - } - - // This is replacing the last packet by a smaller packet - byte[] buff2 = new byte[pos]; + break; + } - System.arraycopy(buff, 0, buff2, 0, pos); + pos += numberOfBytesRead; + } while (pos < minLargeMessageSize); - buff = buff2; + totalSize += pos; - // This is the case where the message is being converted as a regular message - if (!headerSent && session.isCompressLargeMessages() && buff2.length < minLargeMessageSize) { - msgI.getBodyBuffer().resetReaderIndex(); - msgI.getBodyBuffer().resetWriterIndex(); - msgI.putLongProperty(Message.HDR_LARGE_BODY_SIZE, deflaterReader.getTotalSize()); + if (lastPacket) { + if (!session.isCompressLargeMessages()) { + messageSize.set(totalSize); + } - msgI.getBodyBuffer().writeBytes(buff, 0, pos); - sendRegularMessage(msgI.getAddressSimpleString(), msgI, sendBlocking, credits, handler); - return; + // This is replacing the last packet by a smaller packet + byte[] buff2 = new byte[pos]; + + System.arraycopy(buff, 0, buff2, 0, pos); + + buff = buff2; + + // This is the case where the message is being converted as a regular message + if (!headerSent && session.isCompressLargeMessages() && buff2.length < minLargeMessageSize) { + msgI.getBodyBuffer().resetReaderIndex(); + msgI.getBodyBuffer().resetWriterIndex(); + msgI.putLongProperty(Message.HDR_LARGE_BODY_SIZE, deflaterReader.getTotalSize()); + + msgI.getBodyBuffer().writeBytes(buff, 0, pos); + sendRegularMessage(msgI.getAddressSimpleString(), msgI, sendBlocking, credits, handler); + return; + } else { + if (!headerSent) { + headerSent = true; + sendInitialLargeMessageHeader(msgI, credits); + } + int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageSize.get(), sendBlocking, true, buff, reconnectID, handler); + 
credits.acquireCredits(creditsSent); + } } else { if (!headerSent) { headerSent = true; sendInitialLargeMessageHeader(msgI, credits); } - int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageSize.get(), sendBlocking, true, buff, reconnectID, handler); + + int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageSize.get(), sendBlocking, false, buff, reconnectID, handler); credits.acquireCredits(creditsSent); } - } else { - if (!headerSent) { - headerSent = true; - sendInitialLargeMessageHeader(msgI, credits); - } - - int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageSize.get(), sendBlocking, false, buff, reconnectID, handler); - credits.acquireCredits(creditsSent); + } + } finally { + try { + input.close(); + } catch (IOException e) { + throw ActiveMQClientMessageBundle.BUNDLE.errorClosingLargeMessage(e); } } - try { - input.close(); - } catch (IOException e) { - throw ActiveMQClientMessageBundle.BUNDLE.errorClosingLargeMessage(e); - } } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/DeflaterReader.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/DeflaterReader.java index 2f7844ae39..4443687bde 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/DeflaterReader.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/DeflaterReader.java @@ -107,7 +107,8 @@ public class DeflaterReader extends InputStream { return read; } - public void closeStream() throws IOException { + @Override + public void close() throws IOException { super.close(); input.close(); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java index 11b1204249..2e00436088 100644 --- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java @@ -24,6 +24,8 @@ import javax.jms.Session; import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; import java.lang.management.ManagementFactory; import java.lang.management.OperatingSystemMXBean; import java.nio.ByteBuffer; @@ -31,6 +33,7 @@ import java.util.HashMap; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import com.sun.management.UnixOperatingSystemMXBean; @@ -2791,6 +2794,61 @@ public class LargeMessageTest extends LargeMessageTestBase { Wait.assertTrue(() -> ((UnixOperatingSystemMXBean)os).getOpenFileDescriptorCount() - fdBefore < 3); } + @Test + public void testStream() throws Exception { + ActiveMQServer server = createServer(true, isNetty(), storeType); + + server.start(); + + locator.setCompressLargeMessage(true); + + ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator)); + + ClientSession session = sf.createSession(false, false); + + final SimpleString MY_QUEUE = new SimpleString("MY-QUEUE"); + + server.createQueue(new QueueConfiguration(MY_QUEUE).setRoutingType(RoutingType.ANYCAST)); + + ClientProducer producer = session.createProducer(MY_QUEUE); + + AtomicBoolean closed = new AtomicBoolean(false); + + InputStream inputStream = new InputStream() { + int bytes = 10_000; + @Override + public int read() throws IOException { + if (bytes-- > 0) { + return 10; + } else { + return -1; + } + } + + @Override + public void close() { + closed.set(true); + } + + + @Override + public int available () throws IOException { + return bytes; + } + 
+ }; + + ClientMessage message = session.createMessage(true); + message.setBodyInputStream(inputStream); + producer.send(message); + + Wait.assertTrue(closed::get); + + session.close(); + + } + + } \ No newline at end of file
