This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch 2.27.x
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 58f201bbe3c0902d509f85b7f5d15f6425f324fb
Author: Clebert Suconic <[email protected]>
AuthorDate: Thu Nov 10 06:27:10 2022 -0500

    ARTEMIS-4083 ClientLargeMessage Streaming not closing inputStream if compressed
    
    (cherry picked from commit 9528e4586914808cc9568533f16c12310e743ae1)
---
 .../core/client/impl/ClientProducerImpl.java       | 110 +++++++++++----------
 .../activemq/artemis/utils/DeflaterReader.java     |   3 +-
 .../tests/integration/client/LargeMessageTest.java |  58 +++++++++++
 3 files changed, 116 insertions(+), 55 deletions(-)

diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
index a4e4a7eb99..bdb3a02223 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
@@ -434,79 +434,81 @@ public class ClientProducerImpl implements ClientProducerInternal {
 
       boolean headerSent = false;
 
-      int reconnectID = sessionContext.getReconnectID();
-      while (!lastPacket) {
-         byte[] buff = new byte[minLargeMessageSize];
-
-         int pos = 0;
-
-         do {
-            int numberOfBytesRead;
-
-            int wanted = minLargeMessageSize - pos;
+      try {
+         int reconnectID = sessionContext.getReconnectID();
+         while (!lastPacket) {
+            byte[] buff = new byte[minLargeMessageSize];
 
-            try {
-               numberOfBytesRead = input.read(buff, pos, wanted);
-            } catch (IOException e) {
-               throw ActiveMQClientMessageBundle.BUNDLE.errorReadingBody(e);
-            }
+            int pos = 0;
 
-            if (numberOfBytesRead == -1) {
-               lastPacket = true;
+            do {
+               int numberOfBytesRead;
 
-               break;
-            }
+               int wanted = minLargeMessageSize - pos;
 
-            pos += numberOfBytesRead;
-         }
-         while (pos < minLargeMessageSize);
+               try {
+                  numberOfBytesRead = input.read(buff, pos, wanted);
+               } catch (IOException e) {
+                  throw ActiveMQClientMessageBundle.BUNDLE.errorReadingBody(e);
+               }
 
-         totalSize += pos;
+               if (numberOfBytesRead == -1) {
+                  lastPacket = true;
 
-         if (lastPacket) {
-            if (!session.isCompressLargeMessages()) {
-               messageSize.set(totalSize);
-            }
-
-            // This is replacing the last packet by a smaller packet
-            byte[] buff2 = new byte[pos];
+                  break;
+               }
 
-            System.arraycopy(buff, 0, buff2, 0, pos);
+               pos += numberOfBytesRead;
+            } while (pos < minLargeMessageSize);
 
-            buff = buff2;
+            totalSize += pos;
 
-            // This is the case where the message is being converted as a regular message
-            if (!headerSent && session.isCompressLargeMessages() && buff2.length < minLargeMessageSize) {
-               msgI.getBodyBuffer().resetReaderIndex();
-               msgI.getBodyBuffer().resetWriterIndex();
-               msgI.putLongProperty(Message.HDR_LARGE_BODY_SIZE, deflaterReader.getTotalSize());
+            if (lastPacket) {
+               if (!session.isCompressLargeMessages()) {
+                  messageSize.set(totalSize);
+               }
 
-               msgI.getBodyBuffer().writeBytes(buff, 0, pos);
-               sendRegularMessage(msgI.getAddressSimpleString(), msgI, sendBlocking, credits, handler);
-               return;
+               // This is replacing the last packet by a smaller packet
+               byte[] buff2 = new byte[pos];
+
+               System.arraycopy(buff, 0, buff2, 0, pos);
+
+               buff = buff2;
+
+               // This is the case where the message is being converted as a regular message
+               if (!headerSent && session.isCompressLargeMessages() && buff2.length < minLargeMessageSize) {
+                  msgI.getBodyBuffer().resetReaderIndex();
+                  msgI.getBodyBuffer().resetWriterIndex();
+                  msgI.putLongProperty(Message.HDR_LARGE_BODY_SIZE, deflaterReader.getTotalSize());
+
+                  msgI.getBodyBuffer().writeBytes(buff, 0, pos);
+                  sendRegularMessage(msgI.getAddressSimpleString(), msgI, sendBlocking, credits, handler);
+                  return;
+               } else {
+                  if (!headerSent) {
+                     headerSent = true;
+                     sendInitialLargeMessageHeader(msgI, credits);
+                  }
+                  int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageSize.get(), sendBlocking, true, buff, reconnectID, handler);
+                  credits.acquireCredits(creditsSent);
+               }
             } else {
                if (!headerSent) {
                   headerSent = true;
                   sendInitialLargeMessageHeader(msgI, credits);
                }
-               int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageSize.get(), sendBlocking, true, buff, reconnectID, handler);
+
+               int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageSize.get(), sendBlocking, false, buff, reconnectID, handler);
                credits.acquireCredits(creditsSent);
             }
-         } else {
-            if (!headerSent) {
-               headerSent = true;
-               sendInitialLargeMessageHeader(msgI, credits);
-            }
-
-            int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageSize.get(), sendBlocking, false, buff, reconnectID, handler);
-            credits.acquireCredits(creditsSent);
+         }
+      } finally {
+         try {
+            input.close();
+         } catch (IOException e) {
+            throw ActiveMQClientMessageBundle.BUNDLE.errorClosingLargeMessage(e);
          }
       }
 
