gtully commented on code in PR #4509:
URL: https://github.com/apache/activemq-artemis/pull/4509#discussion_r1232055299
##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java:
##########
@@ -155,6 +167,67 @@ private synchronized void connect() throws Exception {
}
}
+ interface QueueHandle {
+ long getMessageCount();
+ int getCreditWindow();
+ }
+
+ private QueueHandle createQueueHandle(ActiveMQServer server,
ClientSession.QueueQuery queryResult) {
+ final Queue queue = server.locateQueue(queryResult.getName());
+ int creditWindow = DEFAULT_CONSUMER_WINDOW_SIZE;
+
+ final Integer defaultConsumerWindowSize =
queryResult.getDefaultConsumerWindowSize();
+ if (defaultConsumerWindowSize != null) {
+ creditWindow = defaultConsumerWindowSize.intValue();
+ if (creditWindow <= 0) {
+ creditWindow = DEFAULT_CONSUMER_WINDOW_SIZE;
+ logger.trace("{} override non positive queue consumerWindowSize
with {}.", this, creditWindow);
+ }
+ }
+
+ final int finalCreditWindow = creditWindow;
+ return new QueueHandle() {
+ @Override
+ public long getMessageCount() {
+ return queue.getMessageCountForRing();
+ }
+
+ @Override
+ public int getCreditWindow() {
+ return finalCreditWindow;
+ }
+ };
+ }
+
+ private void scheduleCreditOnEmpty(final int delay, final QueueHandle
handle) {
+ if (handle != null) {
Review Comment:
agree, that check was just defensive but unnecessary, the consumer can go
stale with a pending check but that is already covered and any failure will
result in the session/consumer getting recreated, so all that in necessary is
that we don't reschedule in that case.
thanks for the feedback.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]