This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch 2.27.x
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 7e00a701fe3ae03c196ab3669eeccb2fcc5cac6c
Author: Clebert Suconic <[email protected]>
AuthorDate: Thu Nov 17 15:34:44 2022 -0500

    ARTEMIS-4096 Bridge transfer is broken with AMQP Large messages
    
    (cherry picked from commit 0866a2eb8846284d2b865ef876b146cd80270f59)
---
 .../spi/core/protocol/EmbedMessageUtil.java        | 10 ++-
 .../AMQPLargeMessageOverCoreBridgeTest.java        |  2 +
 .../ClusteredLargeMessageTest.java                 | 78 +++++++++++++++++++++-
 3 files changed, 87 insertions(+), 3 deletions(-)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/EmbedMessageUtil.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/EmbedMessageUtil.java
index 5678f1b54c..f3a9c80769 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/EmbedMessageUtil.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/EmbedMessageUtil.java
@@ -112,7 +112,15 @@ public class EmbedMessageUtil {
    private static Message extractLargeMessage(ICoreMessage message, 
StorageManager storageManager) {
       ActiveMQBuffer buffer = 
ActiveMQBuffers.wrappedBuffer(message.getBytesProperty(AMQP_ENCODE_PROPERTY));
 
-      return readEncoded(message, storageManager, buffer);
+      Message largeMessageReturn = readEncoded(message, storageManager, 
buffer);
+
+      if (message instanceof LargeServerMessage && largeMessageReturn 
instanceof LargeServerMessage) {
+         LargeServerMessage returnMessage = (LargeServerMessage) 
largeMessageReturn;
+         LargeServerMessage sourceMessage = (LargeServerMessage) message;
+         returnMessage.setPendingRecordID(sourceMessage.getPendingRecordID());
+      }
+
+      return largeMessageReturn;
    }
 
    private static boolean checkSignature(final ActiveMQBuffer buffer) {
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/AMQPLargeMessageOverCoreBridgeTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/AMQPLargeMessageOverCoreBridgeTest.java
index c63bc9900d..0f1f5c4860 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/AMQPLargeMessageOverCoreBridgeTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/AMQPLargeMessageOverCoreBridgeTest.java
@@ -123,6 +123,8 @@ public class AMQPLargeMessageOverCoreBridgeTest extends 
AmqpClientTestSupport {
       }
 
       sendTextMessages(AMQP_PORT + 1, getQueueName(useDivert ? 0 : 1), 
largeText.toString(), 10);
+      server.stop();
+      server.start();
       receiveTextMessages(AMQP_PORT, getQueueName(2), largeText.toString(), 
10);
       if (useDivert) {
          // We diverted, so messages were copied, we need to make sure we 
consume from the original queue
diff --git 
a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/clusteredLargeMessage/ClusteredLargeMessageTest.java
 
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/clusteredLargeMessage/ClusteredLargeMessageTest.java
index eb6bece11f..f51bd5cd2c 100644
--- 
a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/clusteredLargeMessage/ClusteredLargeMessageTest.java
+++ 
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/clusteredLargeMessage/ClusteredLargeMessageTest.java
@@ -36,14 +36,15 @@ public class ClusteredLargeMessageTest extends 
SmokeTestBase {
    public static final String SERVER_NAME_0 = "clusteredLargeMessage/cluster1";
    public static final String SERVER_NAME_1 = "clusteredLargeMessage/cluster2";
 
+   Process server0Process;
    Process server1Process;
 
    @Before
    public void before() throws Exception {
       cleanupData(SERVER_NAME_0);
       cleanupData(SERVER_NAME_1);
-      server1Process = startServer(SERVER_NAME_0, 0, 30000);
-      startServer(SERVER_NAME_1, 100, 30000);
+      server0Process = startServer(SERVER_NAME_0, 0, 30000);
+      server1Process = startServer(SERVER_NAME_1, 100, 30000);
    }
 
    @Test
@@ -96,5 +97,78 @@ public class ClusteredLargeMessageTest extends SmokeTestBase 
{
       connection1.close();
       connection2.close();
    }
+
+   @Test
+   public void testKillWhileSendingLargeCORE() throws Exception {
+      testKillWhileSendingLarge("CORE");
+   }
+
+   @Test
+   public void testKillWhileSendingLargeAMQP() throws Exception {
+      testKillWhileSendingLarge("AMQP");
+   }
+
+   public void testKillWhileSendingLarge(String protocol) throws Exception {
+
+      ConnectionFactory server2CF = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:61716");
+      Connection keepConsumerConnection = server2CF.createConnection();
+      Session keepConsumerSession = keepConsumerConnection.createSession(true, 
Session.AUTO_ACKNOWLEDGE);
+      // a consumer that we should keep to induce message redistribution
+      MessageConsumer keepConsumer = 
keepConsumerSession.createConsumer(keepConsumerSession.createQueue("testQueue"));
+
+      String largeBody;
+      {
+         StringBuffer largeBodyBuffer = new StringBuffer();
+         while (largeBodyBuffer.length() < 1024 * 1024) {
+            largeBodyBuffer.append("This is large ");
+         }
+         largeBody = largeBodyBuffer.toString();
+      }
+
+      int NMESSAGES = 10;
+
+      ConnectionFactory server1CF = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:61616");
+      try (Connection connection1 = server1CF.createConnection()) {
+         Session session1 = connection1.createSession(true, 
Session.SESSION_TRANSACTED);
+         Queue queue1 = session1.createQueue("testQueue");
+         MessageProducer producer1 = session1.createProducer(queue1);
+         for (int i = 0; i < NMESSAGES; i++) {
+            TextMessage message = session1.createTextMessage(largeBody);
+            message.setStringProperty("i", Integer.toString(i));
+            producer1.send(message);
+
+            if (i == 5) {
+               session1.commit();
+            }
+         }
+         session1.commit();
+      }
+
+      keepConsumerConnection.close();
+      server1Process.destroyForcibly();
+      server1Process = startServer(SERVER_NAME_1, 100, 0);
+
+      for (int i = 0; i < 100; i++) {
+         // retrying the connection until the server is up
+         try (Connection willbegone = server2CF.createConnection()) {
+            break;
+         } catch (Exception ignored) {
+            Thread.sleep(100);
+         }
+      }
+
+      try (Connection connection2 = server2CF.createConnection()) {
+         Session session2 = connection2.createSession(true, 
Session.AUTO_ACKNOWLEDGE);
+         Queue queue2 = session2.createQueue("testQueue");
+         MessageConsumer consumer2 = session2.createConsumer(queue2);
+         connection2.start();
+
+         for (int i = 0; i < NMESSAGES; i++) {
+            TextMessage message = (TextMessage) consumer2.receive(5000);
+            Assert.assertNotNull(message);
+            Assert.assertEquals(largeBody, message.getText());
+         }
+      }
+   }
 }
 

Reply via email to