This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 0f4982913f ARTEMIS-4185 Resending compressed message uncompressed
throws exception in consumer
new 1e356eb27c This closes #4383
0f4982913f is described below
commit 0f4982913fff32516a1eb50aae71b05e19debe99
Author: a181321 <[email protected]>
AuthorDate: Mon Feb 27 21:06:24 2023 +0100
ARTEMIS-4185 Resending compressed message uncompressed throws exception in
consumer
---
.../core/client/impl/ClientProducerImpl.java | 4 ++
.../client/LargeMessageCompressTest.java | 54 ++++++++++++++++++++++
2 files changed, 58 insertions(+)
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
index f3b01a7eca..c0a238bf2a 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
@@ -443,6 +443,10 @@ public class ClientProducerImpl implements
ClientProducerInternal {
deflaterReader = new DeflaterReader(inputStreamParameter,
messageSize);
deflaterReader.setLevel(session.getCompressionLevel());
input = deflaterReader;
+ } else if (msgI.getBooleanProperty(Message.HDR_LARGE_COMPRESSED)) {
+ //This needs to be false if we do not intend to compress the message
+ //and the header already exists
+ msgI.putBooleanProperty(Message.HDR_LARGE_COMPRESSED, false);
}
long totalSize = 0;
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java
index 2d48f8639a..a3271e739d 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java
@@ -31,6 +31,7 @@ import java.util.zip.Deflater;
import io.netty.util.internal.PlatformDependent;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
@@ -513,6 +514,59 @@ public class LargeMessageCompressTest extends
LargeMessageTest {
validateNoFilesOnLargeDir();
}
+ @Test
+ public void testPreviouslyCompressedMessageCleanup() throws Exception {
+ final int messageSize = 1024 * 1024;
+
+ ActiveMQServer server = createServer(true, isNetty());
+ server.start();
+
+ ClientSessionFactory sf1 = createSessionFactory(locator);
+ ClientSession session1 = addClientSession(sf1.createSession(false, true,
true));
+ session1.createQueue(new
QueueConfiguration(ADDRESS).setRoutingType(RoutingType.ANYCAST));
+ ClientProducer producer = session1.createProducer(ADDRESS);
+
+ ServerLocator locator2 = ActiveMQClient.createServerLocator("vm://0");
+ locator2.setCompressLargeMessage(false);
+ ClientSessionFactory sf2 = locator2.createSessionFactory();
+ ClientSession session2 = sf2.createSession(false, true, true);
+ ClientConsumer consumer = session2.createConsumer(ADDRESS);
+ ClientProducer producer2 = session2.createProducer(ADDRESS);
+ session2.start();
+
+ byte[] payload = new byte[messageSize];
+ byte[] response = new byte[messageSize];
+
+ for (int i = 0; i < payload.length; i++) {
+ payload[i] = RandomUtil.randomByte();
+ }
+
+ ClientMessage message = session1.createMessage(true);
+ message.getBodyBuffer().writeBytes(payload);
+ producer.send(message);
+
+ message = consumer.receive();
+ assertTrue(message.getBooleanProperty(Message.HDR_LARGE_COMPRESSED));
+
+ message.getBodyBuffer().readBytes(response);
+ message.getBodyBuffer().writeBytes(response);
+ producer2.send(message);
+
+ message = consumer.receive();
+ assertFalse(message.getBooleanProperty(Message.HDR_LARGE_COMPRESSED));
+
+ message.getBodyBuffer().readBytes(payload);
+ message.getBodySize();
+ assertTrue(Arrays.equals(payload, response));
+
+ session1.close();
+ session2.close();
+ sf1.close();
+ locator.close();
+ sf2.close();
+ locator2.close();
+ }
+
@Test
public void testLargeMessageCompressionLevel() throws Exception {