The website repo is at https://github.com/apache/activemq-website/

On Wed, 26 May 2021 at 11:31, Jonathan Gallimore
<jonathan.gallim...@gmail.com> wrote:
>
> Hi Chris
>
> Thanks so much for taking the time to review and provide feedback. I'm
> definitely happy to have another shot at getting the additional predicate
> marshalled to OpenWire and get the matching working over the broker, and
> can add a test for the scenario you described with 3 brokers, and can dig
> into any regressions that come out of that.
>
> With respect to the documentation update you mention - presumably this is
> for the ActiveMQ website - can you point me at the source code for that?
> I'd be happy to create a patch.
>
> Many thanks!
>
> Jon
>
> On Mon, May 24, 2021 at 6:07 PM Christopher Shannon <
> christopher.l.shan...@gmail.com> wrote:
>
> > I posted a response on the PR in github
> >
> > On Mon, May 24, 2021 at 6:08 AM Jonathan Gallimore <
> > jonathan.gallim...@gmail.com> wrote:
> >
> > > Hi All
> > >
> > > I've filed a JIRA for this, with a potential patch, but opening up a
> > thread
> > > here, in case this is not the right approach, or if this requires more
> > > discussion.
> > >
> > > I have a network of brokers set up, and use a plugin to set the
> > additional
> > > predicate, to enable message filtering based on the consumer. This works
> > > great for a single broker, or failover setup, but fails when using a
> > > network of brokers, with the following exception:
> > >
> > > java.lang.ClassCastException: class
> > > org.apache.activemq.filter.ComparisonExpression$1 cannot be cast to class
> > > org.apache.activemq.command.DataStructure
> > > (org.apache.activemq.filter.ComparisonExpression$1 and
> > > org.apache.activemq.command.DataStructure are in unnamed module of loader
> > > org.apache.catalina.loader.ParallelWebappClassLoader @749f539e)
> > >         at
> > >
> > >
> > org.apache.activemq.openwire.v12.ConsumerInfoMarshaller.tightMarshal1(ConsumerInfoMarshaller.java:133)
> > >         at
> > >
> > >
> > org.apache.activemq.openwire.OpenWireFormat.tightMarshalNestedObject1(OpenWireFormat.java:400)
> > >         at
> > >
> > >
> > org.apache.activemq.openwire.v12.BaseDataStreamMarshaller.tightMarshalNestedObject1(BaseDataStreamMarshaller.java:130)
> > >         at
> > >
> > >
> > org.apache.activemq.openwire.v12.MessageMarshaller.tightMarshal1(MessageMarshaller.java:140)
> > >         at
> > >
> > >
> > org.apache.activemq.openwire.v12.ActiveMQMessageMarshaller.tightMarshal1(ActiveMQMessageMarshaller.java:76)
> > >         at
> > >
> > >
> > org.apache.activemq.openwire.OpenWireFormat.tightMarshalNestedObject1(OpenWireFormat.java:400)
> > >         at
> > >
> > >
> > org.apache.activemq.openwire.v12.BaseDataStreamMarshaller.tightMarshalNestedObject1(BaseDataStreamMarshaller.java:130)
> > >         at
> > >
> > >
> > org.apache.activemq.openwire.v12.MessageDispatchMarshaller.tightMarshal1(MessageDispatchMarshaller.java:87)
> > >         at
> > >
> > >
> > org.apache.activemq.openwire.OpenWireFormat.marshal(OpenWireFormat.java:226)
> > >         at
> > >
> > >
> > org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:193)
> > >         at
> > >
> > >
> > org.apache.activemq.transport.AbstractInactivityMonitor.doOnewaySend(AbstractInactivityMonitor.java:335)
> > >         at
> > >
> > >
> > org.apache.activemq.transport.AbstractInactivityMonitor.oneway(AbstractInactivityMonitor.java:317)
> > >         at
> > >
> > >
> > org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:94)
> > >         at
> > >
> > >
> > org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:116)
> > >         at
> > >
> > org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68)
> > >         at
> > >
> > >
> > org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1474)
> > >         at
> > >
> > >
> > org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:972)
> > >         at
> > >
> > >
> > org.apache.activemq.broker.TransportConnection.iterate(TransportConnection.java:1022)
> > >         at
> > >
> > >
> > org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:133)
> > >         at
> > >
> > org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:48)
> > >         at
> > >
> > >
> > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> > >         at
> > >
> > >
> > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> > >         at java.base/java.lang.Thread.run(Thread.java:834)
> > >
> > >
> > > When this happens, messages for this consumer are not forwarded over the
> > > network.
> > >
> > > My PR resolves this by creating a copy of the ConsumerInfo object (which
> > > does not copy the additional predicate), and uses the copy of the
> > > ConsumerInfo object to send the advisory message to trigger the
> > > subscription across the network.
> > >
> > > One (potential) drawback here is that the subscription between the
> > brokers
> > > does not filter the messages.
> > >
> > > I did attempt to add this, by making a wrapper that implements both
> > > BooleanExpression and DataStructure, and could be marshalled into
> > OpenWire,
> > > and then checking it in NetworkBridgeFilter. This did work, but appeared
> > to
> > > introduce a number of regressions across the test suite. I'm very happy
> > to
> > > take another swing at this approach if it is preferred.
> > >
> > > Many thanks
> > >
> > > Jon
> > >
> >

Reply via email to