This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 958f90e KAFKA-12375: fix concurrency issue in application shutdown
(#10213)
958f90e is described below
commit 958f90e710f0de164dce1dda5f45d75a8b8fb8d4
Author: Walker Carlson <[email protected]>
AuthorDate: Fri Feb 26 12:17:28 2021 -0800
KAFKA-12375: fix concurrency issue in application shutdown (#10213)
Need to ensure that `enforceRebalance` is used in a thread safe way
Reviewers: Bruno Cadonna <[email protected]>, Anna Sophie Blee-Goldman
<[email protected]>
---
.../apache/kafka/streams/processor/internals/StreamThread.java | 8 +++++---
1 file changed, 5 insertions(+), 3 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index b967f5af..9fadd53 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -571,6 +571,11 @@ public class StreamThread extends Thread {
// until the rebalance is completed before we close and commit the
tasks
while (isRunning() || taskManager.isRebalanceInProgress()) {
try {
+ if (assignmentErrorCode.get() ==
AssignorError.SHUTDOWN_REQUESTED.code()) {
+ log.warn("Detected that shutdown was requested. " +
+ "All clients in this app will now begin to
shutdown");
+ mainConsumer.enforceRebalance();
+ }
runOnce();
if (nextProbingRebalanceMs.get() < time.milliseconds()) {
log.info("Triggering the followup rebalance scheduled for
{} ms.", nextProbingRebalanceMs.get());
@@ -660,10 +665,7 @@ public class StreamThread extends Thread {
}
public void sendShutdownRequest(final AssignorError assignorError) {
- log.warn("Detected that shutdown was requested. " +
- "All clients in this app will now begin to shutdown");
assignmentErrorCode.set(assignorError.code());
- mainConsumer.enforceRebalance();
}
private void handleTaskMigrated(final TaskMigratedException e) {