This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 69e21a0eb7 ARTEMIS-4163 Fixing openwire race while chunkSend is 
happening
69e21a0eb7 is described below

commit 69e21a0eb7f7d1a73fc1809c13b683a381841e4c
Author: Clebert Suconic <[email protected]>
AuthorDate: Thu Feb 9 05:57:50 2023 -0500

    ARTEMIS-4163 Fixing openwire race while chunkSend is happening
---
 .../core/protocol/openwire/OpenWireConnection.java   | 16 ++++++++++------
 .../openwire/OpenWireLargeMessageTest.java           | 20 ++++++++++++++++----
 2 files changed, 26 insertions(+), 10 deletions(-)

diff --git 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index bb22cc48d0..8a710c2aa0 100644
--- 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -548,12 +548,16 @@ public class OpenWireConnection extends 
AbstractRemotingConnection implements Se
          final int bufferSize = bytes.length;
          final int maxChunkSize = 
protocolManager.getOpenwireMaxPacketChunkSize();
 
-         if (maxChunkSize > 0 && bufferSize > maxChunkSize) {
-            chunkSend(bytes, bufferSize, maxChunkSize);
-         } else {
-            final ActiveMQBuffer buffer = 
transportConnection.createTransportBuffer(bufferSize);
-            buffer.writeBytes(bytes.data, bytes.offset, bufferSize);
-            transportConnection.write(buffer, false, false);
+         // We can't let any other packet to sneak in while chunkSend is 
happening.
+         // otherwise we may get wrong packts delivered
+         synchronized (transportConnection) {
+            if (maxChunkSize > 0 && bufferSize > maxChunkSize) {
+               chunkSend(bytes, bufferSize, maxChunkSize);
+            } else {
+               final ActiveMQBuffer buffer = 
transportConnection.createTransportBuffer(bufferSize);
+               buffer.writeBytes(bytes.data, bytes.offset, bufferSize);
+               transportConnection.write(buffer, false, false);
+            }
          }
          bufferSent();
       } catch (IOException e) {
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireLargeMessageTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireLargeMessageTest.java
index 02e346ad13..e862703544 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireLargeMessageTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireLargeMessageTest.java
@@ -20,12 +20,14 @@ import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.DeliveryMode;
+import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 import java.lang.invoke.MethodHandles;
+import java.nio.charset.StandardCharsets;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
@@ -190,7 +192,7 @@ public class OpenWireLargeMessageTest extends 
BasicOpenWireTest {
 
    @Test
    public void testSendReceiveLargeMessageTX() throws Exception {
-      int NUMBER_OF_MESSAGES = 400;
+      int NUMBER_OF_MESSAGES = 1000;
       int TX_SIZE = 100;
 
       ExecutorService executorService = Executors.newFixedThreadPool(1);
@@ -220,8 +222,11 @@ public class OpenWireLargeMessageTest extends 
BasicOpenWireTest {
             Queue queue = session.createQueue(lmAddress.toString());
             MessageConsumer consumer = session.createConsumer(queue);
             for (int received = 0; received < NUMBER_OF_MESSAGES; received++) {
-               TextMessage m = (TextMessage) consumer.receive(5000);
-               assertEquals(largeString, m.getText());
+               Message m = consumer.receive(5000);
+               Assert.assertNotNull(m);
+               if (m instanceof TextMessage) {
+                  assertEquals(largeString, ((TextMessage) m).getText());
+               }
                if (received > 0 && received % TX_SIZE == 0) {
                   logger.info("Received {} messages", received);
                   session.commit();
@@ -247,7 +252,14 @@ public class OpenWireLargeMessageTest extends 
BasicOpenWireTest {
          producer.setDeliveryMode(DeliveryMode.PERSISTENT);
 
          for (int sent = 0; sent < NUMBER_OF_MESSAGES; sent++) {
-            TextMessage message = session.createTextMessage(largeString);
+            Message message;
+            if (sent % 2 == 0) {
+               message = session.createTextMessage(largeString);
+            }  else {
+               BytesMessage bytesMessage = session.createBytesMessage();
+               
bytesMessage.writeBytes(largeString.getBytes(StandardCharsets.UTF_8));
+               message = bytesMessage;
+            }
             producer.send(message);
             if (sent > 0 && sent % TX_SIZE == 0) {
                logger.info("Sent {} messages", sent);

Reply via email to