Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2467#discussion_r242787783 --- Diff: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java --- @@ -122,7 +136,53 @@ public Object getBrokerConsumer() { @Override public void onFlow(int currentCredits, boolean drain) { - sessionSPI.onFlowConsumer(brokerConsumer, currentCredits, drain); + connection.requireInHandler(); + + setupCredit(); + + ServerConsumerImpl serverConsumer = (ServerConsumerImpl) brokerConsumer; + if (drain) { + // If the draining is already running, then don't do anything + if (draining.compareAndSet(false, true)) { + final ProtonServerSenderContext plugSender = (ProtonServerSenderContext) serverConsumer.getProtocolContext(); + serverConsumer.forceDelivery(1, new Runnable() { + @Override + public void run() { + try { + connection.runNow(() -> { + plugSender.reportDrained(); + setupCredit(); + }); + } finally { + draining.set(false); + } + } + }); + } + } else { + serverConsumer.receiveCredits(-1); + } + } + + public boolean hasCredits() { + if (!connection.flowControl(onflowControlReady)) { + return false; + } + + //return true; + //return getSender().getCredit() > 0; --- End diff -- Remove commented out code
---