This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 69e21a0eb7 ARTEMIS-4163 Fixing openwire race while chunkSend is
happening
69e21a0eb7 is described below
commit 69e21a0eb7f7d1a73fc1809c13b683a381841e4c
Author: Clebert Suconic <[email protected]>
AuthorDate: Thu Feb 9 05:57:50 2023 -0500
ARTEMIS-4163 Fixing openwire race while chunkSend is happening
---
.../core/protocol/openwire/OpenWireConnection.java | 16 ++++++++++------
.../openwire/OpenWireLargeMessageTest.java | 20 ++++++++++++++++----
2 files changed, 26 insertions(+), 10 deletions(-)
diff --git
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index bb22cc48d0..8a710c2aa0 100644
---
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -548,12 +548,16 @@ public class OpenWireConnection extends
AbstractRemotingConnection implements Se
final int bufferSize = bytes.length;
final int maxChunkSize =
protocolManager.getOpenwireMaxPacketChunkSize();
- if (maxChunkSize > 0 && bufferSize > maxChunkSize) {
- chunkSend(bytes, bufferSize, maxChunkSize);
- } else {
- final ActiveMQBuffer buffer =
transportConnection.createTransportBuffer(bufferSize);
- buffer.writeBytes(bytes.data, bytes.offset, bufferSize);
- transportConnection.write(buffer, false, false);
+ // We can't let any other packet sneak in while chunkSend is
happening,
+ // otherwise we may get wrong packets delivered
+ synchronized (transportConnection) {
+ if (maxChunkSize > 0 && bufferSize > maxChunkSize) {
+ chunkSend(bytes, bufferSize, maxChunkSize);
+ } else {
+ final ActiveMQBuffer buffer =
transportConnection.createTransportBuffer(bufferSize);
+ buffer.writeBytes(bytes.data, bytes.offset, bufferSize);
+ transportConnection.write(buffer, false, false);
+ }
}
bufferSent();
} catch (IOException e) {
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireLargeMessageTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireLargeMessageTest.java
index 02e346ad13..e862703544 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireLargeMessageTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireLargeMessageTest.java
@@ -20,12 +20,14 @@ import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
+import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.lang.invoke.MethodHandles;
+import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@@ -190,7 +192,7 @@ public class OpenWireLargeMessageTest extends
BasicOpenWireTest {
@Test
public void testSendReceiveLargeMessageTX() throws Exception {
- int NUMBER_OF_MESSAGES = 400;
+ int NUMBER_OF_MESSAGES = 1000;
int TX_SIZE = 100;
ExecutorService executorService = Executors.newFixedThreadPool(1);
@@ -220,8 +222,11 @@ public class OpenWireLargeMessageTest extends
BasicOpenWireTest {
Queue queue = session.createQueue(lmAddress.toString());
MessageConsumer consumer = session.createConsumer(queue);
for (int received = 0; received < NUMBER_OF_MESSAGES; received++) {
- TextMessage m = (TextMessage) consumer.receive(5000);
- assertEquals(largeString, m.getText());
+ Message m = consumer.receive(5000);
+ Assert.assertNotNull(m);
+ if (m instanceof TextMessage) {
+ assertEquals(largeString, ((TextMessage) m).getText());
+ }
if (received > 0 && received % TX_SIZE == 0) {
logger.info("Received {} messages", received);
session.commit();
@@ -247,7 +252,14 @@ public class OpenWireLargeMessageTest extends
BasicOpenWireTest {
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
for (int sent = 0; sent < NUMBER_OF_MESSAGES; sent++) {
- TextMessage message = session.createTextMessage(largeString);
+ Message message;
+ if (sent % 2 == 0) {
+ message = session.createTextMessage(largeString);
+ } else {
+ BytesMessage bytesMessage = session.createBytesMessage();
+
bytesMessage.writeBytes(largeString.getBytes(StandardCharsets.UTF_8));
+ message = bytesMessage;
+ }
producer.send(message);
if (sent > 0 && sent % TX_SIZE == 0) {
logger.info("Sent {} messages", sent);