Hi Chris

Thanks so much for taking the time to review and provide feedback. I'm definitely happy to have another shot at getting the additional predicate marshalled to OpenWire and getting the matching working over the broker. I can add a test for the scenario you described with 3 brokers, and dig into any regressions that come out of that.

With respect to the documentation update you mention - presumably this is for the ActiveMQ website - can you point me at the source code for that? I'd be happy to create a patch.

Many thanks!

Jon

On Mon, May 24, 2021 at 6:07 PM Christopher Shannon <christopher.l.shan...@gmail.com> wrote:

> I posted a response on the PR in github
>
> On Mon, May 24, 2021 at 6:08 AM Jonathan Gallimore <jonathan.gallim...@gmail.com> wrote:
>
> > Hi All
> >
> > I've filed a JIRA for this, with a potential patch, but opening up a thread
> > here, in case this is not the right approach, or if this requires more
> > discussion.
> >
> > I have a network of brokers set up, and use a plugin to set the additional
> > predicate, to enable message filtering based on the consumer. This works
> > great for a single broker, or failover setup, but fails when using a
> > network of brokers, with the following exception:
> >
> > java.lang.ClassCastException: class
> > org.apache.activemq.filter.ComparisonExpression$1 cannot be cast to class
> > org.apache.activemq.command.DataStructure
> > (org.apache.activemq.filter.ComparisonExpression$1 and
> > org.apache.activemq.command.DataStructure are in unnamed module of loader
> > org.apache.catalina.loader.ParallelWebappClassLoader @749f539e)
> >     at org.apache.activemq.openwire.v12.ConsumerInfoMarshaller.tightMarshal1(ConsumerInfoMarshaller.java:133)
> >     at org.apache.activemq.openwire.OpenWireFormat.tightMarshalNestedObject1(OpenWireFormat.java:400)
> >     at org.apache.activemq.openwire.v12.BaseDataStreamMarshaller.tightMarshalNestedObject1(BaseDataStreamMarshaller.java:130)
> >     at org.apache.activemq.openwire.v12.MessageMarshaller.tightMarshal1(MessageMarshaller.java:140)
> >     at org.apache.activemq.openwire.v12.ActiveMQMessageMarshaller.tightMarshal1(ActiveMQMessageMarshaller.java:76)
> >     at org.apache.activemq.openwire.OpenWireFormat.tightMarshalNestedObject1(OpenWireFormat.java:400)
> >     at org.apache.activemq.openwire.v12.BaseDataStreamMarshaller.tightMarshalNestedObject1(BaseDataStreamMarshaller.java:130)
> >     at org.apache.activemq.openwire.v12.MessageDispatchMarshaller.tightMarshal1(MessageDispatchMarshaller.java:87)
> >     at org.apache.activemq.openwire.OpenWireFormat.marshal(OpenWireFormat.java:226)
> >     at org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:193)
> >     at org.apache.activemq.transport.AbstractInactivityMonitor.doOnewaySend(AbstractInactivityMonitor.java:335)
> >     at org.apache.activemq.transport.AbstractInactivityMonitor.oneway(AbstractInactivityMonitor.java:317)
> >     at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:94)
> >     at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:116)
> >     at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68)
> >     at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1474)
> >     at org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:972)
> >     at org.apache.activemq.broker.TransportConnection.iterate(TransportConnection.java:1022)
> >     at org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:133)
> >     at org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:48)
> >     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> >     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> >     at java.base/java.lang.Thread.run(Thread.java:834)
> >
> > When this happens, messages for this consumer are not forwarded over the
> > network.
> >
> > My PR resolves this by creating a copy of the ConsumerInfo object (which
> > does not copy the additional predicate), and uses the copy of the
> > ConsumerInfo object to send the advisory message to trigger the
> > subscription across the network.
> >
> > One (potential) drawback here is that the subscription between the brokers
> > does not filter the messages.
> >
> > I did attempt to add this, by making a wrapper that implements both
> > BooleanExpression and DataStructure, and could be marshalled into OpenWire,
> > and then checking it in NetworkBridgeFilter. This did work, but appeared to
> > introduce a number of regressions across the test suite. I'm very happy to
> > take another swing at this approach if it is preferred.
> >
> > Many thanks
> >
> > Jon
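
For readers following the thread, the copy-based approach described in the quoted message can be sketched roughly as below. This is only an illustration under stated assumptions: ConsumerInfoStandIn and copyForAdvisory are hypothetical stand-ins written for this sketch, not the ActiveMQ ConsumerInfo class or the code in the PR, and the predicate is modelled as a plain java.util.function.Predicate rather than a BooleanExpression.

```java
import java.util.function.Predicate;

// Hypothetical stand-ins only: these are NOT the ActiveMQ classes or the PR code.
class AdvisoryCopySketch {

    /** Stand-in for a consumer description that carries a broker-local filter. */
    static final class ConsumerInfoStandIn {
        final String consumerId;
        final String destination;
        // Broker-local filter; assumed here to have no wire representation.
        final Predicate<String> additionalPredicate;

        ConsumerInfoStandIn(String consumerId, String destination,
                            Predicate<String> additionalPredicate) {
            this.consumerId = consumerId;
            this.destination = destination;
            this.additionalPredicate = additionalPredicate;
        }
    }

    /**
     * Builds the copy that would travel in the advisory message: every field is
     * carried over except the predicate, which is deliberately left null so the
     * marshaller never sees an object it cannot serialize.
     */
    static ConsumerInfoStandIn copyForAdvisory(ConsumerInfoStandIn original) {
        return new ConsumerInfoStandIn(original.consumerId, original.destination, null);
    }

    public static void main(String[] args) {
        ConsumerInfoStandIn local = new ConsumerInfoStandIn(
                "consumer-1", "queue.orders", body -> body.startsWith("A"));
        ConsumerInfoStandIn advisory = copyForAdvisory(local);

        // The local consumer keeps its filter; the advisory copy has none.
        System.out.println("local has predicate:    " + (local.additionalPredicate != null));
        System.out.println("advisory has predicate: " + (advisory.additionalPredicate != null));
    }
}
```

The trade-off is the one noted in the quoted message: because the copy carries no predicate, the subscription between the brokers does not filter, and filtering only happens on the broker that holds the original consumer.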