This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 20370d2920 ARTEMIS-3751 Do not override large message size
new 6d418a19f4 This closes #4030
20370d2920 is described below
commit 20370d29207db92dd4276a5322dddde62e46d3bc
Author: iliya <[email protected]>
AuthorDate: Sun Apr 17 13:15:47 2022 +0300
ARTEMIS-3751 Do not override large message size
It is possible to receive a compressed message from the client as a regular
message. Such a message will already contain the correct body size, which takes
compression into account.
---
.../impl/journal/LargeServerMessageImpl.java | 6 +-
.../client/LargeMessageCompressTest.java | 165 +++++++++++++++++++++
2 files changed, 170 insertions(+), 1 deletion(-)
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
index 5b228f2d5a..bedf6941f3 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
@@ -70,7 +70,11 @@ public final class LargeServerMessageImpl extends
CoreMessage implements CoreLar
final int readableBytes = buffer.readableBytes();
lsm.addBytes(buffer);
lsm.releaseResources(true, true);
- lsm.toMessage().putLongProperty(Message.HDR_LARGE_BODY_SIZE,
readableBytes);
+
+ if (!coreMessage.containsProperty(Message.HDR_LARGE_BODY_SIZE)) {
+ lsm.toMessage().putLongProperty(Message.HDR_LARGE_BODY_SIZE,
readableBytes);
+ }
+
return lsm.toMessage();
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java
index eaad11e29a..a4e1a722cb 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java
@@ -22,7 +22,11 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.zip.Deflater;
import io.netty.util.internal.PlatformDependent;
import org.apache.activemq.artemis.api.core.Message;
@@ -37,6 +41,7 @@ import
org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.management.impl.QueueControlImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Assert;
import org.junit.Assume;
@@ -45,6 +50,13 @@ import org.junit.Test;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
import javax.management.openmbean.CompositeData;
/**
@@ -496,4 +508,157 @@ public class LargeMessageCompressTest extends
LargeMessageTest {
public void testSendServerMessage() throws Exception {
// doesn't make sense as compressed
}
+
+
+ // https://issues.apache.org/jira/projects/ARTEMIS/issues/ARTEMIS-3751
+ @Test
+ public void testOverrideSize() throws Exception {
+ ActiveMQServer server = createServer(true, true);
+
+ server.start();
+
+ ConnectionFactory cf = new
ActiveMQConnectionFactory("tcp://localhost:61616?minLargeMessageSize=" + (200 *
1024) + "&compressLargeMessage=true");
+ Connection connection = cf.createConnection();
+
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+
+ connection.start();
+ Destination queue = session.createQueue("testQueue");
+
+ MessageProducer producer = session.createProducer(queue);
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ // What we want is a message whose size is larger than the client's
minLargeMessageSize (200 KiB here), while its compressed size must be lower
than that
+ // and at the same time greater than the server's minLargeMessageSize
(which is 100 KiB by default)
+ byte[] data = new byte[1024 * 300];
+ new DeflateGenerator(new Random(42), 2.0, 2.0).generate(data, 2);
+
+ assertCompressionSize(data, 100 * 1024, 200 * 1024);
+
+ BytesMessage outMessage = session.createBytesMessage();
+ outMessage.writeBytes(data);
+
+ producer.send(outMessage);
+
+ javax.jms.Message inMessage = consumer.receive(1000);
+
+ assertEqualsByteArrays(data, inMessage.getBody(byte[].class));
+ }
+
+ private void assertCompressionSize(byte[] data, int min, int max) {
+ byte[] output = new byte[data.length];
+
+ Deflater deflater = new Deflater();
+ deflater.setInput(data);
+ deflater.finish();
+ int compressed = deflater.deflate(output);
+ deflater.end();
+
+ assert compressed > min && compressed < max;
+ }
+
+ /**
+ * Generate compressible data.
+ * <p>
+ * Based on "SDGen: Mimicking Datasets for Content Generation in Storage
+ * Benchmarks" by Raúl Gracia-Tinedo et al.
(https://www.usenix.org/node/188461)
+ * and https://github.com/jibsen/lzdatagen
+ */
+ public static class DeflateGenerator {
+
+ private static final int MIN_LENGTH = 3;
+ private static final int MAX_LENGTH = 258;
+ private static final int NUM_LENGTH = MAX_LENGTH - MIN_LENGTH;
+
+ private static final int LENGTH_PER_CHUNK = 512;
+
+ private final Random rnd;
+
+ private final double dataExp;
+ private final double lengthExp;
+
+ public DeflateGenerator(Random rnd, double dataExp, double lengthExp) {
+ this.rnd = rnd;
+ this.dataExp = dataExp;
+ this.lengthExp = lengthExp;
+ }
+
+ private void nextBytes(byte[] buffer, int size) {
+ for (int i = 0; i < size; i++) {
+ buffer[i] = ((byte) ((double) 256 * Math.pow(rnd.nextDouble(),
dataExp)));
+ }
+ }
+
+ private byte[] nextBytes(int size) {
+ byte[] buffer = new byte[size];
+ nextBytes(buffer, size);
+ return buffer;
+ }
+
+ private void nextLengthFrequencies(int[] frequencies) {
+ Arrays.fill(frequencies, 0);
+
+ for (int i = 0; i < LENGTH_PER_CHUNK; i++) {
+ int length = (int) ((double) frequencies.length *
Math.pow(rnd.nextDouble(), lengthExp));
+
+ frequencies[length]++;
+ }
+ }
+
+ public void generate(byte[] result, double ratio) {
+ ByteBuffer generated = generate(result.length, ratio);
+ generated.get(result);
+ }
+
+ public ByteBuffer generate(int size, double ratio) {
+ ByteBuffer result = ByteBuffer.allocate(size);
+
+ byte[] buffer = new byte[MAX_LENGTH];
+ int[] frequencies = new int[NUM_LENGTH];
+
+ int length = 0;
+ int i = 0;
+ boolean repeat = false;
+
+ while (i < size) {
+ while (frequencies[length] == 0) {
+ if (length == 0) {
+ nextBytes(buffer, MAX_LENGTH);
+ nextLengthFrequencies(frequencies);
+
+ length = NUM_LENGTH;
+ }
+
+ length--;
+ }
+
+ int len = length + MIN_LENGTH;
+ frequencies[length]--;
+
+ if (len > size - i) {
+ len = size - i;
+ }
+
+ if (rnd.nextDouble() < 1.0 / ratio) {
+ result.put(nextBytes(len));
+ repeat = false;
+ } else {
+ if (repeat) {
+ result.put(nextBytes(1));
+ i++;
+ }
+
+ result.put(buffer, 0, len);
+
+ repeat = true;
+ }
+
+ i += len;
+ }
+
+ return result.flip();
+ }
+ }
+
+
}