This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 0f4982913f ARTEMIS-4185 Resending compressed message uncompressed throws exception in consumer
     new 1e356eb27c This closes #4383
0f4982913f is described below

commit 0f4982913fff32516a1eb50aae71b05e19debe99
Author: a181321 <[email protected]>
AuthorDate: Mon Feb 27 21:06:24 2023 +0100

    ARTEMIS-4185 Resending compressed message uncompressed throws exception in consumer
---
 .../core/client/impl/ClientProducerImpl.java       |  4 ++
 .../client/LargeMessageCompressTest.java           | 54 ++++++++++++++++++++++
 2 files changed, 58 insertions(+)

diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
index f3b01a7eca..c0a238bf2a 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
@@ -443,6 +443,10 @@ public class ClientProducerImpl implements ClientProducerInternal {
          deflaterReader = new DeflaterReader(inputStreamParameter, 
messageSize);
          deflaterReader.setLevel(session.getCompressionLevel());
          input = deflaterReader;
+      } else if (msgI.getBooleanProperty(Message.HDR_LARGE_COMPRESSED)) {
+         //This needs to be false if we do not intend to compress the message
+         //and the header already exists
+         msgI.putBooleanProperty(Message.HDR_LARGE_COMPRESSED, false);
       }
 
       long totalSize = 0;
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java
index 2d48f8639a..a3271e739d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java
@@ -31,6 +31,7 @@ import java.util.zip.Deflater;
 import io.netty.util.internal.PlatformDependent;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import org.apache.activemq.artemis.api.core.client.ClientConsumer;
@@ -513,6 +514,59 @@ public class LargeMessageCompressTest extends LargeMessageTest {
       validateNoFilesOnLargeDir();
    }
 
+   @Test
+   public void testPreviouslyCompressedMessageCleanup() throws Exception {
+      final int messageSize = 1024 * 1024;
+
+      ActiveMQServer server = createServer(true, isNetty());
+      server.start();
+
+      ClientSessionFactory sf1 = createSessionFactory(locator);
+      ClientSession session1 = addClientSession(sf1.createSession(false, true, 
true));
+      session1.createQueue(new 
QueueConfiguration(ADDRESS).setRoutingType(RoutingType.ANYCAST));
+      ClientProducer producer = session1.createProducer(ADDRESS);
+
+      ServerLocator locator2 = ActiveMQClient.createServerLocator("vm://0");
+      locator2.setCompressLargeMessage(false);
+      ClientSessionFactory sf2 = locator2.createSessionFactory();
+      ClientSession session2 = sf2.createSession(false, true, true);
+      ClientConsumer consumer = session2.createConsumer(ADDRESS);
+      ClientProducer producer2 = session2.createProducer(ADDRESS);
+      session2.start();
+
+      byte[] payload = new byte[messageSize];
+      byte[] response = new byte[messageSize];
+
+      for (int i = 0; i < payload.length; i++) {
+         payload[i] = RandomUtil.randomByte();
+      }
+
+      ClientMessage message = session1.createMessage(true);
+      message.getBodyBuffer().writeBytes(payload);
+      producer.send(message);
+
+      message = consumer.receive();
+      assertTrue(message.getBooleanProperty(Message.HDR_LARGE_COMPRESSED));
+
+      message.getBodyBuffer().readBytes(response);
+      message.getBodyBuffer().writeBytes(response);
+      producer2.send(message);
+
+      message = consumer.receive();
+      assertFalse(message.getBooleanProperty(Message.HDR_LARGE_COMPRESSED));
+
+      message.getBodyBuffer().readBytes(payload);
+      message.getBodySize();
+      assertTrue(Arrays.equals(payload, response));
+
+      session1.close();
+      session2.close();
+      sf1.close();
+      locator.close();
+      sf2.close();
+      locator2.close();
+   }
+
    @Test
    public void testLargeMessageCompressionLevel() throws Exception {
 

Reply via email to