[ 
https://issues.apache.org/jira/browse/AMQ-8277?focusedWorklogId=601092&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-601092
 ]

ASF GitHub Bot logged work on AMQ-8277:
---------------------------------------

                Author: ASF GitHub Bot
            Created on: 24/May/21 10:03
            Start Date: 24/May/21 10:03
    Worklog Time Spent: 10m 
      Work Description: jgallimore opened a new pull request #656:
URL: https://github.com/apache/activemq/pull/656


   … sending the advisory message to trigger the subscription over the network 
of brokers.
   
   If a consumer uses an additionalPredicate (as part of a plugin, for example 
to filter messages to the consumer), the additionalPredicate will cause a 
ClassCastException and messages will not be forwarded across the network for 
this consumer.
   
   This PR creates a copy of the ConsumerInfo object (which does *not* copy the 
additional predicate), and uses the copy of the ConsumerInfo object to send the 
advisory message to trigger the subscription across the network.
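   
   For illustration only, the idea can be sketched with simplified stand-in 
types. `ConsumerInfoSketch`, `PredicateSketch` and `copyForAdvisory()` are 
hypothetical names invented for this example, not the ActiveMQ classes or the 
code in this PR; the sketch only assumes that the copy carries the wire-safe 
fields and leaves the broker-local predicate behind.

```java
// Stand-in for a broker-side filter expression (hypothetical, simplified).
interface PredicateSketch {
    boolean matches(String messageBody);
}

// Stand-in for a consumer descriptor: only the fields needed for the illustration.
final class ConsumerInfoSketch {
    String destination;
    String selector;
    // Broker-local filter; it is not meant to travel over the wire.
    transient PredicateSketch additionalPredicate;

    // Copies the wire-safe fields and deliberately leaves additionalPredicate null.
    ConsumerInfoSketch copyForAdvisory() {
        ConsumerInfoSketch copy = new ConsumerInfoSketch();
        copy.destination = destination;
        copy.selector = selector;
        return copy;
    }
}

final class AdvisoryCopyDemo {
    public static void main(String[] args) {
        ConsumerInfoSketch original = new ConsumerInfoSketch();
        original.destination = "queue://orders";
        original.selector = "region = 'EU'";
        original.additionalPredicate = body -> body.contains("public");

        // The copy is what would be attached to the advisory message.
        ConsumerInfoSketch forAdvisory = original.copyForAdvisory();
        System.out.println("predicate on copy: " + forAdvisory.additionalPredicate);
        System.out.println("predicate kept locally: " + (original.additionalPredicate != null));
    }
}
```

   The original object keeps its predicate, so local filtering on the broker 
that owns the consumer is unaffected; only the object sent in the advisory is 
stripped.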
   
   One (potential) drawback here is that the subscription between the brokers 
does not filter the messages. 
   
   I did attempt to add this, by making a wrapper that implements both 
BooleanExpression and DataStructure, and could be marshalled into OpenWire, and 
then checking it in NetworkBridgeFilter. This did work, but appeared to 
introduce a number of regressions across the test suite. I'm very happy to take 
another swing at this approach if it is preferred.
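   
   A rough sketch of the general shape of such a wrapper, using simplified 
stand-in interfaces rather than the real ActiveMQ types. This is not the code 
from that attempt: `FilterSketch`, `DataStructureSketch`, 
`MarshallableFilterSketch` and the type code are all hypothetical, and the 
actual OpenWire marshalling is left out.

```java
// Stand-in for a boolean filter expression (hypothetical, simplified).
interface FilterSketch {
    boolean matches(String messageBody);
}

// Stand-in for a marshallable command type (hypothetical, simplified).
interface DataStructureSketch {
    byte getDataStructureType();
}

// Hypothetical wrapper: usable as a filter and also acceptable to a marshaller's cast.
final class MarshallableFilterSketch implements FilterSketch, DataStructureSketch {
    // Made-up type code for the example, not a real OpenWire data structure type.
    static final byte HYPOTHETICAL_TYPE = 120;

    private final FilterSketch delegate;

    MarshallableFilterSketch(FilterSketch delegate) {
        this.delegate = delegate;
    }

    @Override
    public boolean matches(String messageBody) {
        return delegate.matches(messageBody);
    }

    @Override
    public byte getDataStructureType() {
        return HYPOTHETICAL_TYPE;
    }
}

final class WrapperDemo {
    // Mimics a marshaller casting a nested field to the marshallable type.
    static boolean canMarshal(Object nested) {
        try {
            DataStructureSketch ds = (DataStructureSketch) nested;
            return ds != null;
        } catch (ClassCastException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        FilterSketch plain = body -> body.startsWith("public");
        FilterSketch wrapped = new MarshallableFilterSketch(plain);
        System.out.println("plain filter marshallable: " + canMarshal(plain));
        System.out.println("wrapped filter marshallable: " + canMarshal(wrapped));
    }
}
```

   The plain filter fails the cast in the same way as the reported 
ClassCastException, while the wrapper passes it and still delegates matching 
to the original filter.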
   
   I have run the full test suite across this PR, and it doesn't appear to 
break any other tests on my setup.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

            Worklog Id:     (was: 601092)
    Remaining Estimate: 0h
            Time Spent: 10m

> Subscriber with additional predicate causes ClassCastException and doesn't 
> forward messages between brokers
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-8277
>                 URL: https://issues.apache.org/jira/browse/AMQ-8277
>             Project: ActiveMQ
>          Issue Type: Bug
>    Affects Versions: 5.16.2
>            Reporter: Jonathan Gallimore
>            Priority: Major
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> I have a network of brokers set up, and use a plugin to set the additional 
> predicate, to enable message filtering based on the consumer.
> (JavaDoc for ConsumerInfo.setAdditionalPredicate() is "A transient 
> additional predicate that can be used it inject additional predicates into 
> the selector on the fly. Handy if if say a Security Broker interceptor wants 
> to filter out messages based on security level of the consumer.")
> This works fine for a single broker, or in a failover scenario, but if used 
> with a network of brokers, whenever the consumer starts, the following 
> exception is thrown, and messages are not forwarded across the network for 
> this consumer.
>  
> java.lang.ClassCastException: class org.apache.activemq.filter.ComparisonExpression$1 cannot be cast to class org.apache.activemq.command.DataStructure (org.apache.activemq.filter.ComparisonExpression$1 and org.apache.activemq.command.DataStructure are in unnamed module of loader org.apache.catalina.loader.ParallelWebappClassLoader @749f539e)
>         at org.apache.activemq.openwire.v12.ConsumerInfoMarshaller.tightMarshal1(ConsumerInfoMarshaller.java:133)
>         at org.apache.activemq.openwire.OpenWireFormat.tightMarshalNestedObject1(OpenWireFormat.java:400)
>         at org.apache.activemq.openwire.v12.BaseDataStreamMarshaller.tightMarshalNestedObject1(BaseDataStreamMarshaller.java:130)
>         at org.apache.activemq.openwire.v12.MessageMarshaller.tightMarshal1(MessageMarshaller.java:140)
>         at org.apache.activemq.openwire.v12.ActiveMQMessageMarshaller.tightMarshal1(ActiveMQMessageMarshaller.java:76)
>         at org.apache.activemq.openwire.OpenWireFormat.tightMarshalNestedObject1(OpenWireFormat.java:400)
>         at org.apache.activemq.openwire.v12.BaseDataStreamMarshaller.tightMarshalNestedObject1(BaseDataStreamMarshaller.java:130)
>         at org.apache.activemq.openwire.v12.MessageDispatchMarshaller.tightMarshal1(MessageDispatchMarshaller.java:87)
>         at org.apache.activemq.openwire.OpenWireFormat.marshal(OpenWireFormat.java:226)
>         at org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:193)
>         at org.apache.activemq.transport.AbstractInactivityMonitor.doOnewaySend(AbstractInactivityMonitor.java:335)
>         at org.apache.activemq.transport.AbstractInactivityMonitor.oneway(AbstractInactivityMonitor.java:317)
>         at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:94)
>         at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:116)
>         at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68)
>         at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1474)
>         at org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:972)
>         at org.apache.activemq.broker.TransportConnection.iterate(TransportConnection.java:1022)
>         at org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:133)
>         at org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:48)
>         at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>         at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>         at java.base/java.lang.Thread.run(Thread.java:834)
>  
> This is very similar to the exception noted in this query: 
> [http://activemq.2283324.n4.nabble.com/Network-broker-and-filter-predicate-td2367038.html]
>  
> This appears to be down to the *Expression objects not implementing 
> DataStructure, and therefore not being marshalable to OpenWire format.
> I have prepared a simple patch with a unit test which I'll provide as a PR.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
