[
https://issues.apache.org/jira/browse/AMQ-5379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14179862#comment-14179862
]
Robbie Gemmell commented on AMQ-5379:
-------------------------------------
Given his familiarity with the broker as a whole, [~tabish121] is possibly in a
better position to comment overall on how the changes will work with the broker
generally. That said, in terms of the AMQP related code change:
The new 'producerCredit' field in AmqpProtocolConverter is only being used
during the initial opening of the producer (i.e proton receiver object). The
existing 'prefetch' field value is still being used to replenish the producers
credit when it gets used up later by incoming messages, which may mean the
wrong amount of credit will be granted if the config values differ, so that
process also needs update to use the producerCredit field.
The 'client preference' check here should probably be removed. The producer
doesn't really get a say in the credit they are given from the server/receiver,
any link credit value they set in a Flow frame is a reflection of the last
value they know based on Flows sent to them from the server.
AmqpProtocolConverter, Line 824:
{noformat}
// Client is producing to this receiver object
org.apache.qpid.proton.amqp.transport.Target remoteTarget =
receiver.getRemoteTarget();
int flow = producerCredit;
// use client's preference if set
if (receiver.getRemoteCredit() != 0) {
flow = receiver.getRemoteCredit();
}
{noformat}
Slight aside: Flow frames dont need to be sent after processing every message.
Some clients may do that currently but when attention turns to efficiency and
performance, adjusting the credit in batches and only sending the Flows more
sparingly is what most will lkely do.
I think the updated consumer prefetch handling should work in a more AMQP-like
fashion in more cases, though I'm not sure if it yet fully handles the desire
of a consumer to completely stop message delivery by moving the link credit
back down to 0 (e.g using the flow drain flag, or by simply not replenishing
credit as it is used for earlier consumed messages), and it may lead to some
weirdness around configuring the ActiveMQ consumer prefetch repeatedly to a
different values which are lower than the currently outstanding number of
already prefetched messages. The main thing I wonder around that is, how would
the broker as a whole react to reducing the ActiveMQ consumer prefetch setting
below the number of messages that are already prefetched?
(also, the null on the context is redundant since instanceof check handles that)
{noformat}
+ protected void processLinkFlow(Link link) throws Exception {
+ Object context = link.getContext();
+ int credit = link.getRemoteCredit();
+ if (context != null && context instanceof ConsumerContext) {
+ ConsumerContext consumerContext = (ConsumerContext)context;
+ // change ActiveMQ consumer prefetch if needed
+ if (consumerContext.credit == 0 &&
consumerContext.consumerPrefetch != credit && credit > 0) {
+ ConsumerControl control = new ConsumerControl();
+ control.setConsumerId(consumerContext.consumerId);
+ control.setDestination(consumerContext.destination);
+ control.setPrefetch(credit);
+ consumerContext.consumerPrefetch = credit;
+ sendToActiveMQ(control, null);
+ }
+ consumerContext.credit = credit;
+ }
+ ((AmqpDeliveryListener) link.getContext()).drainCheck();
+ }
{noformat}
Perhaps confusingly, the link Flow event is raised by Proton for reasons other
than simply receiving a Flow frame for the link, and incoming flow frames may
not actually change the credit (but could effectively add or remote some), so
this method may execute for a sending link even without a credit change. Using
link.getRemoteCredit() doesnt return the amount of credit previously given by
the remote receiver, but rather calculates the currently unused amount by doing
'<remaining unused credit> - <messages still queued to send by procesing the
transport>' (which I'm not certain of, but seems like it could even be negative
if more messages have already attempted to be sent than there is now actually
remaining credit for, which I believe makes proton buffer them). As a result,
if all of the originally existing link credit had been used by prefetching at
this point, only some of that amount of credit has been replenished so far by
the remote receiver, link.getRemoteCredit() will presumably yeild a smaller
number than the original credit window was, which could make it set the
ActiveMQ consumer prefetch to this lower-than-previous value if
consumerContext.credit has previously been recorded as 0 while the credit was
all used. At this point <up to original credit window> messages may already be
prefetched and still awaiting actual consumption, though the ActiveMQ prefetch
may now set to a smaller value, only the as-yet-unused remaining AMQP credit.
How would the broker react to that? Further, if that credit is then also used,
and 'remote credit' hits 0 again, and the client subsequently Flows new credit
as it consumes earlier messages, that could lead to 'remote credit' changing
again to a different non-zero value than previously, possibly being used to
update the ActiveMQ consumer prefetch again to match. This is to say, it could
lead to the ActiveMQ consumer prefetch continuously being configured to new
values whenever the available AMQP credit changes after previously being all
used for a time.
In terms of testing, later releases of the 'old' Qpid AMQP 1.0 JMS client do
let you configure the consumer prefetch, though the way of configuring SSL
seems to have changed so other tests might need updated to update to using such
a release. It would probably even be useful to test the prefetch related
behaviour of the broker at a lower level, perhaps suing the proton engine
directly, in order to get more fine grained control over the protocol
interactions, though that is probably a task for later :)
> AMQP - allow setting prefetch size
> ----------------------------------
>
> Key: AMQ-5379
> URL: https://issues.apache.org/jira/browse/AMQ-5379
> Project: ActiveMQ
> Issue Type: Bug
> Components: AMQP
> Affects Versions: 5.10.0
> Reporter: Dejan Bosanac
> Assignee: Dejan Bosanac
> Fix For: 5.11.0
>
>
> Currently the prefetch size is hardcoded to the value of 100
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)