I posted a response on the PR on GitHub.

On Mon, May 24, 2021 at 6:08 AM Jonathan Gallimore <
jonathan.gallim...@gmail.com> wrote:

> Hi All
>
> I've filed a JIRA for this, with a potential patch, but opening up a thread
> here, in case this is not the right approach, or if this requires more
> discussion.
>
> I have a network of brokers set up, and use a plugin to set the additional
> predicate, to enable message filtering based on the consumer. This works
> great for a single broker, or failover setup, but fails when using a
> network of brokers, with the following exception:
>
> java.lang.ClassCastException: class
> org.apache.activemq.filter.ComparisonExpression$1 cannot be cast to class
> org.apache.activemq.command.DataStructure
> (org.apache.activemq.filter.ComparisonExpression$1 and
> org.apache.activemq.command.DataStructure are in unnamed module of loader
> org.apache.catalina.loader.ParallelWebappClassLoader @749f539e)
>         at org.apache.activemq.openwire.v12.ConsumerInfoMarshaller.tightMarshal1(ConsumerInfoMarshaller.java:133)
>         at org.apache.activemq.openwire.OpenWireFormat.tightMarshalNestedObject1(OpenWireFormat.java:400)
>         at org.apache.activemq.openwire.v12.BaseDataStreamMarshaller.tightMarshalNestedObject1(BaseDataStreamMarshaller.java:130)
>         at org.apache.activemq.openwire.v12.MessageMarshaller.tightMarshal1(MessageMarshaller.java:140)
>         at org.apache.activemq.openwire.v12.ActiveMQMessageMarshaller.tightMarshal1(ActiveMQMessageMarshaller.java:76)
>         at org.apache.activemq.openwire.OpenWireFormat.tightMarshalNestedObject1(OpenWireFormat.java:400)
>         at org.apache.activemq.openwire.v12.BaseDataStreamMarshaller.tightMarshalNestedObject1(BaseDataStreamMarshaller.java:130)
>         at org.apache.activemq.openwire.v12.MessageDispatchMarshaller.tightMarshal1(MessageDispatchMarshaller.java:87)
>         at org.apache.activemq.openwire.OpenWireFormat.marshal(OpenWireFormat.java:226)
>         at org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:193)
>         at org.apache.activemq.transport.AbstractInactivityMonitor.doOnewaySend(AbstractInactivityMonitor.java:335)
>         at org.apache.activemq.transport.AbstractInactivityMonitor.oneway(AbstractInactivityMonitor.java:317)
>         at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:94)
>         at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:116)
>         at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68)
>         at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1474)
>         at org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:972)
>         at org.apache.activemq.broker.TransportConnection.iterate(TransportConnection.java:1022)
>         at org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:133)
>         at org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:48)
>         at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>         at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>         at java.base/java.lang.Thread.run(Thread.java:834)
>
>
> When this happens, messages for this consumer are not forwarded over the
> network.
>
> My PR resolves this by creating a copy of the ConsumerInfo object (without
> the additional predicate) and using that copy to send the advisory message
> that triggers the subscription across the network.
>
> One (potential) drawback here is that the subscription between the brokers
> does not filter the messages.
>
> I did attempt to add this, by making a wrapper that implements both
> BooleanExpression and DataStructure, and could be marshalled into OpenWire,
> and then checking it in NetworkBridgeFilter. This did work, but appeared to
> introduce a number of regressions across the test suite. I'm very happy to
> take another swing at this approach if it is preferred.
>
> Many thanks
>
> Jon
>
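
For anyone following along on the list, the copy-without-predicate idea
described in the quoted message can be sketched roughly as below. This is
only an illustration under stated assumptions: SimpleConsumerInfo, Marshallable
and copyWithoutPredicate are hypothetical stand-ins invented for this sketch,
not the real ActiveMQ ConsumerInfo/DataStructure types and not the code in the
PR. It shows the shape of the approach: the broker-local predicate stays on the
original object, and the copy sent over the network carries none.

```java
import java.util.function.Predicate;

public class ConsumerInfoCopySketch {

    /** Hypothetical stand-in for "something the wire format can marshal". */
    interface Marshallable {}

    /** Hypothetical, heavily simplified stand-in for a consumer description. */
    static final class SimpleConsumerInfo implements Marshallable {
        final String consumerId;
        final String destination;
        // Broker-local filter set by a plugin. In this sketch it is NOT
        // Marshallable, so it must never be handed to the wire format.
        final Predicate<String> additionalPredicate;

        SimpleConsumerInfo(String consumerId, String destination,
                           Predicate<String> additionalPredicate) {
            this.consumerId = consumerId;
            this.destination = destination;
            this.additionalPredicate = additionalPredicate;
        }

        /** Copies the marshallable fields and deliberately drops the predicate. */
        SimpleConsumerInfo copyWithoutPredicate() {
            return new SimpleConsumerInfo(consumerId, destination, null);
        }
    }

    public static void main(String[] args) {
        SimpleConsumerInfo local = new SimpleConsumerInfo(
                "consumer-1", "queue.orders", body -> body.startsWith("A"));

        // The copy is what would be sent in the advisory message.
        SimpleConsumerInfo advisory = local.copyWithoutPredicate();

        System.out.println("local has predicate:    " + (local.additionalPredicate != null));
        System.out.println("advisory has predicate: " + (advisory.additionalPredicate != null));
    }
}
```

The trade-off is the one already noted in the quoted message: since the copy
carries no predicate, the subscription between the brokers does not filter
messages, and filtering only happens on the broker that holds the original
object.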
