This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.8 by this push:
new bab9398 KAFKA-12991; Fix unsafe access to `AbstractCoordinator.state`
(#10879)
bab9398 is described below
commit bab9398a436d06b86ff45f5063653f2f27a8ea3e
Author: David Jacot <[email protected]>
AuthorDate: Thu Jun 24 17:40:40 2021 +0200
KAFKA-12991; Fix unsafe access to `AbstractCoordinator.state` (#10879)
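This patch fixes the unsynchronized accesses to `AbstractCoordinator.state` by reading the field inside `synchronized` blocks on the coordinator instance when handling heartbeat and offset commit responses.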
(cherry-picked from commit 574af88305273f21456a9b10f21c182181cfc600)
Reviewers: Anna Sophie Blee-Goldman <[email protected]>
---
.../consumer/internals/AbstractCoordinator.java | 16 +++++-----
.../consumer/internals/ConsumerCoordinator.java | 34 +++++++++++++---------
2 files changed, 30 insertions(+), 20 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 2f0d6c8..b70b67c 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -1119,13 +1119,15 @@ public abstract class AbstractCoordinator implements
Closeable {
} else if (error == Errors.REBALANCE_IN_PROGRESS) {
// since we may be sending the request during rebalance, we
should check
// this case and ignore the REBALANCE_IN_PROGRESS error
- if (state == MemberState.STABLE) {
- log.info("Attempt to heartbeat failed since group is
rebalancing");
- requestRejoin();
- future.raise(error);
- } else {
- log.debug("Ignoring heartbeat response with error {}
during {} state", error, state);
- future.complete(null);
+ synchronized (AbstractCoordinator.this) {
+ if (state == MemberState.STABLE) {
+ log.info("Attempt to heartbeat failed since group is
rebalancing");
+ requestRejoin();
+ future.raise(error);
+ } else {
+ log.debug("Ignoring heartbeat response with error {}
during {} state", error, state);
+ future.complete(null);
+ }
}
} else if (error == Errors.ILLEGAL_GENERATION ||
error == Errors.UNKNOWN_MEMBER_ID ||
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index b415b22..2dae166 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -1218,13 +1218,17 @@ public final class ConsumerCoordinator extends
AbstractCoordinator {
if (generationUnchanged()) {
future.raise(error);
} else {
- if (ConsumerCoordinator.this.state ==
MemberState.PREPARING_REBALANCE) {
- future.raise(new
RebalanceInProgressException("Offset commit cannot be completed since the " +
- "consumer member's old generation is
fenced by its group instance id, it is possible that " +
- "this consumer has already
participated another rebalance and got a new generation"));
- } else {
- future.raise(new CommitFailedException());
+ KafkaException exception;
+ synchronized (ConsumerCoordinator.this) {
+ if (ConsumerCoordinator.this.state ==
MemberState.PREPARING_REBALANCE) {
+ exception = new
RebalanceInProgressException("Offset commit cannot be completed since the " +
+ "consumer member's old generation
is fenced by its group instance id, it is possible that " +
+ "this consumer has already
participated another rebalance and got a new generation");
+ } else {
+ exception = new
CommitFailedException();
+ }
}
+ future.raise(exception);
}
return;
} else if (error == Errors.REBALANCE_IN_PROGRESS) {
@@ -1247,14 +1251,18 @@ public final class ConsumerCoordinator extends
AbstractCoordinator {
// only need to reset generation and re-join group
if generation has not changed or we are not in rebalancing;
// otherwise only raise rebalance-in-progress error
- if (!generationUnchanged() &&
ConsumerCoordinator.this.state == MemberState.PREPARING_REBALANCE) {
- future.raise(new
RebalanceInProgressException("Offset commit cannot be completed since the " +
- "consumer member's generation is already
stale, meaning it has already participated another rebalance and " +
- "got a new generation. You can try
completing the rebalance by calling poll() and then retry commit again"));
- } else {
-
resetGenerationOnResponseError(ApiKeys.OFFSET_COMMIT, error);
- future.raise(new CommitFailedException());
+ KafkaException exception;
+ synchronized (ConsumerCoordinator.this) {
+ if (!generationUnchanged() &&
ConsumerCoordinator.this.state == MemberState.PREPARING_REBALANCE) {
+ exception = new
RebalanceInProgressException("Offset commit cannot be completed since the " +
+ "consumer member's generation is
already stale, meaning it has already participated another rebalance and " +
+ "got a new generation. You can try
completing the rebalance by calling poll() and then retry commit again");
+ } else {
+
resetGenerationOnResponseError(ApiKeys.OFFSET_COMMIT, error);
+ exception = new CommitFailedException();
+ }
}
+ future.raise(exception);
return;
} else {
future.raise(new KafkaException("Unexpected error
in commit: " + error.message()));