[ 
https://issues.apache.org/jira/browse/QPID-8210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16517377#comment-16517377
 ] 

Keith Wall edited comment on QPID-8210 at 6/19/18 6:00 PM:
-----------------------------------------------------------

Change looks reasonable to me, but I notice that 
{{org.apache.qpid.server.queue.QueueConsumerImpl#_queueConsumerNode}} is 
unsafely published.   It is written by the config thread then observed from the 
IO threads without a memory barrier.  Unfortunately, I think it needs to be 
volatile (or the code redesigned to avoid the cycle between 
{{QueueConsumerImpl}} and the node, which is more than will be desired for 
7.0.6).
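
To illustrate the visibility concern, here is a minimal sketch. The class names are hypothetical stand-ins, not the actual Broker-J code: a field written by one thread and read by another is declared volatile, so the write happens-before any read that observes it. Without volatile, the reader loop below would not be guaranteed to terminate.

```java
// Hypothetical stand-ins for illustration only; not the real Broker-J classes.
final class SafePublicationSketch {

    /** Plays the role of the node, which points back at its consumer (the cycle). */
    static final class Node {
        final Consumer consumer;

        Node(Consumer consumer) {
            this.consumer = consumer;
        }
    }

    /** Plays the role of the consumer, whose node reference is set after construction. */
    static final class Consumer {
        // volatile: a write in setNode happens-before any later read that sees it
        private volatile Node node;

        void setNode(Node node) {
            this.node = node;
        }

        Node getNode() {
            return node;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Consumer consumer = new Consumer();

        // Reader thread, standing in for an IO thread that observes the field.
        Thread io = new Thread(() -> {
            while (consumer.getNode() == null) {
                Thread.yield();
            }
            System.out.println("io thread observed the node");
        }, "io");
        io.start();

        // Writer thread, standing in for the config thread that sets the field.
        Thread config = new Thread(() -> consumer.setNode(new Node(consumer)), "config");
        config.start();

        config.join();
        io.join();
    }
}
```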

Also, the last paragraphs of this Jira's description have been mangled.  The 
Jira title should also be updated to reflect the problem, rather than the 
solution.


> [Broker-J] Set queue consumer node before adding consumer into queue consumer 
> manager
> -------------------------------------------------------------------------------------
>
>                 Key: QPID-8210
>                 URL: https://issues.apache.org/jira/browse/QPID-8210
>             Project: Qpid
>          Issue Type: Bug
>          Components: Broker-J
>    Affects Versions: qpid-java-broker-7.0.3, qpid-java-broker-7.0.2, 
> qpid-java-broker-7.0.0, qpid-java-broker-7.0.1, qpid-java-broker-7.0.4, 
> qpid-java-broker-7.0.5
>            Reporter: Alex Rudyy
>            Priority: Critical
>             Fix For: qpid-java-broker-7.0.6
>
>
> The consumer node is currently set after the consumer is added into the queue 
> consumer manager. When a released message is requeued, all consumers in the 
> consumer manager are iterated and AbstractQueue#updateSubRequeueEntry is invoked 
> for each of them to update the released entry in the consumer's queue context if 
> required. Notifying a consumer that does not yet have a consumer node can result 
> in a NullPointerException:
> {noformat}
> java.lang.NullPointerException
>     at org.apache.qpid.server.queue.QueueConsumerManagerImpl.setNotified(QueueConsumerManagerImpl.java:151)
>     at org.apache.qpid.server.queue.AbstractQueue.notifyConsumer(AbstractQueue.java:2268)
>     at org.apache.qpid.server.queue.AbstractQueue.updateSubRequeueEntry(AbstractQueue.java:1382)
>     at org.apache.qpid.server.queue.AbstractQueue.resetSubPointers(AbstractQueue.java:1413)
>     at org.apache.qpid.server.queue.AbstractQueue.requeue(AbstractQueue.java:1399)
>     at org.apache.qpid.server.queue.QueueEntryImpl.postRelease(QueueEntryImpl.java:446)
>     at org.apache.qpid.server.queue.QueueEntryImpl.release(QueueEntryImpl.java:437)
>     at org.apache.qpid.server.protocol.v0_8.AMQChannel.requeue(AMQChannel.java:896)
>     at org.apache.qpid.server.protocol.v0_8.AMQChannel.close(AMQChannel.java:787)
>     at org.apache.qpid.server.protocol.v0_8.AMQPConnection_0_8Impl.closeChannel(AMQPConnection_0_8Impl.java:459)
>     at org.apache.qpid.server.protocol.v0_8.AMQPConnection_0_8Impl.closeChannel(AMQPConnection_0_8Impl.java:430)
>     at org.apache.qpid.server.protocol.v0_8.AMQChannel.receiveChannelClose(AMQChannel.java:2167)
>     at org.apache.qpid.server.protocol.v0_8.transport.ChannelCloseBody.process(ChannelCloseBody.java:140)
>     at org.apache.qpid.server.protocol.v0_8.ServerDecoder.processMethod(ServerDecoder.java:132)
>     at org.apache.qpid.server.protocol.v0_8.AMQDecoder.processFrame(AMQDecoder.java:203)
>     at org.apache.qpid.server.protocol.v0_8.BrokerDecoder.doProcessFrame(BrokerDecoder.java:141)
>     at org.apache.qpid.server.protocol.v0_8.BrokerDecoder.processFrame(BrokerDecoder.java:65)
>     at org.apache.qpid.server.protocol.v0_8.AMQDecoder.processInput(AMQDecoder.java:185)
>     at org.apache.qpid.server.protocol.v0_8.BrokerDecoder$1.run(BrokerDecoder.java:104)
>     at org.apache.qpid.server.protocol.v0_8.BrokerDecoder$1.run(BrokerDecoder.java:97)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at org.apache.qpid.server.protocol.v0_8.BrokerDecoder.processAMQPFrames(BrokerDecoder.java:96)
>     at org.apache.qpid.server.protocol.v0_8.AMQDecoder.decode(AMQDecoder.java:118)
>     at org.apache.qpid.server.protocol.v0_8.ServerDecoder.decodeBuffer(ServerDecoder.java:44)
>     at org.apache.qpid.server.protocol.v0_8.AMQPConnection_0_8Impl$1.run(AMQPConnection_0_8Impl.java:257)
>     at org.apache.qpid.server.protocol.v0_8.AMQPConnection_0_8Impl$1.run(AMQPConnection_0_8Impl.java:249)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at org.apache.qpid.server.protocol.v0_8.AMQPConnection_0_8Impl.received(AMQPConnection_0_8Impl.java:248)
>     at org.apache.qpid.server.transport.MultiVersionProtocolEngine.received(MultiVersionProtocolEngine.java:135)
>     at org.apache.qpid.server.transport.NonBlockingConnection.processAmqpData(NonBlockingConnection.java:610)
>     at org.apache.qpid.server.transport.NonBlockingConnectionPlainDelegate.processData(NonBlockingConnectionPlainDelegate.java:58)
>     at org.apache.qpid.server.transport.NonBlockingConnection.doRead(NonBlockingConnection.java:496)
>     at org.apache.qpid.server.transport.NonBlockingConnection.doWork(NonBlockingConnection.java:270)
>     at org.apache.qpid.server.transport.NetworkConnectionScheduler.processConnection(NetworkConnectionScheduler.java:134)
>     at org.apache.qpid.server.transport.SelectorThread$ConnectionProcessor.processConnection(SelectorThread.java:575)
>     at org.apache.qpid.server.transport.SelectorThread$SelectionTask.performSelect(SelectorThread.java:366)
>     at org.apache.qpid.server.transport.SelectorThread$SelectionTask.run(SelectorThread.java:97)
>     at org.apache.qpid.server.transport.SelectorThread.run(SelectorThread.java:533)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at org.apache.qpid.server.bytebuffer.QpidByteBufferFactory.lambda$null$0(QpidByteBufferFactory.java:464)
>     at java.lang.Thread.run(Thread.java:748)
> {noformat}
> The issue can happen in the following scenarios:
> * when a previously acquired message is released due to 
> consumer/session/connection close while another consumer is attached to the 
> queue at the same time;
> * when a transaction is rolled back or a message is released (for example, 
> JMS session Session#release() is called for a client_ack session) and a new 
> competing consumer is attached at the same time.

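The ordering problem in the description can be sketched roughly as follows. All names here are hypothetical and the real registration code in Broker-J is not reproduced. The Runnable stands in for a requeue that happens to run between the two registration steps. If the consumer is made visible to the manager before its node is set, the notification dereferences a null node. If the node is set first, the consumer is either not yet visible or already fully initialised.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-ins for illustration only; not the real Broker-J classes.
final class RegistrationOrderSketch {

    static final class Node {
        volatile boolean notified;
    }

    static final class Consumer {
        private volatile Node node;

        void setNode(Node node) {
            this.node = node;
        }

        Node getNode() {
            return node;
        }
    }

    static final class Manager {
        private final List<Consumer> consumers = new CopyOnWriteArrayList<>();

        /** Order reported in the issue: consumer becomes visible before its node is set. */
        Consumer addThenSetNode(Runnable interleaved) {
            Consumer consumer = new Consumer();
            consumers.add(consumer);
            interleaved.run(); // stands in for a concurrent requeue
            consumer.setNode(new Node());
            return consumer;
        }

        /** Order suggested by the issue title: node is set before the consumer is visible. */
        Consumer setNodeThenAdd(Runnable interleaved) {
            Consumer consumer = new Consumer();
            consumer.setNode(new Node());
            interleaved.run(); // stands in for a concurrent requeue
            consumers.add(consumer);
            return consumer;
        }

        /** Stands in for the requeue path that notifies every registered consumer. */
        void notifyAllConsumers() {
            for (Consumer consumer : consumers) {
                consumer.getNode().notified = true; // NullPointerException if node is unset
            }
        }
    }

    public static void main(String[] args) {
        Manager unsafe = new Manager();
        try {
            unsafe.addThenSetNode(unsafe::notifyAllConsumers);
        } catch (NullPointerException e) {
            System.out.println("add-then-set: notification hit a consumer without a node");
        }

        Manager safe = new Manager();
        safe.setNodeThenAdd(safe::notifyAllConsumers);
        safe.notifyAllConsumers();
        System.out.println("set-then-add: notification completed");
    }
}
```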


