This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new 0ab75b9 ARTEMIS-2554 - Queue control browse broken with large messages
new 153a5d2 This closes #2896
0ab75b9 is described below
commit 0ab75b9968fe115aead128076341b46161756f2d
Author: Andy Taylor <[email protected]>
AuthorDate: Mon Nov 18 10:36:05 2019 +0000
ARTEMIS-2554 - Queue control browse broken with large messages
https://issues.apache.org/jira/browse/ARTEMIS-2554
---
.../management/impl/openmbean/OpenTypeSupport.java | 10 ++--
.../client/LargeMessageCompressTest.java | 63 ++++++++++++++++++++++
2 files changed, 70 insertions(+), 3 deletions(-)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java
index dfd8ce3..027bbdb 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java
@@ -302,10 +302,14 @@ public final class OpenTypeSupport {
Map<String, Object> rc = super.getFields(ref);
ICoreMessage m = ref.getMessage().toCore();
if (!m.isLargeMessage()) {
- SimpleString text =
m.getReadOnlyBodyBuffer().readNullableSimpleString();
- rc.put(CompositeDataConstants.TEXT_BODY, text != null ?
text.toString() : "");
+ if (m.containsProperty(Message.HDR_LARGE_COMPRESSED)) {
+ rc.put(CompositeDataConstants.TEXT_BODY, "[compressed]");
+ } else {
+ SimpleString text =
m.getReadOnlyBodyBuffer().readNullableSimpleString();
+ rc.put(CompositeDataConstants.TEXT_BODY, text != null ?
text.toString() : "");
+ }
} else {
- rc.put(CompositeDataConstants.TEXT_BODY, "");
+ rc.put(CompositeDataConstants.TEXT_BODY, "[large message]");
}
return rc;
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java
index 2645482..117ca75 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java
@@ -33,11 +33,14 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
+import org.apache.activemq.artemis.core.management.impl.QueueControlImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Assert;
import org.junit.Test;
+import javax.management.openmbean.CompositeData;
+
/**
* A LargeMessageCompressTest
* <br>
@@ -62,6 +65,66 @@ public class LargeMessageCompressTest extends LargeMessageTest {
}
@Test
+ public void testLargeMessageCompressionNotCompressedAndBrowsed() throws
Exception {
+ final int messageSize = (int) (3.5 *
ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
+
+ ActiveMQServer server = createServer(true, isNetty());
+
+ server.start();
+
+ ClientSessionFactory sf = createSessionFactory(locator);
+
+ ClientSession session = addClientSession(sf.createSession(false, false,
false));
+
+ session.createTemporaryQueue(ADDRESS, ADDRESS);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ Message clientFile = createLargeClientMessageStreaming(session,
messageSize, true);
+
+ clientFile.setType(Message.TEXT_TYPE);
+
+ producer.send(clientFile);
+
+ session.commit();
+
+ session.close();
+
+ QueueControlImpl queueControl = (QueueControlImpl)
server.getManagementService().getResource("queue.SimpleAddress");
+
+ CompositeData[] browse = queueControl.browse();
+
+ Assert.assertNotNull(browse);
+
+ Assert.assertEquals(browse.length, 1);
+
+ Assert.assertEquals(browse[0].get("text"), "[compressed]");
+
+ //clean up
+ session = addClientSession(sf.createSession(false, false, false));
+
+ session.start();
+
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+ ClientMessage msg1 = consumer.receive(1000);
+ Assert.assertNotNull(msg1);
+
+ for (int i = 0; i < messageSize; i++) {
+ byte b = msg1.getBodyBuffer().readByte();
+ assertEquals("position = " + i, getSamplebyte(i), b);
+ }
+
+ msg1.acknowledge();
+ session.commit();
+
+ consumer.close();
+
+ session.close();
+
+ validateNoFilesOnLargeDir();
+ }
+
+ @Test
public void testLargeMessageCompression() throws Exception {
final int messageSize = (int) (3.5 *
ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);