This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 0866a2eb88 ARTEMIS-4096 Bridge transfer is broken with AMQP Large messages
0866a2eb88 is described below
commit 0866a2eb8846284d2b865ef876b146cd80270f59
Author: Clebert Suconic <[email protected]>
AuthorDate: Thu Nov 17 15:34:44 2022 -0500
ARTEMIS-4096 Bridge transfer is broken with AMQP Large messages
---
.../spi/core/protocol/EmbedMessageUtil.java | 10 ++-
.../AMQPLargeMessageOverCoreBridgeTest.java | 2 +
.../ClusteredLargeMessageTest.java | 78 +++++++++++++++++++++-
3 files changed, 87 insertions(+), 3 deletions(-)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/EmbedMessageUtil.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/EmbedMessageUtil.java
index 5678f1b54c..f3a9c80769 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/EmbedMessageUtil.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/EmbedMessageUtil.java
@@ -112,7 +112,15 @@ public class EmbedMessageUtil {
private static Message extractLargeMessage(ICoreMessage message,
StorageManager storageManager) {
ActiveMQBuffer buffer =
ActiveMQBuffers.wrappedBuffer(message.getBytesProperty(AMQP_ENCODE_PROPERTY));
- return readEncoded(message, storageManager, buffer);
+ Message largeMessageReturn = readEncoded(message, storageManager,
buffer);
+
+ if (message instanceof LargeServerMessage && largeMessageReturn
instanceof LargeServerMessage) {
+ LargeServerMessage returnMessage = (LargeServerMessage)
largeMessageReturn;
+ LargeServerMessage sourceMessage = (LargeServerMessage) message;
+ returnMessage.setPendingRecordID(sourceMessage.getPendingRecordID());
+ }
+
+ return largeMessageReturn;
}
private static boolean checkSignature(final ActiveMQBuffer buffer) {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/AMQPLargeMessageOverCoreBridgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/AMQPLargeMessageOverCoreBridgeTest.java
index c63bc9900d..0f1f5c4860 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/AMQPLargeMessageOverCoreBridgeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/AMQPLargeMessageOverCoreBridgeTest.java
@@ -123,6 +123,8 @@ public class AMQPLargeMessageOverCoreBridgeTest extends AmqpClientTestSupport {
}
sendTextMessages(AMQP_PORT + 1, getQueueName(useDivert ? 0 : 1),
largeText.toString(), 10);
+ server.stop();
+ server.start();
receiveTextMessages(AMQP_PORT, getQueueName(2), largeText.toString(),
10);
if (useDivert) {
// We diverted, so messages were copied, we need to make sure we consume from the original queue
diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/clusteredLargeMessage/ClusteredLargeMessageTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/clusteredLargeMessage/ClusteredLargeMessageTest.java
index eb6bece11f..f51bd5cd2c 100644
--- a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/clusteredLargeMessage/ClusteredLargeMessageTest.java
+++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/clusteredLargeMessage/ClusteredLargeMessageTest.java
@@ -36,14 +36,15 @@ public class ClusteredLargeMessageTest extends SmokeTestBase {
public static final String SERVER_NAME_0 = "clusteredLargeMessage/cluster1";
public static final String SERVER_NAME_1 = "clusteredLargeMessage/cluster2";
+ Process server0Process;
Process server1Process;
@Before
public void before() throws Exception {
cleanupData(SERVER_NAME_0);
cleanupData(SERVER_NAME_1);
- server1Process = startServer(SERVER_NAME_0, 0, 30000);
- startServer(SERVER_NAME_1, 100, 30000);
+ server0Process = startServer(SERVER_NAME_0, 0, 30000);
+ server1Process = startServer(SERVER_NAME_1, 100, 30000);
}
@Test
@@ -96,5 +97,78 @@ public class ClusteredLargeMessageTest extends SmokeTestBase {
connection1.close();
connection2.close();
}
+
+ @Test
+ public void testKillWhileSendingLargeCORE() throws Exception {
+ testKillWhileSendingLarge("CORE");
+ }
+
+ @Test
+ public void testKillWhileSendingLargeAMQP() throws Exception {
+ testKillWhileSendingLarge("AMQP");
+ }
+
+ public void testKillWhileSendingLarge(String protocol) throws Exception {
+
+ ConnectionFactory server2CF = CFUtil.createConnectionFactory(protocol,
"tcp://localhost:61716");
+ Connection keepConsumerConnection = server2CF.createConnection();
+ Session keepConsumerSession = keepConsumerConnection.createSession(true,
Session.AUTO_ACKNOWLEDGE);
+ // a consumer that we should keep to induce message redistribution
+ MessageConsumer keepConsumer =
keepConsumerSession.createConsumer(keepConsumerSession.createQueue("testQueue"));
+
+ String largeBody;
+ {
+ StringBuffer largeBodyBuffer = new StringBuffer();
+ while (largeBodyBuffer.length() < 1024 * 1024) {
+ largeBodyBuffer.append("This is large ");
+ }
+ largeBody = largeBodyBuffer.toString();
+ }
+
+ int NMESSAGES = 10;
+
+ ConnectionFactory server1CF = CFUtil.createConnectionFactory(protocol,
"tcp://localhost:61616");
+ try (Connection connection1 = server1CF.createConnection()) {
+ Session session1 = connection1.createSession(true,
Session.SESSION_TRANSACTED);
+ Queue queue1 = session1.createQueue("testQueue");
+ MessageProducer producer1 = session1.createProducer(queue1);
+ for (int i = 0; i < NMESSAGES; i++) {
+ TextMessage message = session1.createTextMessage(largeBody);
+ message.setStringProperty("i", Integer.toString(i));
+ producer1.send(message);
+
+ if (i == 5) {
+ session1.commit();
+ }
+ }
+ session1.commit();
+ }
+
+ keepConsumerConnection.close();
+ server1Process.destroyForcibly();
+ server1Process = startServer(SERVER_NAME_1, 100, 0);
+
+ for (int i = 0; i < 100; i++) {
+ // retrying the connection until the server is up
+ try (Connection willbegone = server2CF.createConnection()) {
+ break;
+ } catch (Exception ignored) {
+ Thread.sleep(100);
+ }
+ }
+
+ try (Connection connection2 = server2CF.createConnection()) {
+ Session session2 = connection2.createSession(true,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue2 = session2.createQueue("testQueue");
+ MessageConsumer consumer2 = session2.createConsumer(queue2);
+ connection2.start();
+
+ for (int i = 0; i < NMESSAGES; i++) {
+ TextMessage message = (TextMessage) consumer2.receive(5000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals(largeBody, message.getText());
+ }
+ }
+ }
}