[ 
https://issues.apache.org/jira/browse/AMQ-5379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14179862#comment-14179862
 ] 

Robbie Gemmell commented on AMQ-5379:
-------------------------------------

Given his familiarity with the broker as a whole, [~tabish121] is possibly in a 
better position to comment overall on how the changes will work with the broker 
generally. That said, in terms of the AMQP related code change:

The new 'producerCredit' field in AmqpProtocolConverter is only being used 
during the initial opening of the producer (i.e proton receiver object). The 
existing 'prefetch' field value is still being used to replenish the producers 
credit when it gets used up later by incoming messages, which may mean the 
wrong amount of credit will be granted if the config values differ, so that 
process also needs update to use the producerCredit field.

The 'client preference' check here should probably be removed. The producer 
doesn't really get a say in the credit they are given from the server/receiver, 
any link credit value they set in a Flow frame is a reflection of the last 
value they know based on Flows sent to them from the server. 
AmqpProtocolConverter, Line 824:
{noformat}
        // Client is producing to this receiver object
        org.apache.qpid.proton.amqp.transport.Target remoteTarget = 
receiver.getRemoteTarget();
        int flow = producerCredit;
        // use client's preference if set
        if (receiver.getRemoteCredit() != 0) {
            flow = receiver.getRemoteCredit();
        }
{noformat}


Slight aside: Flow frames dont need to be sent after processing every message. 
Some clients may do that currently but when attention turns to efficiency and 
performance, adjusting the credit in batches and only sending the Flows more 
sparingly is what most will lkely do.

I think the updated consumer prefetch handling should work in a more AMQP-like 
fashion in more cases, though I'm not sure if it yet fully handles the desire 
of a consumer to completely stop message delivery by moving the link credit 
back down to 0 (e.g using the flow drain flag, or by simply not replenishing 
credit as it is used for earlier consumed messages), and it may lead to some 
weirdness around configuring the ActiveMQ consumer prefetch repeatedly to a 
different values which are lower than the currently outstanding number of 
already prefetched messages. The main thing I wonder around that is, how would 
the broker as a whole react to reducing the ActiveMQ consumer prefetch setting 
below the number of messages that are already prefetched?

(also, the null on the context is redundant since instanceof check handles that)
{noformat}
+    protected void processLinkFlow(Link link) throws Exception {
+        Object context = link.getContext();
+        int credit = link.getRemoteCredit();
+        if (context != null && context instanceof ConsumerContext) {
+            ConsumerContext consumerContext = (ConsumerContext)context;
+            // change ActiveMQ consumer prefetch if needed
+            if (consumerContext.credit == 0 && 
consumerContext.consumerPrefetch != credit && credit > 0) {
+                ConsumerControl control = new ConsumerControl();
+                control.setConsumerId(consumerContext.consumerId);
+                control.setDestination(consumerContext.destination);
+                control.setPrefetch(credit);
+                consumerContext.consumerPrefetch = credit;
+                sendToActiveMQ(control, null);
+            }
+            consumerContext.credit = credit;
+        }
+        ((AmqpDeliveryListener) link.getContext()).drainCheck();
+    }
{noformat}

Perhaps confusingly, the link Flow event is raised by Proton for reasons other 
than simply receiving a Flow frame for the link, and incoming flow frames may 
not actually change the credit (but could effectively add or remote some), so 
this method may execute for a sending link even without a credit change. Using 
link.getRemoteCredit() doesnt return the amount of credit previously given by 
the remote receiver, but rather calculates the currently unused amount by doing 
'<remaining unused credit> - <messages still queued to send by procesing the 
transport>' (which I'm not certain of, but seems like it could even be negative 
if more messages have already attempted to be sent than there is now actually 
remaining credit for, which I believe makes proton buffer them). As a result, 
if all of the originally existing link credit had been used by prefetching at 
this point, only some of that amount of credit has been replenished so far by 
the remote receiver, link.getRemoteCredit() will presumably yeild a smaller 
number than the original credit window was, which could make it set the 
ActiveMQ consumer prefetch to this lower-than-previous value if 
consumerContext.credit has previously been recorded as 0 while the credit was 
all used. At this point <up to original credit window> messages may already be 
prefetched and still awaiting actual consumption, though the ActiveMQ prefetch 
may now set to a smaller value, only the as-yet-unused remaining AMQP credit. 
How would the broker react to that? Further, if that credit is then also used, 
and 'remote credit' hits 0 again, and the client subsequently Flows new credit 
as it consumes earlier messages, that could lead to 'remote credit' changing 
again to a different non-zero value than previously, possibly being used to 
update the ActiveMQ consumer prefetch again to match. This is to say, it could 
lead to the ActiveMQ consumer prefetch continuously being configured to new 
values whenever the available AMQP credit changes after previously being all 
used for a time.

In terms of testing, later releases of the 'old' Qpid AMQP 1.0 JMS client do 
let you configure the consumer prefetch, though the way of configuring SSL 
seems to have changed so other tests might need updated to update to using such 
a release. It would probably even be useful to test the prefetch related 
behaviour of the broker at a lower level, perhaps suing the proton engine 
directly, in order to get more fine grained control over the protocol 
interactions, though that is probably a task for later :)

> AMQP - allow setting prefetch size
> ----------------------------------
>
>                 Key: AMQ-5379
>                 URL: https://issues.apache.org/jira/browse/AMQ-5379
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: AMQP
>    Affects Versions: 5.10.0
>            Reporter: Dejan Bosanac
>            Assignee: Dejan Bosanac
>             Fix For: 5.11.0
>
>
> Currently the prefetch size is hardcoded to the value of 100



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to