I posted a response on the PR in GitHub.

On Mon, May 24, 2021 at 6:08 AM Jonathan Gallimore <jonathan.gallim...@gmail.com> wrote:
> Hi All
>
> I've filed a JIRA for this, with a potential patch, but opening up a thread
> here, in case this is not the right approach, or if this requires more
> discussion.
>
> I have a network of brokers set up, and use a plugin to set the additional
> predicate, to enable message filtering based on the consumer. This works
> great for a single broker, or failover setup, but fails when using a
> network of brokers, with the following exception:
>
> java.lang.ClassCastException: class
> org.apache.activemq.filter.ComparisonExpression$1 cannot be cast to class
> org.apache.activemq.command.DataStructure
> (org.apache.activemq.filter.ComparisonExpression$1 and
> org.apache.activemq.command.DataStructure are in unnamed module of loader
> org.apache.catalina.loader.ParallelWebappClassLoader @749f539e)
>     at org.apache.activemq.openwire.v12.ConsumerInfoMarshaller.tightMarshal1(ConsumerInfoMarshaller.java:133)
>     at org.apache.activemq.openwire.OpenWireFormat.tightMarshalNestedObject1(OpenWireFormat.java:400)
>     at org.apache.activemq.openwire.v12.BaseDataStreamMarshaller.tightMarshalNestedObject1(BaseDataStreamMarshaller.java:130)
>     at org.apache.activemq.openwire.v12.MessageMarshaller.tightMarshal1(MessageMarshaller.java:140)
>     at org.apache.activemq.openwire.v12.ActiveMQMessageMarshaller.tightMarshal1(ActiveMQMessageMarshaller.java:76)
>     at org.apache.activemq.openwire.OpenWireFormat.tightMarshalNestedObject1(OpenWireFormat.java:400)
>     at org.apache.activemq.openwire.v12.BaseDataStreamMarshaller.tightMarshalNestedObject1(BaseDataStreamMarshaller.java:130)
>     at org.apache.activemq.openwire.v12.MessageDispatchMarshaller.tightMarshal1(MessageDispatchMarshaller.java:87)
>     at org.apache.activemq.openwire.OpenWireFormat.marshal(OpenWireFormat.java:226)
>     at org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:193)
>     at org.apache.activemq.transport.AbstractInactivityMonitor.doOnewaySend(AbstractInactivityMonitor.java:335)
>     at org.apache.activemq.transport.AbstractInactivityMonitor.oneway(AbstractInactivityMonitor.java:317)
>     at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:94)
>     at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:116)
>     at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68)
>     at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1474)
>     at org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:972)
>     at org.apache.activemq.broker.TransportConnection.iterate(TransportConnection.java:1022)
>     at org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:133)
>     at org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:48)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.base/java.lang.Thread.run(Thread.java:834)
>
> When this happens, messages for this consumer are not forwarded over the
> network.
>
> My PR resolves this by creating a copy of the ConsumerInfo object (which
> does not copy the additional predicate), and uses the copy of the
> ConsumerInfo object to send the advisory message to trigger the
> subscription across the network.
>
> One (potential) drawback here is that the subscription between the brokers
> does not filter the messages.
>
> I did attempt to add this, by making a wrapper that implements both
> BooleanExpression and DataStructure, and could be marshalled into OpenWire,
> and then checking it in NetworkBridgeFilter. This did work, but appeared to
> introduce a number of regressions across the test suite. I'm very happy to
> take another swing at this approach if it is preferred.
>
> Many thanks
>
> Jon
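
For anyone following the thread without the PR open, here is a rough sketch of the idea described in the quoted message: marshalling fails when a broker-local predicate is attached to the consumer descriptor, and sending a copy that leaves the predicate behind avoids the failure. This is not the code from the PR and does not use the ActiveMQ classes. `AdvisoryCopySketch`, `WireData`, `ConsumerInfoSketch`, `copyWithoutPredicate` and `marshal` are all hypothetical stand-ins invented for illustration.

```java
import java.util.function.Predicate;

// Hypothetical, self-contained illustration; none of these types are ActiveMQ classes.
class AdvisoryCopySketch {

    /** Stand-in for a type the wire format knows how to marshal. */
    interface WireData {
        String encode();
    }

    /** Stand-in for a consumer descriptor carrying an optional broker-local predicate. */
    static final class ConsumerInfoSketch {
        final String consumerId;
        final String destination;
        final String selector;
        // Set by a plugin on the local broker; typed as Object because the
        // marshaller below only finds out at runtime whether it is wire-safe.
        Object additionalPredicate;

        ConsumerInfoSketch(String consumerId, String destination, String selector) {
            this.consumerId = consumerId;
            this.destination = destination;
            this.selector = selector;
        }

        /** Copies the wire-safe fields only; the additional predicate stays null on the copy. */
        ConsumerInfoSketch copyWithoutPredicate() {
            return new ConsumerInfoSketch(consumerId, destination, selector);
        }
    }

    /** Stand-in for marshalling: the nested object must be WireData, otherwise the cast fails. */
    static String marshal(ConsumerInfoSketch info) {
        WireData nested = (WireData) info.additionalPredicate; // ClassCastException for a plain predicate
        String encoded = info.consumerId + "|" + info.destination + "|" + info.selector;
        return nested == null ? encoded : encoded + "|" + nested.encode();
    }

    public static void main(String[] args) {
        ConsumerInfoSketch original =
                new ConsumerInfoSketch("c1", "queue://ORDERS", "region = 'EU'");
        original.additionalPredicate = (Predicate<String>) body -> body.startsWith("EU");

        try {
            marshal(original);
        } catch (ClassCastException e) {
            System.out.println("original could not be marshalled");
        }

        // The copy is what would be sent in the advisory message.
        System.out.println(marshal(original.copyWithoutPredicate()));
    }
}
```

Note that the copy carries no predicate at all, which matches the drawback mentioned above: in this sketch, nothing sent across the network knows about the extra filter.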