Awesome, thank you!

On Wed, May 26, 2021 at 12:29 PM Robbie Gemmell <robbie.gemm...@gmail.com> wrote:
> The website repo is at https://github.com/apache/activemq-website/
>
> On Wed, 26 May 2021 at 11:31, Jonathan Gallimore
> <jonathan.gallim...@gmail.com> wrote:
> >
> > Hi Chris
> >
> > Thanks so much for taking the time to review and provide feedback. I'm
> > definitely happy to have another shot at getting the additional predicate
> > marshalled to OpenWire and get the matching working over the broker, and
> > can add a test for the scenario you described with 3 brokers, and can dig
> > into any regressions that come out of that.
> >
> > With respect to the documentation update you mention - presumably this is
> > for the ActiveMQ website - can you point me at the source code for that?
> > I'd be happy to create a patch.
> >
> > Many thanks!
> >
> > Jon
> >
> > On Mon, May 24, 2021 at 6:07 PM Christopher Shannon <
> > christopher.l.shan...@gmail.com> wrote:
> >
> > > I posted a response on the PR in github
> > >
> > > On Mon, May 24, 2021 at 6:08 AM Jonathan Gallimore <
> > > jonathan.gallim...@gmail.com> wrote:
> > >
> > > > Hi All
> > > >
> > > > I've filed a JIRA for this, with a potential patch, but opening up a
> > > > thread here, in case this is not the right approach, or if this
> > > > requires more discussion.
> > > >
> > > > I have a network of brokers set up, and use a plugin to set the
> > > > additional predicate, to enable message filtering based on the
> > > > consumer. This works great for a single broker, or failover setup, but
> > > > fails when using a network of brokers, with the following exception:
> > > >
> > > > java.lang.ClassCastException: class
> > > > org.apache.activemq.filter.ComparisonExpression$1 cannot be cast to class
> > > > org.apache.activemq.command.DataStructure
> > > > (org.apache.activemq.filter.ComparisonExpression$1 and
> > > > org.apache.activemq.command.DataStructure are in unnamed module of loader
> > > > org.apache.catalina.loader.ParallelWebappClassLoader @749f539e)
> > > >     at org.apache.activemq.openwire.v12.ConsumerInfoMarshaller.tightMarshal1(ConsumerInfoMarshaller.java:133)
> > > >     at org.apache.activemq.openwire.OpenWireFormat.tightMarshalNestedObject1(OpenWireFormat.java:400)
> > > >     at org.apache.activemq.openwire.v12.BaseDataStreamMarshaller.tightMarshalNestedObject1(BaseDataStreamMarshaller.java:130)
> > > >     at org.apache.activemq.openwire.v12.MessageMarshaller.tightMarshal1(MessageMarshaller.java:140)
> > > >     at org.apache.activemq.openwire.v12.ActiveMQMessageMarshaller.tightMarshal1(ActiveMQMessageMarshaller.java:76)
> > > >     at org.apache.activemq.openwire.OpenWireFormat.tightMarshalNestedObject1(OpenWireFormat.java:400)
> > > >     at org.apache.activemq.openwire.v12.BaseDataStreamMarshaller.tightMarshalNestedObject1(BaseDataStreamMarshaller.java:130)
> > > >     at org.apache.activemq.openwire.v12.MessageDispatchMarshaller.tightMarshal1(MessageDispatchMarshaller.java:87)
> > > >     at org.apache.activemq.openwire.OpenWireFormat.marshal(OpenWireFormat.java:226)
> > > >     at org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:193)
> > > >     at org.apache.activemq.transport.AbstractInactivityMonitor.doOnewaySend(AbstractInactivityMonitor.java:335)
> > > >     at org.apache.activemq.transport.AbstractInactivityMonitor.oneway(AbstractInactivityMonitor.java:317)
> > > >     at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:94)
> > > >     at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:116)
> > > >     at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68)
> > > >     at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1474)
> > > >     at org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:972)
> > > >     at org.apache.activemq.broker.TransportConnection.iterate(TransportConnection.java:1022)
> > > >     at org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:133)
> > > >     at org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:48)
> > > >     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> > > >     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> > > >     at java.base/java.lang.Thread.run(Thread.java:834)
> > > >
> > > > When this happens, messages for this consumer are not forwarded over
> > > > the network.
> > > >
> > > > My PR resolves this by creating a copy of the ConsumerInfo object
> > > > (which does not copy the additional predicate), and uses the copy of
> > > > the ConsumerInfo object to send the advisory message to trigger the
> > > > subscription across the network.
> > > >
> > > > One (potential) drawback here is that the subscription between the
> > > > brokers does not filter the messages.
> > > >
> > > > I did attempt to add this, by making a wrapper that implements both
> > > > BooleanExpression and DataStructure, and could be marshalled into
> > > > OpenWire, and then checking it in NetworkBridgeFilter. This did work,
> > > > but appeared to introduce a number of regressions across the test
> > > > suite. I'm very happy to take another swing at this approach if it is
> > > > preferred.
> > > >
> > > > Many thanks
> > > >
> > > > Jon
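
For readers following the thread, the idea in the original message (send the advisory using a copy of the consumer description that leaves out the additional predicate) can be sketched roughly as follows. This is a minimal, self-contained illustration and not the code from the PR: ConsumerInfoSketch, copyWithoutPredicate and marshal are hypothetical stand-ins, no real ActiveMQ classes are used, and the "marshalling" is just string concatenation that mimics the failure when a predicate is present.

```java
import java.util.function.Predicate;

class AdvisoryCopySketch {

    /** Hypothetical stand-in for a consumer description; not the real ActiveMQ ConsumerInfo. */
    static final class ConsumerInfoSketch {
        final String consumerId;
        final String destination;
        final String selector;
        /** Broker-local filter with no wire representation; may be null. */
        final Predicate<String> additionalPredicate;

        ConsumerInfoSketch(String consumerId, String destination, String selector,
                           Predicate<String> additionalPredicate) {
            this.consumerId = consumerId;
            this.destination = destination;
            this.selector = selector;
            this.additionalPredicate = additionalPredicate;
        }

        /** Copies the wire-safe fields and deliberately leaves the predicate unset. */
        ConsumerInfoSketch copyWithoutPredicate() {
            return new ConsumerInfoSketch(consumerId, destination, selector, null);
        }
    }

    /**
     * Toy marshaller (an assumption for illustration): it refuses any object that
     * still carries a predicate, imitating the ClassCastException in the thread.
     */
    static String marshal(ConsumerInfoSketch info) {
        if (info.additionalPredicate != null) {
            throw new ClassCastException("additional predicate has no wire representation");
        }
        return info.consumerId + "|" + info.destination + "|" + info.selector;
    }

    public static void main(String[] args) {
        ConsumerInfoSketch local = new ConsumerInfoSketch(
                "consumer-1", "queue.orders", "region = 'EU'",
                body -> body.startsWith("EU"));

        // The local object keeps its predicate; only the copy goes on the wire.
        ConsumerInfoSketch forAdvisory = local.copyWithoutPredicate();
        System.out.println(marshal(forAdvisory));
    }
}
```

As noted in the thread, the trade-off of this approach is that the copy carries no predicate, so the subscription between the brokers does not filter messages.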