[
https://issues.apache.org/jira/browse/AMQ-9855?focusedWorklogId=1004643&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-1004643
]
ASF GitHub Bot logged work on AMQ-9855:
---------------------------------------
Author: ASF GitHub Bot
Created on: 11/Feb/26 16:35
Start Date: 11/Feb/26 16:35
Worklog Time Spent: 10m
Work Description: jeanouii commented on code in PR #1659:
URL: https://github.com/apache/activemq/pull/1659#discussion_r2794236552
##########
activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java:
##########
@@ -20,13 +20,16 @@
import java.io.InterruptedIOException;
import java.net.URI;
import java.security.cert.X509Certificate;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
+import java.sql.PreparedStatement;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.*;
Review Comment:
Wildcard imports are discouraged in the project, even though it's not a blocker.
##########
activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java:
##########
@@ -94,6 +97,30 @@ public void oneway(Object command) throws IOException {
throw new TransportDisposedIOException("Peer (" +
peer.toString() + ") disposed.");
}
+ // Deep copy the message if it is a MessageDispatch
+ Object toSend = command;
+ if (command instanceof MessageDispatch) {
+ MessageDispatch original = (MessageDispatch) command;
+ try {
+ WireFormat wf = new OpenWireFormat();
+ ByteSequence data = wf.marshal(original);
+ toSend = wf.unmarshal(data); // deep copy
+ } catch (IOException e) {
+ LOG.warn("Failed to deep copy MessageDispatch, sending original", e);
+ toSend = command;
+ }
+ } else if (command instanceof ActiveMQMessage) {
+ ActiveMQMessage original = (ActiveMQMessage) command;
+ try {
+ WireFormat wf = new OpenWireFormat();
+ ByteSequence data = wf.marshal(original);
+ toSend = (ActiveMQMessage) wf.unmarshal(data);
+ } catch (IOException e) {
+ LOG.warn("Failed to marshal/unmarshal ActiveMQMessage, sending original", e);
Review Comment:
Same
##########
activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java:
##########
@@ -124,6 +151,10 @@ public void oneway(Object command) throws IOException {
return;
}
}
+
+ // Dispatch to listener
+ dispatch(peer, peer.messageQueue, toSend);
Review Comment:
Should we have a return after this one to avoid the second dispatch below?
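To illustrate the concern, here is a toy model of the control flow. It is not the actual `VMTransport` code: `DispatchOnceSketch`, its `oneway` signature, and the `returnAfterFirstDispatch` flag are all hypothetical names, and the sketch simply assumes a pre-existing dispatch sits further down the method. Under that assumption, omitting the early return delivers the same command twice.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model; none of these names exist in ActiveMQ.
class DispatchOnceSketch {
    // Records every delivery so double dispatch is easy to observe.
    final List<Object> delivered = new ArrayList<>();

    void dispatch(Object command) {
        delivered.add(command);
    }

    // Models a method with a newly added dispatch followed, further down,
    // by a dispatch that was already there (an assumption of this sketch).
    void oneway(Object command, boolean returnAfterFirstDispatch) {
        dispatch(command); // newly added dispatch
        if (returnAfterFirstDispatch) {
            return; // stops control from reaching the later dispatch
        }
        dispatch(command); // pre-existing dispatch further down
    }

    public static void main(String[] args) {
        DispatchOnceSketch withoutReturn = new DispatchOnceSketch();
        withoutReturn.oneway("msg", false);
        System.out.println(withoutReturn.delivered.size()); // prints 2

        DispatchOnceSketch withReturn = new DispatchOnceSketch();
        withReturn.oneway("msg", true);
        System.out.println(withReturn.delivered.size()); // prints 1
    }
}
```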
##########
activemq-unit-tests/src/test/java/org/apache/activemq/VmTransportBrokerRestartTest.java:
##########
Review Comment:
This PR should be rebased against apache/main so that this class no longer shows up in the diff.
##########
activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java:
##########
@@ -94,6 +97,30 @@ public void oneway(Object command) throws IOException {
throw new TransportDisposedIOException("Peer (" +
peer.toString() + ") disposed.");
}
+ // Deep copy the message if it is a MessageDispatch
+ Object toSend = command;
+ if (command instanceof MessageDispatch) {
+ MessageDispatch original = (MessageDispatch) command;
+ try {
+ WireFormat wf = new OpenWireFormat();
+ ByteSequence data = wf.marshal(original);
+ toSend = wf.unmarshal(data); // deep copy
+ } catch (IOException e) {
+ LOG.warn("Failed to deep copy MessageDispatch, sending original", e);
+ toSend = command;
Review Comment:
I'm wondering whether this is desired or not, to be honest. To me, the goal of this PR
(and it's great) is to have the VM transport behave the same as the remote
transports, the benefit being that others in the same JVM can't mutate the message.
Great!
Now, if we can't serialize/deserialize to create a deep copy and we still
send the original, we might introduce a case where VM works when remote
does not. So I'm tempted to just fail here. What do you think?
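A minimal sketch of the fail-fast variant, under stated assumptions: Java serialization stands in for the OpenWire marshal/unmarshal round trip so the example runs without ActiveMQ on the classpath, and `ToyMessage`, `DeepCopySketch`, and `deepCopyOrFail` are hypothetical names, not project code. It shows the two properties discussed above: the receiver's copy is isolated from later mutation by the sender, and a command that cannot be copied is rejected with an `IOException` instead of being sent by reference.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Toy stand-in for a message; NOT an ActiveMQ class.
class ToyMessage implements Serializable {
    private static final long serialVersionUID = 1L;
    byte[] body;

    ToyMessage(byte[] body) {
        this.body = body;
    }
}

// Hypothetical helper; Java serialization replaces the wire format here.
class DeepCopySketch {
    // Round-trips the command through bytes, as a remote transport would.
    // Any failure propagates as IOException; there is no fallback to the original.
    static Object deepCopyOrFail(Object command) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(command);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return in.readObject();
        } catch (ClassNotFoundException e) {
            throw new IOException("Failed to deep copy command", e);
        }
    }

    public static void main(String[] args) throws IOException {
        ToyMessage original = new ToyMessage(new byte[] {1, 2, 3});
        ToyMessage copy = (ToyMessage) deepCopyOrFail(original);

        original.body = new byte[0]; // sender mutates the message after sending
        System.out.println(copy.body.length); // prints 3: the copy is unaffected

        try {
            deepCopyOrFail(new Object()); // java.lang.Object is not Serializable
        } catch (IOException e) {
            System.out.println("rejected: " + e.getClass().getSimpleName());
        }
    }
}
```

Failing here keeps the in-JVM path as strict as a remote one: a command that would not survive the wire does not silently succeed over vm://.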
##########
activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java:
##########
@@ -68,6 +72,9 @@ public class VMTransport implements Transport, Task {
private volatile int receiveCounter;
+ private final List<TransportListener> listeners = new CopyOnWriteArrayList<>();
+ private final ExecutorService executor = Executors.newCachedThreadPool();
Review Comment:
I'm probably blind or my search/replace does not work properly. Where are
the 2 fields used so far?
##########
activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java:
##########
@@ -20,13 +20,16 @@
import java.io.InterruptedIOException;
import java.net.URI;
import java.security.cert.X509Certificate;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
+import java.sql.PreparedStatement;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.activemq.command.ShutdownInfo;
+import jakarta.jms.JMSException;
+import org.apache.activemq.command.*;
Review Comment:
Same here FYI
Issue Time Tracking
-------------------
Worklog Id: (was: 1004643)
Time Spent: 2h 10m (was: 2h)
> Intermittent null/empty body when consuming from a topic (vm:// transport)
> --------------------------------------------------------------------------
>
> Key: AMQ-9855
> URL: https://issues.apache.org/jira/browse/AMQ-9855
> Project: ActiveMQ
> Issue Type: Bug
> Components: AMQP, Camel
> Affects Versions: 6.2.0, 6.1.2, 6.1.6, 6.1.7
> Reporter: JJ
> Priority: Major
> Fix For: 6.3.0
>
> Time Spent: 2h 10m
> Remaining Estimate: 0h
>
> Also see AMQ-6708. This is very much the same issue, but with more details. The
> OP on that ticket hasn't been seen since 2017.
> We have a simple AMQ instance using Camel. It connects to an upstream remote
> server via OpenWire and subscribes to topics. It bridges those topics to the
> local AMQ with some later Camel processing.
> The route looks like this:
> <route id="Route_SPLITTER">
> <from uri="remoteServer:topic:TOPIC_A?durableSubscriptionName=some.user"/>
> <choice>
> <when>
> <simple>${body} == null || ${body} == ''</simple>
> <log message="Received message with missing body:
> ${header.CamelMessageHistory}"/>
> </when>
> <otherwise>
> </otherwise>
> </choice>
>
> <to uri="localAMQ:topic:MY_TOPIC_A"/>
> <split streaming="true" >
> <method ref="Splitter" method="processMessage"/>
> <multicast>
> <to uri="direct:routeSorter"/>
> </multicast>
> </split>
> </route>
>
> Logging was added to make sure it wasn't an upstream issue (and it's not).
>
> The data being passed is formatted as arrays of JSON. The <to
> uri="localAMQ:topic:MY_TOPIC_A"/> just passes it on untouched. The Splitter sends
> a copy elsewhere to be filtered by an order-number prefix.
> The internal Camel to AMQ connection is via the vm:// transport using
> org.apache.camel.component.activemq.ActiveMQComponent (but I have also tried
> a pooled JMS connection factory with the same results).
> When I connect a test non-durable consumer from a Ruby script using STOMP or
> NIO, I see the same issue: some messages appear to have a 0-sized body.
> I can connect a C++ OpenWire consumer from the same server, and that
> instance gets all messages with no 0-sized bodies.
> I have tried various versions of Camel and all exhibit the same results.
> It's also worth noting that the data sent to the splitter function reports
> no errors either.
> I have also tried some of the older STOMP gem packages, but no change (though
> I have found some odd connection issue when you upgrade to io-wait-0.4.0 from
> 0.3.1).
>
> After much swapping things round and testing, I've finally narrowed it down to
> some issue with the vm:// transport.
> I have swapped the internal Camel connection from vm:// to tcp:// and
> for the last 24 hours have seen no client errors with 0-sized bodies.
> I don't have any way to debug this deeper, but hopefully someone else will
> pick this up.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)