This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch 2.27.x
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 60c5b9871e14d75e8a535640d4824c365a8dfefa
Author: Clebert Suconic <[email protected]>
AuthorDate: Thu Nov 10 07:51:30 2022 -0500

    ARTEMIS-4083 ClientLargeMessage Streaming not closing inputStream if compressed
    
    (cherry picked from commit f2e0f8713fb984c0c9b222a1e44470cff4606cf6)
---
 .../core/client/impl/ClientProducerImpl.java       |  3 +-
 .../tests/integration/client/LargeMessageTest.java | 46 ++++++++++++++++------
 2 files changed, 36 insertions(+), 13 deletions(-)

diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
index bdb3a02223..25e5f7a372 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
@@ -459,7 +459,8 @@ public class ClientProducerImpl implements ClientProducerInternal {
                }
 
                pos += numberOfBytesRead;
-            } while (pos < minLargeMessageSize);
+            }
+            while (pos < minLargeMessageSize);
 
             totalSize += pos;
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
index 2e00436088..b1e96713c5 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
@@ -2795,27 +2795,36 @@ public class LargeMessageTest extends LargeMessageTestBase {
    }
 
    @Test
-   public void testStream() throws Exception {
+   public void testStreamedMessage() throws Exception {
+      testStream(false);
+   }
+
+   @Test
+   public void testStreamedMessageCompressed() throws Exception {
+      testStream(true);
+   }
+
+   private void testStream(boolean compressed) throws Exception {
       ActiveMQServer server = createServer(true, isNetty(), storeType);
 
       server.start();
 
-      locator.setCompressLargeMessage(true);
+      locator.setCompressLargeMessage(compressed);
 
       ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator));
 
       ClientSession session = sf.createSession(false, false);
 
-      final SimpleString MY_QUEUE = new SimpleString("MY-QUEUE");
-
-      server.createQueue(new QueueConfiguration(MY_QUEUE).setRoutingType(RoutingType.ANYCAST));
+      session.createQueue(new QueueConfiguration(ADDRESS));
 
-      ClientProducer producer = session.createProducer(MY_QUEUE);
+      ClientProducer producer = session.createProducer(ADDRESS);
 
       AtomicBoolean closed = new AtomicBoolean(false);
 
+      final int BYTES = 1_000;
+
       InputStream inputStream = new InputStream() {
-         int bytes = 10_000;
+         int bytes = BYTES;
          @Override
          public int read() throws IOException {
             if (bytes-- > 0) {
@@ -2830,12 +2839,10 @@ public class LargeMessageTest extends LargeMessageTestBase {
             closed.set(true);
          }
 
-
          @Override
-         public int available () throws IOException {
+         public int available() {
             return bytes;
          }
-
       };
 
       ClientMessage message = session.createMessage(true);
@@ -2844,11 +2851,26 @@ public class LargeMessageTest extends LargeMessageTestBase {
 
       Wait.assertTrue(closed::get);
 
-      session.close();
+      session.commit();
 
-   }
+      session.start();
+
+      ClientConsumer consumer = session.createConsumer(ADDRESS);
 
+      ClientMessage receivedMessage = consumer.receive(5000);
+      Assert.assertNotNull(receivedMessage);
 
+      ActiveMQBuffer buffer = receivedMessage.getBodyBuffer();
+      Assert.assertEquals(BYTES, buffer.readableBytes());
 
+      for (int i = 0; i < BYTES; i++) {
+         Assert.assertEquals((byte)10, buffer.readByte());
+      }
+
+      Assert.assertEquals(0, buffer.readableBytes());
+
+      session.close();
+
+   }
 
 }
\ No newline at end of file

Reply via email to