This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch 2.27.x in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit 7e00a701fe3ae03c196ab3669eeccb2fcc5cac6c Author: Clebert Suconic <[email protected]> AuthorDate: Thu Nov 17 15:34:44 2022 -0500 ARTEMIS-4096 Bridge transfer is broken with AMQP Large messages (cherry picked from commit 0866a2eb8846284d2b865ef876b146cd80270f59) --- .../spi/core/protocol/EmbedMessageUtil.java | 10 ++- .../AMQPLargeMessageOverCoreBridgeTest.java | 2 + .../ClusteredLargeMessageTest.java | 78 +++++++++++++++++++++- 3 files changed, 87 insertions(+), 3 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/EmbedMessageUtil.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/EmbedMessageUtil.java index 5678f1b54c..f3a9c80769 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/EmbedMessageUtil.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/EmbedMessageUtil.java @@ -112,7 +112,15 @@ public class EmbedMessageUtil { private static Message extractLargeMessage(ICoreMessage message, StorageManager storageManager) { ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(message.getBytesProperty(AMQP_ENCODE_PROPERTY)); - return readEncoded(message, storageManager, buffer); + Message largeMessageReturn = readEncoded(message, storageManager, buffer); + + if (message instanceof LargeServerMessage && largeMessageReturn instanceof LargeServerMessage) { + LargeServerMessage returnMessage = (LargeServerMessage) largeMessageReturn; + LargeServerMessage sourceMessage = (LargeServerMessage) message; + returnMessage.setPendingRecordID(sourceMessage.getPendingRecordID()); + } + + return largeMessageReturn; } private static boolean checkSignature(final ActiveMQBuffer buffer) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/AMQPLargeMessageOverCoreBridgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/AMQPLargeMessageOverCoreBridgeTest.java index c63bc9900d..0f1f5c4860 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/AMQPLargeMessageOverCoreBridgeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/AMQPLargeMessageOverCoreBridgeTest.java @@ -123,6 +123,8 @@ public class AMQPLargeMessageOverCoreBridgeTest extends AmqpClientTestSupport { } sendTextMessages(AMQP_PORT + 1, getQueueName(useDivert ? 0 : 1), largeText.toString(), 10); + server.stop(); + server.start(); receiveTextMessages(AMQP_PORT, getQueueName(2), largeText.toString(), 10); if (useDivert) { // We diverted, so messages were copied, we need to make sure we consume from the original queue diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/clusteredLargeMessage/ClusteredLargeMessageTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/clusteredLargeMessage/ClusteredLargeMessageTest.java index eb6bece11f..f51bd5cd2c 100644 --- a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/clusteredLargeMessage/ClusteredLargeMessageTest.java +++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/clusteredLargeMessage/ClusteredLargeMessageTest.java @@ -36,14 +36,15 @@ public class ClusteredLargeMessageTest extends SmokeTestBase { public static final String SERVER_NAME_0 = "clusteredLargeMessage/cluster1"; public static final String SERVER_NAME_1 = "clusteredLargeMessage/cluster2"; + Process server0Process; Process server1Process; @Before public void before() throws Exception { cleanupData(SERVER_NAME_0); cleanupData(SERVER_NAME_1); - server1Process = startServer(SERVER_NAME_0, 0, 30000); - startServer(SERVER_NAME_1, 100, 30000); + server0Process = startServer(SERVER_NAME_0, 0, 30000); + server1Process = startServer(SERVER_NAME_1, 100, 30000); } @Test @@ -96,5 +97,78 @@ public class ClusteredLargeMessageTest extends SmokeTestBase { connection1.close(); connection2.close(); } + + @Test + public void testKillWhileSendingLargeCORE() throws Exception { + testKillWhileSendingLarge("CORE"); + } + + @Test + public void testKillWhileSendingLargeAMQP() throws Exception { + testKillWhileSendingLarge("AMQP"); + } + + public void testKillWhileSendingLarge(String protocol) throws Exception { + + ConnectionFactory server2CF = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61716"); + Connection keepConsumerConnection = server2CF.createConnection(); + Session keepConsumerSession = keepConsumerConnection.createSession(true, Session.AUTO_ACKNOWLEDGE); + // a consumer that we should keep to induce message redistribution + MessageConsumer keepConsumer = keepConsumerSession.createConsumer(keepConsumerSession.createQueue("testQueue")); + + String largeBody; + { + StringBuffer largeBodyBuffer = new StringBuffer(); + while (largeBodyBuffer.length() < 1024 * 1024) { + largeBodyBuffer.append("This is large "); + } + largeBody = largeBodyBuffer.toString(); + } + + int NMESSAGES = 10; + + ConnectionFactory server1CF = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616"); + try (Connection connection1 = server1CF.createConnection()) { + Session session1 = connection1.createSession(true, Session.SESSION_TRANSACTED); + Queue queue1 = session1.createQueue("testQueue"); + MessageProducer producer1 = session1.createProducer(queue1); + for (int i = 0; i < NMESSAGES; i++) { + TextMessage message = session1.createTextMessage(largeBody); + message.setStringProperty("i", Integer.toString(i)); + producer1.send(message); + + if (i == 5) { + session1.commit(); + } + } + session1.commit(); + } + + keepConsumerConnection.close(); + server1Process.destroyForcibly(); + server1Process = startServer(SERVER_NAME_1, 100, 0); + + for (int i = 0; i < 100; i++) { + // retrying the connection until the server is up + try (Connection willbegone = server2CF.createConnection()) { + break; + } catch (Exception ignored) { + Thread.sleep(100); + } + } + + try (Connection connection2 = server2CF.createConnection()) { + Session session2 = connection2.createSession(true, Session.AUTO_ACKNOWLEDGE); + Queue queue2 = session2.createQueue("testQueue"); + MessageConsumer consumer2 = session2.createConsumer(queue2); + connection2.start(); + + for (int i = 0; i < NMESSAGES; i++) { + TextMessage message = (TextMessage) consumer2.receive(5000); + Assert.assertNotNull(message); + Assert.assertEquals(largeBody, message.getText()); + } + } + } }
