This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 574af88 KAFKA-12991; Fix unsafe access to `AbstractCoordinator.state`
(#10879)
574af88 is described below
commit 574af88305273f21456a9b10f21c182181cfc600
Author: David Jacot <[email protected]>
AuthorDate: Thu Jun 24 17:40:40 2021 +0200
KAFKA-12991; Fix unsafe access to `AbstractCoordinator.state` (#10879)
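This patch fixes the unsynchronized accesses to `AbstractCoordinator.state` by reading the field inside `synchronized` blocks on the coordinator instance in the heartbeat and offset commit response handling.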
Reviewers: Anna Sophie Blee-Goldman <[email protected]>
---
.../consumer/internals/AbstractCoordinator.java | 14 +++++----
.../consumer/internals/ConsumerCoordinator.java | 34 +++++++++++++---------
2 files changed, 29 insertions(+), 19 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index cd1daa8..9fbfe1f 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -1119,12 +1119,14 @@ public abstract class AbstractCoordinator implements
Closeable {
} else if (error == Errors.REBALANCE_IN_PROGRESS) {
// since we may be sending the request during rebalance, we
should check
// this case and ignore the REBALANCE_IN_PROGRESS error
- if (state == MemberState.STABLE) {
- requestRejoin("group is already rebalancing");
- future.raise(error);
- } else {
- log.debug("Ignoring heartbeat response with error {}
during {} state", error, state);
- future.complete(null);
+ synchronized (AbstractCoordinator.this) {
+ if (state == MemberState.STABLE) {
+ requestRejoin("group is already rebalancing");
+ future.raise(error);
+ } else {
+ log.debug("Ignoring heartbeat response with error {}
during {} state", error, state);
+ future.complete(null);
+ }
}
} else if (error == Errors.ILLEGAL_GENERATION ||
error == Errors.UNKNOWN_MEMBER_ID ||
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 226297a..39f4520 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -1216,13 +1216,17 @@ public final class ConsumerCoordinator extends
AbstractCoordinator {
if (generationUnchanged()) {
future.raise(error);
} else {
- if (ConsumerCoordinator.this.state ==
MemberState.PREPARING_REBALANCE) {
- future.raise(new
RebalanceInProgressException("Offset commit cannot be completed since the " +
- "consumer member's old generation is
fenced by its group instance id, it is possible that " +
- "this consumer has already
participated another rebalance and got a new generation"));
- } else {
- future.raise(new CommitFailedException());
+ KafkaException exception;
+ synchronized (ConsumerCoordinator.this) {
+ if (ConsumerCoordinator.this.state ==
MemberState.PREPARING_REBALANCE) {
+ exception = new
RebalanceInProgressException("Offset commit cannot be completed since the " +
+ "consumer member's old generation
is fenced by its group instance id, it is possible that " +
+ "this consumer has already
participated another rebalance and got a new generation");
+ } else {
+ exception = new
CommitFailedException();
+ }
}
+ future.raise(exception);
}
return;
} else if (error == Errors.REBALANCE_IN_PROGRESS) {
@@ -1245,14 +1249,18 @@ public final class ConsumerCoordinator extends
AbstractCoordinator {
// only need to reset generation and re-join group
if generation has not changed or we are not in rebalancing;
// otherwise only raise rebalance-in-progress error
- if (!generationUnchanged() &&
ConsumerCoordinator.this.state == MemberState.PREPARING_REBALANCE) {
- future.raise(new
RebalanceInProgressException("Offset commit cannot be completed since the " +
- "consumer member's generation is already
stale, meaning it has already participated another rebalance and " +
- "got a new generation. You can try
completing the rebalance by calling poll() and then retry commit again"));
- } else {
-
resetGenerationOnResponseError(ApiKeys.OFFSET_COMMIT, error);
- future.raise(new CommitFailedException());
+ KafkaException exception;
+ synchronized (ConsumerCoordinator.this) {
+ if (!generationUnchanged() &&
ConsumerCoordinator.this.state == MemberState.PREPARING_REBALANCE) {
+ exception = new
RebalanceInProgressException("Offset commit cannot be completed since the " +
+ "consumer member's generation is
already stale, meaning it has already participated another rebalance and " +
+ "got a new generation. You can try
completing the rebalance by calling poll() and then retry commit again");
+ } else {
+
resetGenerationOnResponseError(ApiKeys.OFFSET_COMMIT, error);
+ exception = new CommitFailedException();
+ }
}
+ future.raise(exception);
return;
} else {
future.raise(new KafkaException("Unexpected error
in commit: " + error.message()));