This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new f56595b89b ARTEMIS-4185 - Revision on sending already compressed
messages
f56595b89b is described below
commit f56595b89b5b32ecd928a2c0d2c76410879daaed
Author: a181321 <[email protected]>
AuthorDate: Wed Nov 22 10:19:14 2023 +0100
ARTEMIS-4185 - Revision on sending already compressed messages
---
.../core/client/impl/ClientConsumerImpl.java | 2 +
.../core/client/impl/ClientProducerImpl.java | 4 --
.../client/LargeMessageCompressTest.java | 74 ++++++++++++----------
3 files changed, 43 insertions(+), 37 deletions(-)
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
index 75b5621843..b8a7b86546 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
@@ -646,6 +646,7 @@ public final class ClientConsumerImpl implements
ClientConsumerInternal {
qbuff.readBytes(body);
largeMessage.setLargeMessageController(new
CompressedLargeMessageControllerImpl(currentLargeMessageController));
currentLargeMessageController.addPacket(body, body.length, false);
+ largeMessage.putBooleanProperty(Message.HDR_LARGE_COMPRESSED, false);
handleRegularMessage(largeMessage);
}
@@ -674,6 +675,7 @@ public final class ClientConsumerImpl implements
ClientConsumerInternal {
if (clientLargeMessage.isCompressed()) {
clientLargeMessage.setLargeMessageController(new
CompressedLargeMessageControllerImpl(currentLargeMessageController));
+ clientLargeMessage.putBooleanProperty(Message.HDR_LARGE_COMPRESSED,
false);
} else {
clientLargeMessage.setLargeMessageController(currentLargeMessageController);
}
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
index c0a238bf2a..f3b01a7eca 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
@@ -443,10 +443,6 @@ public class ClientProducerImpl implements
ClientProducerInternal {
deflaterReader = new DeflaterReader(inputStreamParameter,
messageSize);
deflaterReader.setLevel(session.getCompressionLevel());
input = deflaterReader;
- } else if (msgI.getBooleanProperty(Message.HDR_LARGE_COMPRESSED)) {
- //This needs to be false if we do not intend to compress the message
- //and the header already exists
- msgI.putBooleanProperty(Message.HDR_LARGE_COMPRESSED, false);
}
long totalSize = 0;
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java
index 53661c7a8e..d85ce0a50b 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.Deflater;
import io.netty.util.internal.PlatformDependent;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
@@ -518,52 +519,59 @@ public class LargeMessageCompressTest extends
LargeMessageTest {
public void testPreviouslyCompressedMessageCleanup() throws Exception {
final int messageSize = 1024 * 1024;
+ byte[] payload = new byte[messageSize];
+ byte[] response = new byte[messageSize];
+
ActiveMQServer server = createServer(true, isNetty());
server.start();
- ClientSessionFactory sf1 = createSessionFactory(locator);
- ClientSession session1 = addClientSession(sf1.createSession(false, true,
true));
- session1.createQueue(new
QueueConfiguration(ADDRESS).setRoutingType(RoutingType.ANYCAST));
- ClientProducer producer = session1.createProducer(ADDRESS);
-
- ServerLocator locator2 = ActiveMQClient.createServerLocator("vm://0");
- locator2.setCompressLargeMessage(false);
- ClientSessionFactory sf2 = locator2.createSessionFactory();
- ClientSession session2 = sf2.createSession(false, true, true);
- ClientConsumer consumer = session2.createConsumer(ADDRESS);
- ClientProducer producer2 = session2.createProducer(ADDRESS);
- session2.start();
-
- byte[] payload = new byte[messageSize];
- byte[] response = new byte[messageSize];
+ server.createQueue(new
QueueConfiguration(ADDRESS).setRoutingType(RoutingType.ANYCAST));
for (int i = 0; i < payload.length; i++) {
payload[i] = RandomUtil.randomByte();
}
- ClientMessage message = session1.createMessage(true);
- message.getBodyBuffer().writeBytes(payload);
- producer.send(message);
+ try (ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSession session = sf.createSession(true, true);
+ ClientProducer producer = session.createProducer(ADDRESS)) {
- message = consumer.receive();
- assertTrue(message.getBooleanProperty(Message.HDR_LARGE_COMPRESSED));
+ ClientMessage message = session.createMessage(true);
+ message.getBodyBuffer().writeBytes(payload);
+ assertNull(message.getAnnotation(Message.HDR_LARGE_COMPRESSED));
- message.getBodyBuffer().readBytes(response);
- message.getBodyBuffer().writeBytes(response);
- producer2.send(message);
+ producer.send(message);
+ }
- message = consumer.receive();
- assertFalse(message.getBooleanProperty(Message.HDR_LARGE_COMPRESSED));
+ ServerLocator locator2 = createFactory(isNetty());
+ locator2.setCompressLargeMessage(false);
+ locator2.setMinLargeMessageSize(1024);
- message.getBodyBuffer().readBytes(payload);
- message.getBodySize();
- assertTrue(Arrays.equals(payload, response));
+ try (ClientSessionFactory sf = locator2.createSessionFactory();
+ ClientSession session = sf.createSession(true, true);
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+ ClientProducer producer = session.createProducer(ADDRESS)) {
+
+ ClientMessage message = session.createMessage(true);
+ ICoreMessage serverMessage =
server.locateQueue(ADDRESS).browserIterator().next().getMessage().copy().toCore();
+
+ message.moveHeadersAndProperties(serverMessage);
+
message.getBodyBuffer().writeBytes(serverMessage.getReadOnlyBodyBuffer(),
serverMessage.getBodyBufferSize());
+
+ assertTrue(message.getBooleanProperty(Message.HDR_LARGE_COMPRESSED));
+
+ producer.send(message);
+ session.start();
+
+ for (int i = 0; i < 2; i++) {
+ message = consumer.receive(2000);
+ assertNotNull(message);
+
+ message.getBodyBuffer().readBytes(response);
+ assertTrue(Arrays.equals(payload, response));
+ message.acknowledge();
+ }
+ }
- session1.close();
- session2.close();
- sf1.close();
- locator.close();
- sf2.close();
locator2.close();
}