[ 
https://issues.apache.org/jira/browse/QPID-7634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16006083#comment-16006083
 ] 

Rob Godfrey commented on QPID-7634:
-----------------------------------

I don't believe this change is correct.

The change seems to assume that flow(drain=true) should be treated as a 
synchronous request/response action, when it really should be an asynchronous 
state modification, where the broker eventually tells the client when its 
state has changed.  This causes a problem when a drain is issued with a 
remaining link credit of X, and there are at least X messages available to be 
sent on the link, but the session does not have sufficient session credit to 
transfer all those messages.  In this case the broker should send as many 
transfers as it currently can and then wait until it receives more session 
credit.  Only if the queue becomes empty should the delivery count be advanced 
to use up the remaining link credit.
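
To illustrate the behaviour described above, here is a minimal sketch in Java. 
It is a toy model under stated assumptions, not the broker's code: the class 
DrainingSenderLink and all of its methods are hypothetical, session credit is 
reduced to a single counter, and link credit is tracked directly rather than 
derived from the receiver's delivery count.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical model of a sending link; none of these names come from Qpid. */
final class DrainingSenderLink {
    private final Queue<String> queue = new ArrayDeque<>();
    private long sessionCredit;   // transfers the session window still allows
    private long linkCredit;      // credit granted by the receiver's last flow
    private long deliveryCount;   // sender's delivery-count
    private boolean drain;        // drain flag from the receiver's last flow
    private int transfersSent;
    private int flowsSent;

    DrainingSenderLink(long initialSessionCredit) {
        this.sessionCredit = initialSessionCredit;
    }

    void enqueue(String message) { queue.add(message); pump(); }

    /** Receiver's flow: a state change, not a request awaiting an immediate reply. */
    void onFlow(long credit, boolean drainFlag) {
        linkCredit = credit;
        drain = drainFlag;
        pump();
    }

    void addSessionCredit(long extra) { sessionCredit += extra; pump(); }

    private void pump() {
        // Send as many transfers as link credit AND session credit allow.
        while (linkCredit > 0 && sessionCredit > 0 && !queue.isEmpty()) {
            queue.remove();
            deliveryCount++;
            linkCredit--;
            sessionCredit--;
            transfersSent++;
        }
        // Only an empty queue lets the drain discard the unused credit.
        if (drain && linkCredit > 0 && queue.isEmpty()) {
            deliveryCount += linkCredit;
            linkCredit = 0;
            drain = false;
            flowsSent++;          // report the updated flow state to the receiver
        } else if (drain && linkCredit == 0) {
            drain = false;        // credit fully used by transfers; nothing to discard
        }
    }

    int transfersSent() { return transfersSent; }
    int flowsSent() { return flowsSent; }
    long deliveryCount() { return deliveryCount; }
    long linkCredit() { return linkCredit; }

    public static void main(String[] args) {
        DrainingSenderLink link = new DrainingSenderLink(2);
        link.enqueue("m1");
        link.enqueue("m2");
        link.enqueue("m3");
        link.onFlow(5, true);       // stalls on session credit, drain stays pending
        link.addSessionCredit(10);  // remaining message sent, then drain completes
        System.out.println(link.transfersSent() + " transfers, "
                + link.flowsSent() + " flow(s), delivery-count "
                + link.deliveryCount());
    }
}
```

The point of the sketch is that onFlow does not force the drain to finish: 
while messages remain and the session window is exhausted, the link keeps its 
credit and the drain stays pending until addSessionCredit is called.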

An interesting question is whether the "issue" identified is actually an issue 
at all.  The client issues a drain on a link with zero credit, which means 
that the delivery count is already advanced.  In this case I do not believe it 
should be necessary for the server to echo a drain back.  However, the wording 
of the spec does not seem to take this case into account, so perhaps we should 
simply treat this case as an "echo" request.
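
The suggestion above could be expressed as a small decision function. This is 
only a sketch: ZeroCreditDrain and shouldReplyImmediately are hypothetical 
names, and the rule encodes the proposal in this comment rather than anything 
the spec states explicitly.

```java
/** Hypothetical helper; not part of the Qpid broker. */
final class ZeroCreditDrain {
    private ZeroCreditDrain() { }

    /**
     * Decides whether the sender should answer an incoming flow at once with
     * its current flow state.
     */
    static boolean shouldReplyImmediately(long linkCredit, boolean drain, boolean echo) {
        if (echo) {
            return true;                  // explicit echo request from the receiver
        }
        // Proposed rule: a drain with no credit left to drain acts like an echo.
        return drain && linkCredit == 0;
    }
}
```

A drain that still carries credit returns false here, because its reply is 
deferred until the credit has been used up or discarded.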

> [Java Broker,amqp 1.0] Broker does not respond to Flow command with 
> drain=true if queue is empty and prefetch is 0
> ------------------------------------------------------------------------------------------------------------------
>
>                 Key: QPID-7634
>                 URL: https://issues.apache.org/jira/browse/QPID-7634
>             Project: Qpid
>          Issue Type: Bug
>          Components: Java Broker
>    Affects Versions: qpid-java-6.1, qpid-java-6.1.1, qpid-java-broker-7.0.0
>            Reporter: Alex Rudyy
>             Fix For: qpid-java-broker-7.0.0
>
>
> When AMQP 1.0 client sends the Flow command with drain=true and queue is 
> empty the Broker does not respond if prefetch is 0.
> The following snippet reproduces the issue
> {code}
> Connection connection = factory.createConnection(username, password);
> connection.start();
> Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
> MessageConsumer messageConsumer = session.createConsumer(queue);
> TextMessage receivedMessage = (TextMessage) messageConsumer.receiveNoWait(); // <-- times out
> {code}
> The broker logs
> {noformat}
> 2017-01-24 11:05:56,471 DEBUG [IO-/127.0.0.1:51108] (o.a.q.s.p.frame) - 
> RECV[/127.0.0.1:51108|1] : 
> Attach{name=qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue,handle=1,role=receiver,sndSettleMode=unsettled,rcvSettleMode=first,source=Source{address=queue,durable=none,expiryPolicy=link-detach,timeout=0,dynamic=false,defaultOutcome=Modified{deliveryFailed=true},outcomes=[amqp:accepted:list,
>  amqp:rejected:list, amqp:released:list, 
> amqp:modified:list],capabilities=[queue, global]},target=Target{}}
> 2017-01-24 11:05:56,474 DEBUG [VirtualHostNode-default-Config] 
> (o.a.q.s.c.u.TaskExecutorImpl) - Performing Task['add consumer' on 'queue' 
> with arguments 
> 'target=ConsumerTarget_1_0[linkSession=[con:7(admin@/127.0.0.1:51108/default)/ch:1]
>  ], 
> consumerName=qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue,
>  optionSet=[ACQUIRES, SEES_REQUEUES]']
> 2017-01-24 11:05:56,475 DEBUG [VirtualHostNode-default-Config] 
> (o.a.q.s.m.AbstractConfiguredObject) - authorise returned DEFER
> 2017-01-24 11:05:56,475 DEBUG [VirtualHostNode-default-Config] 
> (o.a.q.s.m.AbstractConfiguredObject) - authorise returned DEFER, returing 
> default: ALLOWED
> 2017-01-24 11:05:56,475 DEBUG [VirtualHostNode-default-Config] 
> (o.a.q.s.c.u.TaskExecutorImpl) - Performing Task['open' on 
> 'Consumer[id=164b1b27-176a-42a3-a5dc-613d97bc58b2, 
> name=7|1|qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue,
>  type=Consumer]']
> 2017-01-24 11:05:56,475 DEBUG [VirtualHostNode-default-Config] 
> (o.a.q.s.c.u.TaskExecutorImpl) - Task['open' on 
> 'Consumer[id=164b1b27-176a-42a3-a5dc-613d97bc58b2, 
> name=7|1|qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue,
>  type=Consumer]'] performed successfully with result: null
> 2017-01-24 11:05:56,476 INFO  [VirtualHostNode-default-Config] (q.m.s.create) 
> - [con:7(admin@/127.0.0.1:51108/default)/ch:1] [sub:7(vh(/default)/qu(queue)] 
> SUB-1001 : Create
> 2017-01-24 11:05:56,476 DEBUG [VirtualHostNode-default-Config] 
> (o.a.q.s.c.u.TaskExecutorImpl) - Task['add consumer' on 'queue' with 
> arguments 
> 'target=ConsumerTarget_1_0[linkSession=[con:7(admin@/127.0.0.1:51108/default)/ch:1]
>  ], 
> consumerName=qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue,
>  optionSet=[ACQUIRES, SEES_REQUEUES]'] performed successfully with result: 
> Consumer[id=164b1b27-176a-42a3-a5dc-613d97bc58b2, 
> name=7|1|qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue,
>  type=Consumer]
> 2017-01-24 11:05:56,477 DEBUG [IO-/127.0.0.1:51108] (o.a.q.s.p.frame) - 
> SEND[/127.0.0.1:51108|1] : 
> Attach{name=qpid-jms:receiver:ID:20aa7189-d399-472d-a8ce-aa8b7a3bef75:1:1:1:queue,handle=1,role=sender,sndSettleMode=unsettled,rcvSettleMode=first,source=Source{address=queue,durable=none,expiryPolicy=link-detach,timeout=0,dynamic=false,defaultOutcome=Modified{deliveryFailed=true},outcomes=[amqp:accepted:list,
>  amqp:rejected:list, amqp:released:list, 
> amqp:modified:list],capabilities=[queue]},target=Target{},initialDeliveryCount=0,offeredCapabilities=[SHARED-SUBS],properties={}}
> 2017-01-24 11:05:56,477 DEBUG [IO-/127.0.0.1:51108] 
> (o.a.q.s.t.NonBlockingConnection) - Written 251 bytes
> 2017-01-24 11:05:56,477 DEBUG [IO-/127.0.0.1:51108] 
> (o.a.q.s.t.NonBlockingConnection) - Read 0 byte(s)
> 2017-01-24 11:05:56,487 DEBUG [IO-/127.0.0.1:51108] 
> (o.a.q.s.t.NonBlockingConnection) - Read 34 byte(s)
> 2017-01-24 11:05:56,487 DEBUG [IO-/127.0.0.1:51108] (o.a.q.s.p.frame) - 
> RECV[/127.0.0.1:51108|1] : 
> Flow{nextIncomingId=0,incomingWindow=2047,nextOutgoingId=1,outgoingWindow=2147483647,handle=1,deliveryCount=0,linkCredit=1,drain=true}
> 2017-01-24 11:05:56,487 DEBUG [IO-/127.0.0.1:51108] 
> (o.a.q.s.t.NonBlockingConnection) - Read 0 byte(s)
> 2017-01-24 11:05:58,162 DEBUG [IO-/127.0.0.1:51108] 
> (o.a.q.s.t.NonBlockingConnection) - Read 0 byte(s)
> ...
> {noformat}
> Relevant part of the spec:
> http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#doc-flow-control
> bq. If the sender's drain flag is set and there are no available messages, 
> the sender MUST advance its delivery-count until link-credit is zero, and 
> send its updated flow state to the receiver.



