[
https://issues.apache.org/jira/browse/QPID-7634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16006083#comment-16006083
]
Rob Godfrey commented on QPID-7634:
-----------------------------------
I don't believe this change is correct.
The change seems to assume that flow(drain=true) should be treated as a
synchronous request/response action, when it should really be an asynchronous
state modification: the broker eventually tells the client when its state has
changed. This causes a problem when a drain is issued with a remaining link
credit of X and at least X messages are available to be sent on the link, but
the session does not have sufficient session credit to transfer all of those
messages. In this case the broker should send as many transfers as it currently
can and then wait until it receives more session credit. Only if the queue
becomes empty should the delivery count actually be advanced.
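A minimal sketch of the behaviour described above may help. It is a toy model
of the sending end of a link, not the Broker-J implementation: the class
DrainSketch, its methods and the string "frames" are all hypothetical, and a
drain request that arrives with zero link credit is deliberately not modelled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical model of the sending end of a link; these are not Broker-J classes.
class DrainSketch {
    private final Queue<String> queue = new ArrayDeque<>();
    private final List<String> frames = new ArrayList<>(); // frames "sent" to the client
    private long deliveryCount;
    private long linkCredit;
    private long sessionCredit; // transfers the session window still allows
    private boolean draining;

    DrainSketch(long sessionCredit) {
        this.sessionCredit = sessionCredit;
    }

    void enqueue(String message) {
        queue.add(message);
    }

    // Link-level flow from the receiver: new credit, optionally with drain=true.
    void onLinkFlow(long credit, boolean drain) {
        linkCredit = credit;
        draining = drain;
        pump();
    }

    // Session-level flow from the receiver: the session window was replenished.
    void onSessionFlow(long credit) {
        sessionCredit = credit;
        pump();
    }

    private void pump() {
        // Send as many transfers as link credit and session credit both allow.
        while (linkCredit > 0 && sessionCredit > 0 && !queue.isEmpty()) {
            frames.add("transfer:" + queue.poll());
            deliveryCount++;
            linkCredit--;
            sessionCredit--;
        }
        if (!draining) {
            return;
        }
        if (linkCredit == 0) {
            // No credit is left to consume, so the drain is treated as complete.
            draining = false;
        } else if (queue.isEmpty()) {
            // Queue is empty: advance delivery-count over the unused credit and report it.
            deliveryCount += linkCredit;
            linkCredit = 0;
            draining = false;
            frames.add("flow:deliveryCount=" + deliveryCount + ",linkCredit=0");
        }
        // Otherwise messages remain but session credit is exhausted: stay in the
        // draining state and wait for onSessionFlow rather than advancing.
    }

    List<String> frames() { return frames; }
    long deliveryCount() { return deliveryCount; }
    long linkCredit() { return linkCredit; }
    boolean isDraining() { return draining; }
}
```

With a session credit of 2, three queued messages and a flow carrying
linkCredit=5 and drain=true, this model sends two transfers and stays in the
draining state. The third transfer and the final flow go out only after
onSessionFlow supplies more session credit.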
An interesting question is whether the "issue" identified is actually an issue
at all. The client issues a drain on a link with zero credit, which means
that the delivery count is already advanced. In this case I do not believe it
should be necessary for the server to echo a drain back. However, the wording of
the spec does not seem to take this case into account, so perhaps we should
simply treat this case as an "echo" request.
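The suggestion above can be written as a small decision function. This is only
a sketch under assumptions: DrainReplySketch, Reply and replyFor are
hypothetical names rather than Broker-J API, and handling a zero-credit drain
like a flow with echo=true is the proposal made in this comment, not something
the spec states.

```java
// Hypothetical decision helper; the names are illustrative, not Broker-J API.
class DrainReplySketch {
    enum Reply { NONE, ADVANCE_AND_FLOW, ECHO_CURRENT_STATE }

    // Decide how the sender answers an incoming link flow.
    static Reply replyFor(long linkCredit, boolean drain, boolean echo,
                          boolean messagesAvailable) {
        if (drain && linkCredit > 0 && !messagesAvailable) {
            // Spec case: advance delivery-count until link-credit is zero,
            // then send the updated flow state.
            return Reply.ADVANCE_AND_FLOW;
        }
        if (echo || (drain && linkCredit == 0)) {
            // Assumption: a drain with no credit left has nothing to advance,
            // so it is answered as if the client had set echo=true.
            return Reply.ECHO_CURRENT_STATE;
        }
        // Either no reply was requested, or transfers will consume the credit.
        return Reply.NONE;
    }
}
```

For example, replyFor(0, true, false, false) yields ECHO_CURRENT_STATE, while
replyFor(1, true, false, false) yields ADVANCE_AND_FLOW.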
> [Java Broker,amqp 1.0] Broker does not respond to Flow command with
> drain=true if queue is empty and prefetch is 0
> ------------------------------------------------------------------------------------------------------------------
>
> Key: QPID-7634
> URL: https://issues.apache.org/jira/browse/QPID-7634
> Project: Qpid
> Issue Type: Bug
> Components: Java Broker
> Affects Versions: qpid-java-6.1, qpid-java-6.1.1, qpid-java-broker-7.0.0
> Reporter: Alex Rudyy
> Fix For: qpid-java-broker-7.0.0
>
>
> When an AMQP 1.0 client sends the Flow command with drain=true and the queue
> is empty, the Broker does not respond if prefetch is 0.
> The following snippet reproduces the issue:
> {code}
> Connection connection = factory.createConnection(username, password);
> connection.start();
> Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
> MessageConsumer messageConsumer = session.createConsumer(queue);
> TextMessage receivedMessage = (TextMessage) messageConsumer.receiveNoWait(); // <-- times out
> {code}
> The broker logs:
> {noformat}
> 2017-01-24 11:05:56,471 DEBUG [IO-/127.0.0.1:51108] (o.a.q.s.p.frame) -
> RECV[/127.0.0.1:51108|1] :
> Attach{name=qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue,handle=1,role=receiver,sndSettleMode=unsettled,rcvSettleMode=first,source=Source{address=queue,durable=none,expiryPolicy=link-detach,timeout=0,dynamic=false,defaultOutcome=Modified{deliveryFailed=true},outcomes=[amqp:accepted:list,
> amqp:rejected:list, amqp:released:list,
> amqp:modified:list],capabilities=[queue, global]},target=Target{}}
> 2017-01-24 11:05:56,474 DEBUG [VirtualHostNode-default-Config]
> (o.a.q.s.c.u.TaskExecutorImpl) - Performing Task['add consumer' on 'queue'
> with arguments
> 'target=ConsumerTarget_1_0[linkSession=[con:7(admin@/127.0.0.1:51108/default)/ch:1]
> ],
> consumerName=qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue,
> optionSet=[ACQUIRES, SEES_REQUEUES]']
> 2017-01-24 11:05:56,475 DEBUG [VirtualHostNode-default-Config]
> (o.a.q.s.m.AbstractConfiguredObject) - authorise returned DEFER
> 2017-01-24 11:05:56,475 DEBUG [VirtualHostNode-default-Config]
> (o.a.q.s.m.AbstractConfiguredObject) - authorise returned DEFER, returing
> default: ALLOWED
> 2017-01-24 11:05:56,475 DEBUG [VirtualHostNode-default-Config]
> (o.a.q.s.c.u.TaskExecutorImpl) - Performing Task['open' on
> 'Consumer[id=164b1b27-176a-42a3-a5dc-613d97bc58b2,
> name=7|1|qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue,
> type=Consumer]']
> 2017-01-24 11:05:56,475 DEBUG [VirtualHostNode-default-Config]
> (o.a.q.s.c.u.TaskExecutorImpl) - Task['open' on
> 'Consumer[id=164b1b27-176a-42a3-a5dc-613d97bc58b2,
> name=7|1|qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue,
> type=Consumer]'] performed successfully with result: null
> 2017-01-24 11:05:56,476 INFO [VirtualHostNode-default-Config] (q.m.s.create)
> - [con:7(admin@/127.0.0.1:51108/default)/ch:1] [sub:7(vh(/default)/qu(queue)]
> SUB-1001 : Create
> 2017-01-24 11:05:56,476 DEBUG [VirtualHostNode-default-Config]
> (o.a.q.s.c.u.TaskExecutorImpl) - Task['add consumer' on 'queue' with
> arguments
> 'target=ConsumerTarget_1_0[linkSession=[con:7(admin@/127.0.0.1:51108/default)/ch:1]
> ],
> consumerName=qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue,
> optionSet=[ACQUIRES, SEES_REQUEUES]'] performed successfully with result:
> Consumer[id=164b1b27-176a-42a3-a5dc-613d97bc58b2,
> name=7|1|qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue,
> type=Consumer]
> 2017-01-24 11:05:56,477 DEBUG [IO-/127.0.0.1:51108] (o.a.q.s.p.frame) -
> SEND[/127.0.0.1:51108|1] :
> Attach{name=qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue,handle=1,role=sender,sndSettleMode=unsettled,rcvSettleMode=first,source=Source{address=queue,durable=none,expiryPolicy=link-detach,timeout=0,dynamic=false,defaultOutcome=Modified{deliveryFailed=true},outcomes=[amqp:accepted:list,
> amqp:rejected:list, amqp:released:list,
> amqp:modified:list],capabilities=[queue]},target=Target{},initialDeliveryCount=0,offeredCapabilities=[SHARED-SUBS],properties={}}
> 2017-01-24 11:05:56,477 DEBUG [IO-/127.0.0.1:51108]
> (o.a.q.s.t.NonBlockingConnection) - Written 251 bytes
> 2017-01-24 11:05:56,477 DEBUG [IO-/127.0.0.1:51108]
> (o.a.q.s.t.NonBlockingConnection) - Read 0 byte(s)
> 2017-01-24 11:05:56,487 DEBUG [IO-/127.0.0.1:51108]
> (o.a.q.s.t.NonBlockingConnection) - Read 34 byte(s)
> 2017-01-24 11:05:56,487 DEBUG [IO-/127.0.0.1:51108] (o.a.q.s.p.frame) -
> RECV[/127.0.0.1:51108|1] :
> Flow{nextIncomingId=0,incomingWindow=2047,nextOutgoingId=1,outgoingWindow=2147483647,handle=1,deliveryCount=0,linkCredit=1,drain=true}
> 2017-01-24 11:05:56,487 DEBUG [IO-/127.0.0.1:51108]
> (o.a.q.s.t.NonBlockingConnection) - Read 0 byte(s)
> 2017-01-24 11:05:58,162 DEBUG [IO-/127.0.0.1:51108]
> (o.a.q.s.t.NonBlockingConnection) - Read 0 byte(s)
> ...
> {noformat}
> Relevant part of the spec:
> http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#doc-flow-control
> bq. If the sender's drain flag is set and there are no available messages,
> the sender MUST advance its delivery-count until link-credit is zero, and
> send its updated flow state to the receiver.