-      try {
-         input.close();
-      } catch (IOException e) {
-         throw ActiveMQClientMessageBundle.BUNDLE.errorClosingLargeMessage(e);
-      }
    }
 }
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/DeflaterReader.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/DeflaterReader.java
index 2f7844ae39..4443687bde 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/DeflaterReader.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/DeflaterReader.java
@@ -107,7 +107,8 @@ public class DeflaterReader extends InputStream {
       return read;
    }
 
-   public void closeStream() throws IOException {
+   @Override
+   public void close() throws IOException {
       super.close();
       input.close();
    }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
index 11b1204249..2e00436088 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
@@ -24,6 +24,8 @@ import javax.jms.Session;
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
 import java.lang.management.ManagementFactory;
 import java.lang.management.OperatingSystemMXBean;
 import java.nio.ByteBuffer;
@@ -31,6 +33,7 @@ import java.util.HashMap;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.sun.management.UnixOperatingSystemMXBean;
@@ -2791,6 +2794,61 @@ public class LargeMessageTest extends LargeMessageTestBase {
       Wait.assertTrue(() -> ((UnixOperatingSystemMXBean)os).getOpenFileDescriptorCount() - fdBefore < 3);
    }
 
+   @Test
+   public void testStream() throws Exception {
+      ActiveMQServer server = createServer(true, isNetty(), storeType);
+
+      server.start();
+
+      locator.setCompressLargeMessage(true);
+
+      ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator));
+
+      ClientSession session = sf.createSession(false, false);
+
+      final SimpleString MY_QUEUE = new SimpleString("MY-QUEUE");
+
+      server.createQueue(new QueueConfiguration(MY_QUEUE).setRoutingType(RoutingType.ANYCAST));
+
+      ClientProducer producer = session.createProducer(MY_QUEUE);
+
+      AtomicBoolean closed = new AtomicBoolean(false);
+
+      InputStream inputStream = new InputStream() {
+         int bytes = 10_000;
+         @Override
+         public int read() throws IOException {
+            if (bytes-- > 0) {
+               return 10;
+            } else {
+               return -1;
+            }
+         }
+
+         @Override
+         public void close() {
+            closed.set(true);
+         }
+
+
+         @Override
+         public int available () throws IOException {
+            return bytes;
+         }
+
+      };
+
+      ClientMessage message = session.createMessage(true);
+      message.setBodyInputStream(inputStream);
+      producer.send(message);
+
+      Wait.assertTrue(closed::get);
+
+      session.close();
+
+   }
+
+
 
 
 }
\ No newline at end of file

Reply via email to