[ 
https://issues.apache.org/jira/browse/AMQ-9855?focusedWorklogId=1005147&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-1005147
 ]

ASF GitHub Bot logged work on AMQ-9855:
---------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/Feb/26 22:05
            Start Date: 13/Feb/26 22:05
    Worklog Time Spent: 10m 
      Work Description: cshannon commented on code in PR #1659:
URL: https://github.com/apache/activemq/pull/1659#discussion_r2806195369


##########
activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQTextMessageStressTest.java:
##########
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import jakarta.jms.Connection;
+import jakarta.jms.MessageConsumer;
+import jakarta.jms.Session;
+import jakarta.jms.Topic;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.TextMessage;
+import jakarta.jms.JMSException;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertEquals;
+
+public class ActiveMQTextMessageStressTest {
+
+    private BrokerService broker;
+    private Connection connection;
+
+    @Before
+    public void setUp() throws Exception {
+        broker = new BrokerService();
+        broker.setPersistent(false);
+        broker.addConnector("vm://localhost");
+        broker.start();
+
+        ActiveMQConnectionFactory cf = new 
ActiveMQConnectionFactory("vm://localhost");
+        connection = cf.createConnection();
+        connection.setClientID("HIGH_CONC_TEST"); // needed for durable 
subscribers
+        connection.start();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (connection != null) connection.close();
+        if (broker != null) broker.stop();
+    }
+
+    @Test
+    public void testConcurrentProducersAndConsumers() throws Exception {
+        final int MESSAGE_COUNT = 100;
+        final int PRODUCERS = 5;
+        final int DURABLE_CONSUMERS = 0;
+        final int NON_DURABLE_CONSUMERS = 5;
+
+        // Topic
+        Session tmpSession = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        Topic topic = tmpSession.createTopic("HIGH_CONC.TOPIC");
+
+        // Consumers
+        List<MessageConsumer> consumers = new ArrayList<>();
+        List<Session> consumerSessions = new ArrayList<>();
+        for (int i = 1; i <= DURABLE_CONSUMERS; i++) {
+            Session s = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            consumers.add(s.createDurableSubscriber(topic, "Durable-" + i));
+            consumerSessions.add(s);
+        }
+        for (int i = 1; i <= NON_DURABLE_CONSUMERS; i++) {
+            Session s = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            consumers.add(s.createConsumer(topic));
+            consumerSessions.add(s);
+        }
+
+        ExecutorService executor = Executors.newFixedThreadPool(PRODUCERS + 
consumers.size());
+
+        // Produce messages concurrently
+        CountDownLatch producerLatch = new CountDownLatch(PRODUCERS);
+        for (int p = 1; p <= PRODUCERS; p++) {
+            final int producerId = p;
+            executor.submit(() -> {
+                try {
+                    Session s = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+                    MessageProducer producer = s.createProducer(topic);
+                    for (int m = 1; m <= MESSAGE_COUNT; m++) {
+                        TextMessage msg = s.createTextMessage("P" + producerId 
+ "-M" + m);
+                        producer.send(msg);
+                    }
+                } catch (JMSException e) {
+                    e.printStackTrace();

Review Comment:
   I wouldn't use printStackTrace() here; the exception should be handled 
better. We should either fail the test or, if we don't care about the failure, 
just log it at debug level.
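
   As a rough sketch of the "fail the test" option (not code from the PR): 
the worker tasks could be submitted as Callables and their Futures checked on 
the test thread, so an exception thrown in a worker surfaces as a test failure 
instead of being printed. WorkerTasks and runAll are hypothetical names, and 
the example uses only java.util.concurrent so that it stands alone.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the PR: runs worker tasks and rethrows the
// first failure on the calling (test) thread instead of printing it.
final class WorkerTasks {

    private WorkerTasks() {
    }

    static void runAll(List<Callable<Void>> tasks, long timeoutSeconds) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(Math.max(1, tasks.size()));
        try {
            List<Future<Void>> futures = new ArrayList<>();
            for (Callable<Void> task : tasks) {
                futures.add(executor.submit(task));
            }
            for (Future<Void> future : futures) {
                try {
                    // A timeout here propagates as TimeoutException and fails the caller.
                    future.get(timeoutSeconds, TimeUnit.SECONDS);
                } catch (ExecutionException e) {
                    // Unwrap so the caller sees the worker's own exception or assertion error.
                    Throwable cause = e.getCause();
                    if (cause instanceof Exception) {
                        throw (Exception) cause;
                    }
                    if (cause instanceof Error) {
                        throw (Error) cause;
                    }
                    throw e;
                }
            }
        } finally {
            executor.shutdownNow();
        }
    }
}
```

   With something along these lines, the producer lambda could let the 
JMSException propagate rather than catching it, and the test method (which 
already declares throws Exception) would fail on the first worker error.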



##########
activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQTextMessageStressTest.java:
##########
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import jakarta.jms.Connection;
+import jakarta.jms.MessageConsumer;
+import jakarta.jms.Session;
+import jakarta.jms.Topic;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.TextMessage;
+import jakarta.jms.JMSException;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertEquals;
+
+public class ActiveMQTextMessageStressTest {
+
+    private BrokerService broker;
+    private Connection connection;
+
+    @Before
+    public void setUp() throws Exception {
+        broker = new BrokerService();
+        broker.setPersistent(false);
+        broker.addConnector("vm://localhost");
+        broker.start();
+
+        ActiveMQConnectionFactory cf = new 
ActiveMQConnectionFactory("vm://localhost");
+        connection = cf.createConnection();
+        connection.setClientID("HIGH_CONC_TEST"); // needed for durable 
subscribers
+        connection.start();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (connection != null) connection.close();
+        if (broker != null) broker.stop();
+    }
+
+    @Test
+    public void testConcurrentProducersAndConsumers() throws Exception {
+        final int MESSAGE_COUNT = 100;
+        final int PRODUCERS = 5;
+        final int DURABLE_CONSUMERS = 0;
+        final int NON_DURABLE_CONSUMERS = 5;
+
+        // Topic
+        Session tmpSession = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        Topic topic = tmpSession.createTopic("HIGH_CONC.TOPIC");
+
+        // Consumers
+        List<MessageConsumer> consumers = new ArrayList<>();
+        List<Session> consumerSessions = new ArrayList<>();
+        for (int i = 1; i <= DURABLE_CONSUMERS; i++) {

Review Comment:
   DURABLE_CONSUMERS is always 0, so this loop never runs. It should either be 
changed or removed.



##########
activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java:
##########
@@ -54,9 +54,11 @@ public Message copy() {
         return copy;
     }
 
-    private void copy(ActiveMQTextMessage copy) {
-        super.copy(copy);
-        copy.text = text;
+    protected void copy(ActiveMQTextMessage copy) {

Review Comment:
   I agree; leaving it private here makes sense unless we need to change it (we 
can always change it later).



##########
activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQTextMessageStressTest.java:
##########
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import jakarta.jms.Connection;
+import jakarta.jms.MessageConsumer;
+import jakarta.jms.Session;
+import jakarta.jms.Topic;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.TextMessage;
+import jakarta.jms.JMSException;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertEquals;
+
+public class ActiveMQTextMessageStressTest {
+
+    private BrokerService broker;
+    private Connection connection;
+
+    @Before
+    public void setUp() throws Exception {
+        broker = new BrokerService();
+        broker.setPersistent(false);
+        broker.addConnector("vm://localhost");
+        broker.start();
+
+        ActiveMQConnectionFactory cf = new 
ActiveMQConnectionFactory("vm://localhost");
+        connection = cf.createConnection();
+        connection.setClientID("HIGH_CONC_TEST"); // needed for durable 
subscribers
+        connection.start();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (connection != null) connection.close();
+        if (broker != null) broker.stop();
+    }
+
+    @Test
+    public void testConcurrentProducersAndConsumers() throws Exception {
+        final int MESSAGE_COUNT = 100;
+        final int PRODUCERS = 5;
+        final int DURABLE_CONSUMERS = 0;
+        final int NON_DURABLE_CONSUMERS = 5;
+
+        // Topic
+        Session tmpSession = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        Topic topic = tmpSession.createTopic("HIGH_CONC.TOPIC");
+
+        // Consumers
+        List<MessageConsumer> consumers = new ArrayList<>();
+        List<Session> consumerSessions = new ArrayList<>();
+        for (int i = 1; i <= DURABLE_CONSUMERS; i++) {
+            Session s = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            consumers.add(s.createDurableSubscriber(topic, "Durable-" + i));
+            consumerSessions.add(s);
+        }
+        for (int i = 1; i <= NON_DURABLE_CONSUMERS; i++) {
+            Session s = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            consumers.add(s.createConsumer(topic));
+            consumerSessions.add(s);
+        }
+
+        ExecutorService executor = Executors.newFixedThreadPool(PRODUCERS + 
consumers.size());
+
+        // Produce messages concurrently
+        CountDownLatch producerLatch = new CountDownLatch(PRODUCERS);
+        for (int p = 1; p <= PRODUCERS; p++) {
+            final int producerId = p;
+            executor.submit(() -> {
+                try {
+                    Session s = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+                    MessageProducer producer = s.createProducer(topic);
+                    for (int m = 1; m <= MESSAGE_COUNT; m++) {
+                        TextMessage msg = s.createTextMessage("P" + producerId 
+ "-M" + m);
+                        producer.send(msg);
+                    }
+                } catch (JMSException e) {
+                    e.printStackTrace();
+                } finally {
+                    producerLatch.countDown();
+                }
+            });
+        }
+
+// Consume messages concurrently
+        List<Future<List<TextMessage>>> consumerFutures = new ArrayList<>();
+        for (MessageConsumer consumer : consumers) {
+            consumerFutures.add(executor.submit(() -> {
+                List<TextMessage> received = new ArrayList<>();
+                for (int i = 0; i < MESSAGE_COUNT * PRODUCERS; i++) {
+                    TextMessage msg = (TextMessage) consumer.receive(5000);
+                    assertNotNull("Consumer should receive a message", msg);
+
+                    // loop to increase the chance of hitting the race 
condition
+                    // while other consumers are doing the same to their 
copies.
+                    for (int j = 0; j < 50; j++) {
+                        String txt = msg.getText();
+                        assertNotNull("Text should never be null during 
stress", txt);
+
+                        // It clears the 'text' field and forces the next 
getText()
+                        
((org.apache.activemq.command.ActiveMQTextMessage)msg).clearUnMarshalledState();
+                    }
+
+                    received.add(msg);
+                }
+                return received;
+            }));
+        }
+
+// Wait for producers and consumers

Review Comment:
   Indentation here is off and needs to be fixed.



##########
activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQTextMessageStressTest.java:
##########
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import jakarta.jms.Connection;
+import jakarta.jms.MessageConsumer;
+import jakarta.jms.Session;
+import jakarta.jms.Topic;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.TextMessage;
+import jakarta.jms.JMSException;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertEquals;
+
+public class ActiveMQTextMessageStressTest {
+
+    private BrokerService broker;
+    private Connection connection;
+
+    @Before
+    public void setUp() throws Exception {
+        broker = new BrokerService();
+        broker.setPersistent(false);
+        broker.addConnector("vm://localhost");
+        broker.start();
+
+        ActiveMQConnectionFactory cf = new 
ActiveMQConnectionFactory("vm://localhost");
+        connection = cf.createConnection();
+        connection.setClientID("HIGH_CONC_TEST"); // needed for durable 
subscribers
+        connection.start();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (connection != null) connection.close();

Review Comment:
   Nitpicking, but you should use braces even for one-line statements.



##########
activemq-unit-tests/src/test/java/org/apache/activemq/command/ActiveMQTextMessageStressTest.java:
##########
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import jakarta.jms.Connection;
+import jakarta.jms.MessageConsumer;
+import jakarta.jms.Session;
+import jakarta.jms.Topic;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.TextMessage;
+import jakarta.jms.JMSException;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertEquals;
+
+public class ActiveMQTextMessageStressTest {
+
+    private BrokerService broker;
+    private Connection connection;
+
+    @Before
+    public void setUp() throws Exception {
+        broker = new BrokerService();
+        broker.setPersistent(false);
+        broker.addConnector("vm://localhost");
+        broker.start();
+
+        ActiveMQConnectionFactory cf = new 
ActiveMQConnectionFactory("vm://localhost");
+        connection = cf.createConnection();
+        connection.setClientID("HIGH_CONC_TEST"); // needed for durable 
subscribers
+        connection.start();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (connection != null) connection.close();
+        if (broker != null) broker.stop();
+    }
+
+    @Test
+    public void testConcurrentProducersAndConsumers() throws Exception {
+        final int MESSAGE_COUNT = 100;
+        final int PRODUCERS = 5;
+        final int DURABLE_CONSUMERS = 0;
+        final int NON_DURABLE_CONSUMERS = 5;
+
+        // Topic
+        Session tmpSession = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        Topic topic = tmpSession.createTopic("HIGH_CONC.TOPIC");
+
+        // Consumers
+        List<MessageConsumer> consumers = new ArrayList<>();
+        List<Session> consumerSessions = new ArrayList<>();
+        for (int i = 1; i <= DURABLE_CONSUMERS; i++) {
+            Session s = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            consumers.add(s.createDurableSubscriber(topic, "Durable-" + i));
+            consumerSessions.add(s);
+        }
+        for (int i = 1; i <= NON_DURABLE_CONSUMERS; i++) {
+            Session s = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            consumers.add(s.createConsumer(topic));
+            consumerSessions.add(s);
+        }
+
+        ExecutorService executor = Executors.newFixedThreadPool(PRODUCERS + 
consumers.size());
+
+        // Produce messages concurrently
+        CountDownLatch producerLatch = new CountDownLatch(PRODUCERS);
+        for (int p = 1; p <= PRODUCERS; p++) {
+            final int producerId = p;
+            executor.submit(() -> {
+                try {
+                    Session s = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+                    MessageProducer producer = s.createProducer(topic);
+                    for (int m = 1; m <= MESSAGE_COUNT; m++) {
+                        TextMessage msg = s.createTextMessage("P" + producerId 
+ "-M" + m);
+                        producer.send(msg);
+                    }
+                } catch (JMSException e) {
+                    e.printStackTrace();
+                } finally {
+                    producerLatch.countDown();
+                }
+            });
+        }
+
+// Consume messages concurrently
+        List<Future<List<TextMessage>>> consumerFutures = new ArrayList<>();
+        for (MessageConsumer consumer : consumers) {
+            consumerFutures.add(executor.submit(() -> {
+                List<TextMessage> received = new ArrayList<>();
+                for (int i = 0; i < MESSAGE_COUNT * PRODUCERS; i++) {
+                    TextMessage msg = (TextMessage) consumer.receive(5000);
+                    assertNotNull("Consumer should receive a message", msg);
+
+                    // loop to increase the chance of hitting the race 
condition
+                    // while other consumers are doing the same to their 
copies.
+                    for (int j = 0; j < 50; j++) {
+                        String txt = msg.getText();
+                        assertNotNull("Text should never be null during 
stress", txt);
+
+                        // It clears the 'text' field and forces the next 
getText()
+                        
((org.apache.activemq.command.ActiveMQTextMessage)msg).clearUnMarshalledState();
+                    }
+
+                    received.add(msg);
+                }
+                return received;
+            }));
+        }
+
+// Wait for producers and consumers
+        producerLatch.await();
+        List<List<TextMessage>> allConsumed = new ArrayList<>();
+        for (Future<List<TextMessage>> f : consumerFutures) {
+            allConsumed.add(f.get(30, TimeUnit.SECONDS));
+        }
+
+// VALIDATION LOGIC

Review Comment:
   Indentation here is off and needs to be fixed.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 1005147)
    Time Spent: 4h 40m  (was: 4.5h)

> Intermittent null/empty body when consuming from a topic (vm:// transport)
> --------------------------------------------------------------------------
>
>                 Key: AMQ-9855
>                 URL: https://issues.apache.org/jira/browse/AMQ-9855
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: AMQP, Camel
>    Affects Versions: 6.2.0, 6.1.2, 6.1.6, 6.1.7
>            Reporter: JJ
>            Priority: Major
>             Fix For: 6.3.0
>
>          Time Spent: 4h 40m
>  Remaining Estimate: 0h
>
> Also see AMQ-6708. This is very much the same issue but with more details. The 
> OP on that ticket hasn't been seen since 2017.
> We have a simple AMQ instance using Camel. It connects to an upstream remote 
> server via OpenWire and subscribes to topics. It bridges those topics to the 
> local AMQ with some later Camel processing.
> The route looks like this:
> <route id="Route_SPLITTER">
>     <from uri="remoteServer:topic:TOPIC_A?durableSubscriptionName=some.user"/>
>     <choice>
>         <when>
>             <simple>${body} == null || ${body} == ''</simple>
>             <log message="Received message with missing body: 
> ${header.CamelMessageHistory}"/>
>         </when>
>         <otherwise>
>         </otherwise>
>     </choice>
>         
>     <to uri="localAMQ:topic:MY_TOPIC_A"/>
>     <split streaming="true" >
>         <method ref="Splitter" method="processMessage"/>
>         <multicast>
>             <to uri="direct:routeSorter"/>
>         </multicast>    
>     </split>
> </route>
>  
> Logging was added to make sure it wasn't an upstream issue (and it's not).
>  
> The data being passed is formatted as arrays of JSON. The <to 
> uri="localAMQ:topic:MY_TOPIC_A"/> just passes it untouched. The Splitter sends 
> a copy elsewhere to be filtered by an order number prefix.
> The internal Camel to AMQ connection is via the vm:// transport using 
> org.apache.camel.component.activemq.ActiveMQComponent (but I have also tried 
> a pooled JMS connection factory with the same results).
> When I connect a test non-durable consumer from a Ruby script using STOMP or 
> NIO, I see the same issue. Some messages appear to have a 0-sized body.
> I can connect a C++ OpenWire consumer from the same server and that 
> instance gets all messages with no 0-sized bodies.
> I have tried various versions of Camel and all exhibit the same results. 
> It's also worth noting that the data sent to the splitter function reports 
> no errors either.
> I have also tried some of the older STOMP gem packages but saw no change 
> (though I have found some odd connection issue when you upgrade to 
> io-wait-0.4.0 from 0.3.1).
>  
> After much swapping things around and testing, I've finally narrowed it down 
> to some issue with the vm:// transport.
> I have swapped the internal Camel connection from vm:// to tcp:// and for 
> the last 24 hours have seen no client errors with 0-sized bodies. 
> I don't have any way to debug this deeper, but hopefully someone else will 
> pick this up.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact

