This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 20370d2920 ARTEMIS-3751 Do not override large message size
     new 6d418a19f4 This closes #4030
20370d2920 is described below

commit 20370d29207db92dd4276a5322dddde62e46d3bc
Author: iliya <[email protected]>
AuthorDate: Sun Apr 17 13:15:47 2022 +0300

    ARTEMIS-3751 Do not override large message size
    
    It is possible to receive a compressed message from the client as regular 
message. Such a message will already contain correct body size, that takes 
compression into account.
---
 .../impl/journal/LargeServerMessageImpl.java       |   6 +-
 .../client/LargeMessageCompressTest.java           | 165 +++++++++++++++++++++
 2 files changed, 170 insertions(+), 1 deletion(-)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
index 5b228f2d5a..bedf6941f3 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
@@ -70,7 +70,11 @@ public final class LargeServerMessageImpl extends 
CoreMessage implements CoreLar
       final int readableBytes = buffer.readableBytes();
       lsm.addBytes(buffer);
       lsm.releaseResources(true, true);
-      lsm.toMessage().putLongProperty(Message.HDR_LARGE_BODY_SIZE, 
readableBytes);
+
+      if (!coreMessage.containsProperty(Message.HDR_LARGE_BODY_SIZE)) {
+         lsm.toMessage().putLongProperty(Message.HDR_LARGE_BODY_SIZE, 
readableBytes);
+      }
+
       return lsm.toMessage();
    }
 
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java
index eaad11e29a..a4e1a722cb 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java
@@ -22,7 +22,11 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Random;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.zip.Deflater;
 
 import io.netty.util.internal.PlatformDependent;
 import org.apache.activemq.artemis.api.core.Message;
@@ -37,6 +41,7 @@ import 
org.apache.activemq.artemis.api.core.client.ServerLocator;
 import org.apache.activemq.artemis.core.config.StoreConfiguration;
 import org.apache.activemq.artemis.core.management.impl.QueueControlImpl;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
 import org.apache.activemq.artemis.utils.RandomUtil;
 import org.junit.Assert;
 import org.junit.Assume;
@@ -45,6 +50,13 @@ import org.junit.Test;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.not;
 
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
 import javax.management.openmbean.CompositeData;
 
 /**
@@ -496,4 +508,157 @@ public class LargeMessageCompressTest extends 
LargeMessageTest {
    public void testSendServerMessage() throws Exception {
       // doesn't make sense as compressed
    }
+
+
+   // https://issues.apache.org/jira/projects/ARTEMIS/issues/ARTEMIS-3751
+   @Test
+   public void testOverrideSize() throws Exception {
+      ActiveMQServer server = createServer(true, true);
+
+      server.start();
+
+      ConnectionFactory cf = new 
ActiveMQConnectionFactory("tcp://localhost:61616?minLargeMessageSize=" + (200 * 
1024) + "&compressLargeMessage=true");
+      Connection connection = cf.createConnection();
+
+      Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+      connection.start();
+      Destination queue = session.createQueue("testQueue");
+
+      MessageProducer producer = session.createProducer(queue);
+      MessageConsumer consumer = session.createConsumer(queue);
+
+      // What we want is to get a message witch size is larger than client's 
minLargeMessageSize (200 kibs here) while compressed the size must be lower 
than that
+      // and at same time it must be greater than server minLargeMessageSize 
(which is 100 kibs by default)
+      byte[] data = new byte[1024 * 300];
+      new DeflateGenerator(new Random(42), 2.0, 2.0).generate(data, 2);
+
+      assertCompressionSize(data, 100 * 1024, 200 * 1024);
+
+      BytesMessage outMessage = session.createBytesMessage();
+      outMessage.writeBytes(data);
+
+      producer.send(outMessage);
+
+      javax.jms.Message inMessage = consumer.receive(1000);
+
+      assertEqualsByteArrays(data, inMessage.getBody(byte[].class));
+   }
+
+   private void assertCompressionSize(byte[] data, int min, int max) {
+      byte[] output = new byte[data.length];
+
+      Deflater deflater = new Deflater();
+      deflater.setInput(data);
+      deflater.finish();
+      int compressed = deflater.deflate(output);
+      deflater.end();
+
+      assert compressed > min && compressed < max;
+   }
+
+   /**
+    * Generate compressible data.
+    * <p>
+    * Based on "SDGen: Mimicking Datasets for Content Generation in Storage
+    * Benchmarks" by Raúl Gracia-Tinedo et al. 
(https://www.usenix.org/node/188461)
+    * and https://github.com/jibsen/lzdatagen
+    */
+   public static class DeflateGenerator {
+
+      private static final int MIN_LENGTH = 3;
+      private static final int MAX_LENGTH = 258;
+      private static final int NUM_LENGTH = MAX_LENGTH - MIN_LENGTH;
+
+      private static final int LENGTH_PER_CHUNK = 512;
+
+      private final Random rnd;
+
+      private final double dataExp;
+      private final double lengthExp;
+
+      public DeflateGenerator(Random rnd, double dataExp, double lengthExp) {
+         this.rnd = rnd;
+         this.dataExp = dataExp;
+         this.lengthExp = lengthExp;
+      }
+
+      private void nextBytes(byte[] buffer, int size) {
+         for (int i = 0; i < size; i++) {
+            buffer[i] = ((byte) ((double) 256 * Math.pow(rnd.nextDouble(), 
dataExp)));
+         }
+      }
+
+      private byte[] nextBytes(int size) {
+         byte[] buffer = new byte[size];
+         nextBytes(buffer, size);
+         return buffer;
+      }
+
+      private void nextLengthFrequencies(int[] frequencies) {
+         Arrays.fill(frequencies, 0);
+
+         for (int i = 0; i < LENGTH_PER_CHUNK; i++) {
+            int length = (int) ((double) frequencies.length * 
Math.pow(rnd.nextDouble(), lengthExp));
+
+            frequencies[length]++;
+         }
+      }
+
+      public void generate(byte[] result, double ratio) {
+         ByteBuffer generated = generate(result.length, ratio);
+         generated.get(result);
+      }
+
+      public ByteBuffer generate(int size, double ratio) {
+         ByteBuffer result = ByteBuffer.allocate(size);
+
+         byte[] buffer = new byte[MAX_LENGTH];
+         int[] frequencies = new int[NUM_LENGTH];
+
+         int length = 0;
+         int i = 0;
+         boolean repeat = false;
+
+         while (i < size) {
+            while (frequencies[length] == 0) {
+               if (length == 0) {
+                  nextBytes(buffer, MAX_LENGTH);
+                  nextLengthFrequencies(frequencies);
+
+                  length = NUM_LENGTH;
+               }
+
+               length--;
+            }
+
+            int len = length + MIN_LENGTH;
+            frequencies[length]--;
+
+            if (len > size - i) {
+               len = size - i;
+            }
+
+            if (rnd.nextDouble() < 1.0 / ratio) {
+               result.put(nextBytes(len));
+               repeat = false;
+            } else {
+               if (repeat) {
+                  result.put(nextBytes(1));
+                  i++;
+               }
+
+               result.put(buffer, 0, len);
+
+               repeat = true;
+            }
+
+            i += len;
+         }
+
+         return result.flip();
+      }
+   }
+
+
 }

Reply via email to