kirktrue commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1358990797
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java: ########## @@ -66,31 +82,107 @@ boolean isEmpty() { * @return {@code true} if there are completed fetches that match the {@link Predicate}, {@code false} otherwise */ boolean hasCompletedFetches(Predicate<CompletedFetch> predicate) { - return completedFetches.stream().anyMatch(predicate); + try { + lock.lock(); + return completedFetches.stream().anyMatch(predicate); + } finally { + lock.unlock(); + } } void add(CompletedFetch completedFetch) { - completedFetches.add(completedFetch); + try { + lock.lock(); + completedFetches.add(completedFetch); + notEmptyCondition.signalAll(); + } finally { + lock.unlock(); + } } void addAll(Collection<CompletedFetch> completedFetches) { - this.completedFetches.addAll(completedFetches); + if (completedFetches == null || completedFetches.isEmpty()) + return; + + try { + lock.lock(); + this.completedFetches.addAll(completedFetches); + notEmptyCondition.signalAll(); + } finally { + lock.unlock(); + } } CompletedFetch nextInLineFetch() { - return nextInLineFetch; + try { + lock.lock(); + return nextInLineFetch; + } finally { + lock.unlock(); + } } - void setNextInLineFetch(CompletedFetch completedFetch) { - this.nextInLineFetch = completedFetch; + void setNextInLineFetch(CompletedFetch nextInLineFetch) { + try { + lock.lock(); + this.nextInLineFetch = nextInLineFetch; + } finally { + lock.unlock(); + } } CompletedFetch peek() { - return completedFetches.peek(); + try { + lock.lock(); + return completedFetches.peek(); + } finally { + lock.unlock(); + } } CompletedFetch poll() { - return completedFetches.poll(); + try { + lock.lock(); + return completedFetches.poll(); + } finally { + lock.unlock(); + } + } + + /** + * Allows the caller to await presence of data in the buffer. 
The method will block, returning only + * under one of the following conditions: + * + * <ol> + * <li>The buffer was already non-empty on entry</li> + * <li>The buffer was populated during the wait</li> + * <li>The remaining time on the {@link Timer timer} elapsed</li> + * <li>The thread was interrupted</li> + * </ol> + * + * @param timer Timer that provides time to wait + */ + void awaitNotEmpty(Timer timer) { + try { + lock.lock(); + + while (isEmpty()) { + // Update the timer before we head into the loop in case it took a while to get the lock. + timer.update(); + + if (timer.isExpired()) + break; + + if (notEmptyCondition.await(timer.remainingMs(), TimeUnit.MILLISECONDS)) { Review Comment: Thanks for catching that. I fixed this, but it's pretty clear now that I need unit tests to validate correctness. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org