Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2467#discussion_r242787783
  
    --- Diff: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
 ---
    @@ -122,7 +136,53 @@ public Object getBrokerConsumer() {
     
        @Override
        public void onFlow(int currentCredits, boolean drain) {
    -      sessionSPI.onFlowConsumer(brokerConsumer, currentCredits, drain);
    +      connection.requireInHandler();
    +
    +      setupCredit();
    +
    +      ServerConsumerImpl serverConsumer = (ServerConsumerImpl) 
brokerConsumer;
    +      if (drain) {
    +         // If the draining is already running, then don't do anything
    +         if (draining.compareAndSet(false, true)) {
    +            final ProtonServerSenderContext plugSender = 
(ProtonServerSenderContext) serverConsumer.getProtocolContext();
    +            serverConsumer.forceDelivery(1, new Runnable() {
    +               @Override
    +               public void run() {
    +                  try {
    +                     connection.runNow(() -> {
    +                        plugSender.reportDrained();
    +                        setupCredit();
    +                     });
    +                  } finally {
    +                     draining.set(false);
    +                  }
    +               }
    +            });
    +         }
    +      } else {
    +         serverConsumer.receiveCredits(-1);
    +      }
    +   }
    +
    +   public boolean hasCredits() {
    +      if (!connection.flowControl(onflowControlReady)) {
    +         return false;
    +      }
    +
    +      //return true;
    +      //return getSender().getCredit() > 0;
    --- End diff --
    
    Remove commented out code


---

Reply via email to