Hi All

I've filed a JIRA for this, with a potential patch, but opening up a thread
here, in case this is not the right approach, or if this requires more
discussion.

I have a network of brokers set up, and use a plugin to set the additional
predicate, to enable message filtering based on the consumer. This works
great for a single broker, or failover setup, but fails when using a
network of brokers, with the following exception:

java.lang.ClassCastException: class
org.apache.activemq.filter.ComparisonExpression$1 cannot be cast to class
org.apache.activemq.command.DataStructure
(org.apache.activemq.filter.ComparisonExpression$1 and
org.apache.activemq.command.DataStructure are in unnamed module of loader
org.apache.catalina.loader.ParallelWebappClassLoader @749f539e)
        at
org.apache.activemq.openwire.v12.ConsumerInfoMarshaller.tightMarshal1(ConsumerInfoMarshaller.java:133)
        at
org.apache.activemq.openwire.OpenWireFormat.tightMarshalNestedObject1(OpenWireFormat.java:400)
        at
org.apache.activemq.openwire.v12.BaseDataStreamMarshaller.tightMarshalNestedObject1(BaseDataStreamMarshaller.java:130)
        at
org.apache.activemq.openwire.v12.MessageMarshaller.tightMarshal1(MessageMarshaller.java:140)
        at
org.apache.activemq.openwire.v12.ActiveMQMessageMarshaller.tightMarshal1(ActiveMQMessageMarshaller.java:76)
        at
org.apache.activemq.openwire.OpenWireFormat.tightMarshalNestedObject1(OpenWireFormat.java:400)
        at
org.apache.activemq.openwire.v12.BaseDataStreamMarshaller.tightMarshalNestedObject1(BaseDataStreamMarshaller.java:130)
        at
org.apache.activemq.openwire.v12.MessageDispatchMarshaller.tightMarshal1(MessageDispatchMarshaller.java:87)
        at
org.apache.activemq.openwire.OpenWireFormat.marshal(OpenWireFormat.java:226)
        at
org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:193)
        at
org.apache.activemq.transport.AbstractInactivityMonitor.doOnewaySend(AbstractInactivityMonitor.java:335)
        at
org.apache.activemq.transport.AbstractInactivityMonitor.oneway(AbstractInactivityMonitor.java:317)
        at
org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:94)
        at
org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:116)
        at
org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68)
        at
org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1474)
        at
org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:972)
        at
org.apache.activemq.broker.TransportConnection.iterate(TransportConnection.java:1022)
        at
org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:133)
        at
org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:48)
        at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)


When this happens, messages for this consumer are not forwarded over the
network.

My PR resolves this by creating a copy of the ConsumerInfo object (which
does not copy the additional predicate), and uses the copy of the
ConsumerInfo object to send the advisory message to trigger the
subscription across the network.

One (potential) drawback here is that the subscription between the brokers
does not filter the messages.

I did attempt to add this, by making a wrapper that implements both
BooleanExpression and DataStructure, and could be marshalled into OpenWire,
and then checking it in NetworkBridgeFilter. This did work, but appeared to
introduce a number of regressions across the test suite. I'm very happy to
take another swing at this approach if it is preferred.

Many thanks

Jon

Reply via email to