poorbarcode commented on code in PR #19585:
URL: https://github.com/apache/pulsar/pull/19585#discussion_r1122566942


##########
pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/GrowableArrayBlockingQueue.java:
##########
@@ -401,6 +413,24 @@ public String toString() {
         return sb.toString();
     }
 
+    /**
+     * Make the queue not accept new items. If new data still tries to enter the queue, it will be handled
+     * by {@code itemAfterTerminatedHandler}.
+     */
+    public void terminate(@Nullable Consumer<T> itemAfterTerminatedHandler) {
+        terminated = true;
+        if (itemAfterTerminatedHandler != null) {
+            this.itemAfterTerminatedHandler = itemAfterTerminatedHandler;
+        }
+        // Acquiring and releasing the write lock waits for in-flight enqueues; once that is done, terminate is finished.
+        long stamp = tailLock.writeLock();
+        tailLock.unlockWrite(stamp);

Review Comment:
   yes, already fixed
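
   For readers following the hunk above, here is a minimal sketch of the
   pattern it relies on: set a flag, then acquire and immediately release the
   write lock so that any enqueue already holding the lock finishes first.
   This is not the Pulsar implementation. The class `TerminableQueueSketch`,
   its `offer` and `size` methods, and the no-op default handler are
   hypothetical and only reuse the field names from the diff. The sketch also
   assumes enqueues take `tailLock.writeLock()`, and it sets the handler
   before the flag, which differs from the ordering in the hunk.

```java
import java.util.ArrayDeque;
import java.util.concurrent.locks.StampedLock;
import java.util.function.Consumer;

// Hypothetical illustration class; not part of Apache Pulsar.
class TerminableQueueSketch<T> {
    private final ArrayDeque<T> items = new ArrayDeque<>();
    private final StampedLock tailLock = new StampedLock();
    private volatile boolean terminated = false;
    // Assumption: a no-op default so late items are silently dropped
    // when the caller passes no handler.
    private volatile Consumer<T> itemAfterTerminatedHandler = item -> { };

    /** Returns true if the item was enqueued, false if it went to the handler. */
    public boolean offer(T item) {
        long stamp = tailLock.writeLock();
        try {
            if (!terminated) {
                items.addLast(item);
                return true;
            }
        } finally {
            tailLock.unlockWrite(stamp);
        }
        // The queue is terminated: hand the item over outside the lock.
        itemAfterTerminatedHandler.accept(item);
        return false;
    }

    public void terminate(Consumer<T> handler) {
        if (handler != null) {
            itemAfterTerminatedHandler = handler;
        }
        terminated = true;
        // Any enqueue that passed the check before the flag flipped still
        // holds the write lock; taking and releasing it waits for that
        // enqueue to complete.
        long stamp = tailLock.writeLock();
        tailLock.unlockWrite(stamp);
    }

    public int size() {
        long stamp = tailLock.writeLock();
        try {
            return items.size();
        } finally {
            tailLock.unlockWrite(stamp);
        }
    }
}
```

   Because `offer` checks the flag while holding the lock, once `terminate`
   returns no further item can reach the underlying deque in this sketch;
   later items go to the handler instead.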


