This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new 1f18753a1f0 KAFKA-18034: CommitRequestManager should fail pending
requests on fatal coordinator errors (#18754)
1f18753a1f0 is described below
commit 1f18753a1f0706d172b73d46eefeadbd5ddd36be
Author: Lianet Magrans <[email protected]>
AuthorDate: Thu Jan 30 16:55:51 2025 -0500
KAFKA-18034: CommitRequestManager should fail pending requests on fatal
coordinator errors (#18754)
Reviewers: Andrew Schofield <[email protected]>
---
.../internals/AbstractHeartbeatRequestManager.java | 6 +++
.../consumer/internals/CommitRequestManager.java | 16 +++++++-
.../internals/CoordinatorRequestManager.java | 26 +++++++++----
.../consumer/internals/RequestManagers.java | 2 -
.../kafka/clients/consumer/KafkaConsumerTest.java | 2 +-
.../internals/CommitRequestManagerTest.java | 18 +++++++++
.../internals/CoordinatorRequestManagerTest.java | 45 ++++++++--------------
.../kafka/api/AuthorizerIntegrationTest.scala | 6 +--
.../kafka/server/QuorumTestHarness.scala | 4 --
9 files changed, 76 insertions(+), 49 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
index b260e5e5fbf..fe8926abbf8 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
@@ -163,6 +163,7 @@ public abstract class AbstractHeartbeatRequestManager<R
extends AbstractResponse
public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
if (coordinatorRequestManager.coordinator().isEmpty() ||
membershipManager().shouldSkipHeartbeat()) {
membershipManager().onHeartbeatRequestSkipped();
+ maybePropagateCoordinatorFatalErrorEvent();
return NetworkClientDelegate.PollResult.EMPTY;
}
pollTimer.update(currentTimeMs);
@@ -265,6 +266,11 @@ public abstract class AbstractHeartbeatRequestManager<R
extends AbstractResponse
pollTimer.reset(maxPollIntervalMs);
}
+ private void maybePropagateCoordinatorFatalErrorEvent() {
+ coordinatorRequestManager.getAndClearFatalError()
+ .ifPresent(fatalError -> backgroundEventHandler.add(new
ErrorEvent(fatalError)));
+ }
+
private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final
long currentTimeMs, final boolean ignoreResponse) {
NetworkClientDelegate.UnsentRequest request =
makeHeartbeatRequest(ignoreResponse);
heartbeatRequestState.onSendAttempt(currentTimeMs);
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
index 4f0deef5bf8..1d3503886a9 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
@@ -176,9 +176,11 @@ public class CommitRequestManager implements
RequestManager, MemberStateListener
*/
@Override
public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
- // poll only when the coordinator node is known.
- if (coordinatorRequestManager.coordinator().isEmpty())
+ // poll only when the coordinator node is known and no fatal error is
present
+ if (coordinatorRequestManager.coordinator().isEmpty()) {
+ pendingRequests.maybeFailOnCoordinatorFatalError();
return EMPTY;
+ }
if (closing) {
return drainPendingOffsetCommitRequests();
@@ -1246,6 +1248,16 @@ public class CommitRequestManager implements
RequestManager, MemberStateListener
clearAll();
return res;
}
+
+ private void maybeFailOnCoordinatorFatalError() {
+ coordinatorRequestManager.fatalError().ifPresent(error -> {
+ log.warn("Failing all unsent commit requests and offset
fetches because of coordinator fatal error. ", error);
+ unsentOffsetCommits.forEach(request ->
request.future.completeExceptionally(error));
+ unsentOffsetFetches.forEach(request ->
request.future.completeExceptionally(error));
+ clearAll();
+ }
+ );
+ }
}
/**
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
index 0f1650d0e67..dd53ae11790 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
@@ -16,8 +16,6 @@
*/
package org.apache.kafka.clients.consumer.internals;
-import
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
-import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.DisconnectException;
@@ -53,7 +51,6 @@ import static
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.
public class CoordinatorRequestManager implements RequestManager {
private static final long COORDINATOR_DISCONNECT_LOGGING_INTERVAL_MS = 60
* 1000;
private final Logger log;
- private final BackgroundEventHandler backgroundEventHandler;
private final String groupId;
private final RequestState coordinatorRequestState;
@@ -61,17 +58,21 @@ public class CoordinatorRequestManager implements
RequestManager {
private long totalDisconnectedMin = 0;
private boolean closing = false;
private Node coordinator;
+ // Hold the latest fatal error received. It is exposed so that managers
requiring a coordinator can access it and take
+ // appropriate actions.
+ // For example:
+ // - AbstractHeartbeatRequestManager propagates the error event to the
application thread.
+ // - CommitRequestManager fails pending requests.
+ private Optional<Throwable> fatalError = Optional.empty();
public CoordinatorRequestManager(
final LogContext logContext,
final long retryBackoffMs,
final long retryBackoffMaxMs,
- final BackgroundEventHandler errorHandler,
final String groupId
) {
Objects.requireNonNull(groupId);
this.log = logContext.logger(this.getClass());
- this.backgroundEventHandler = errorHandler;
this.groupId = groupId;
this.coordinatorRequestState = new RequestState(
logContext,
@@ -120,6 +121,7 @@ public class CoordinatorRequestManager implements
RequestManager {
);
return unsentRequest.whenComplete((clientResponse, throwable) -> {
+ getAndClearFatalError();
if (clientResponse != null) {
FindCoordinatorResponse response = (FindCoordinatorResponse)
clientResponse.responseBody();
onResponse(clientResponse.receivedTimeMs(), response);
@@ -206,12 +208,12 @@ public class CoordinatorRequestManager implements
RequestManager {
if (exception == Errors.GROUP_AUTHORIZATION_FAILED.exception()) {
log.debug("FindCoordinator request failed due to authorization
error {}", exception.getMessage());
KafkaException groupAuthorizationException =
GroupAuthorizationException.forGroupId(this.groupId);
- backgroundEventHandler.add(new
ErrorEvent(groupAuthorizationException));
+ fatalError = Optional.of(groupAuthorizationException);
return;
}
log.warn("FindCoordinator request failed due to fatal exception",
exception);
- backgroundEventHandler.add(new ErrorEvent(exception));
+ fatalError = Optional.of(exception);
}
/**
@@ -250,4 +252,14 @@ public class CoordinatorRequestManager implements
RequestManager {
public Optional<Node> coordinator() {
return Optional.ofNullable(this.coordinator);
}
+
+ public Optional<Throwable> getAndClearFatalError() {
+ Optional<Throwable> fatalError = this.fatalError;
+ this.fatalError = Optional.empty();
+ return fatalError;
+ }
+
+ public Optional<Throwable> fatalError() {
+ return fatalError;
+ }
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
index 304f0fffd4a..960567ac96b 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
@@ -194,7 +194,6 @@ public class RequestManagers implements Closeable {
logContext,
retryBackoffMs,
retryBackoffMaxMs,
- backgroundEventHandler,
groupRebalanceConfig.groupId);
commitRequestManager = new CommitRequestManager(
time,
@@ -295,7 +294,6 @@ public class RequestManagers implements Closeable {
logContext,
retryBackoffMs,
retryBackoffMaxMs,
- backgroundEventHandler,
groupRebalanceConfig.groupId);
ShareMembershipManager shareMembershipManager = new
ShareMembershipManager(
logContext,
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 33ca2844305..2749563df27 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -2239,7 +2239,7 @@ public class KafkaConsumerTest {
// by the background thread, so it can realize there is
authentication fail and then
// throw the AuthenticationException
assertPollEventuallyThrows(consumer, AuthenticationException.class,
- "he consumer was not able to discover metadata errors
during continuous polling.");
+ "this consumer was not able to discover metadata errors
during continuous polling.");
} else {
assertThrows(AuthenticationException.class, () ->
consumer.poll(Duration.ZERO));
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
index 8e98c14b598..32b8aea5b22 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.message.OffsetCommitRequestData;
@@ -1482,6 +1483,23 @@ public class CommitRequestManagerTest {
assertEquals("topic", data.topics().get(0).name());
}
+ @Test
+ public void testPollWithFatalErrorShouldFailAllUnsentRequests() {
+ CommitRequestManager commitRequestManager = create(true, 100);
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
+
+ commitRequestManager.fetchOffsets(Collections.singleton(new
TopicPartition("test", 0)), 200);
+ assertEquals(1,
commitRequestManager.pendingRequests.unsentOffsetFetches.size());
+
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());
+ when(coordinatorRequestManager.fatalError())
+ .thenReturn(Optional.of(new GroupAuthorizationException("Group
authorization exception")));
+
+ assertEquals(NetworkClientDelegate.PollResult.EMPTY,
commitRequestManager.poll(200));
+
+ assertEmptyPendingRequests(commitRequestManager);
+ }
+
private static void assertEmptyPendingRequests(CommitRequestManager
commitRequestManager) {
assertTrue(commitRequestManager.pendingRequests.inflightOffsetFetches.isEmpty());
assertTrue(commitRequestManager.pendingRequests.unsentOffsetFetches.isEmpty());
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java
index 2165cb814ed..0ed902d7f27 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java
@@ -18,9 +18,7 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ClientResponse;
import
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
-import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.common.Node;
-import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
@@ -35,6 +33,8 @@ import org.apache.kafka.common.utils.MockTime;
import org.apache.logging.log4j.Level;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
import java.util.Collections;
import java.util.List;
@@ -49,9 +49,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
public class CoordinatorRequestManagerTest {
@@ -191,23 +189,10 @@ public class CoordinatorRequestManagerTest {
}
@Test
- public void testPropagateAndBackoffAfterFatalError() {
+ public void testBackoffAfterFatalError() {
CoordinatorRequestManager coordinatorManager =
setupCoordinatorManager(GROUP_ID);
expectFindCoordinatorRequest(coordinatorManager,
Errors.GROUP_AUTHORIZATION_FAILED);
- verify(backgroundEventHandler).add(argThat(backgroundEvent -> {
- if (!(backgroundEvent instanceof ErrorEvent))
- return false;
-
- RuntimeException exception = ((ErrorEvent)
backgroundEvent).error();
-
- if (!(exception instanceof GroupAuthorizationException))
- return false;
-
- GroupAuthorizationException groupAuthException =
(GroupAuthorizationException) exception;
- return groupAuthException.groupId().equals(GROUP_ID);
- }));
-
time.sleep(RETRY_BACKOFF_MS - 1);
assertEquals(Collections.emptyList(),
coordinatorManager.poll(time.milliseconds()).unsentRequests);
@@ -254,19 +239,20 @@ public class CoordinatorRequestManagerTest {
assertEquals(1, res2.unsentRequests.size());
}
- @Test
- public void testSignalOnClose() {
+ @ParameterizedTest
+ @EnumSource(value = Errors.class, names = {"NONE",
"COORDINATOR_NOT_AVAILABLE"})
+ public void testClearFatalErrorWhenReceivingSuccessfulResponse(Errors
error) {
CoordinatorRequestManager coordinatorManager =
setupCoordinatorManager(GROUP_ID);
- expectFindCoordinatorRequest(coordinatorManager, Errors.NONE);
- assertTrue(coordinatorManager.coordinator().isPresent());
- coordinatorManager.markCoordinatorUnknown("coordinator changed",
time.milliseconds());
- assertEquals(Collections.emptyList(),
coordinatorManager.poll(time.milliseconds()).unsentRequests);
- coordinatorManager.signalClose();
- time.sleep(RETRY_BACKOFF_MS - 1);
- assertEquals(Collections.emptyList(),
coordinatorManager.poll(time.milliseconds()).unsentRequests);
+ expectFindCoordinatorRequest(coordinatorManager,
Errors.GROUP_AUTHORIZATION_FAILED);
+ assertTrue(coordinatorManager.fatalError().isPresent());
+
time.sleep(RETRY_BACKOFF_MS);
- assertEquals(Collections.emptyList(),
coordinatorManager.poll(time.milliseconds()).unsentRequests,
- "Should not generate find coordinator request during close");
+ // there are no successful responses, so the fatal error should persist
+ assertTrue(coordinatorManager.fatalError().isPresent());
+
+ // receiving a successful response should clear the fatal error
+ expectFindCoordinatorRequest(coordinatorManager, error);
+ assertTrue(coordinatorManager.fatalError().isEmpty());
}
private void expectFindCoordinatorRequest(
@@ -288,7 +274,6 @@ public class CoordinatorRequestManagerTest {
new LogContext(),
RETRY_BACKOFF_MS,
RETRY_BACKOFF_MS,
- this.backgroundEventHandler,
groupId
);
}
diff --git
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 4256d40da4a..f8e3e1fc54a 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -1294,7 +1294,7 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
}
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034"))
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testCommitWithNoAccess(quorum: String, groupProtocol: String): Unit = {
val consumer = createConsumer()
assertThrows(classOf[GroupAuthorizationException], () =>
consumer.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava))
@@ -1331,7 +1331,7 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
}
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034"))
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testCommitWithNoGroupAccess(quorum: String, groupProtocol: String): Unit
= {
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString,
WILDCARD_HOST, READ, ALLOW)), topicResource)
val consumer = createConsumer()
@@ -1357,7 +1357,7 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
}
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034"))
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testOffsetFetchWithNoGroupAccess(quorum: String, groupProtocol: String):
Unit = {
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString,
WILDCARD_HOST, READ, ALLOW)), topicResource)
diff --git
a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index 72b76cd152d..dac38f2de26 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -439,8 +439,4 @@ object QuorumTestHarness {
// The following is for tests that only work with the classic group protocol
because of relying on Zookeeper
def
getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit:
java.util.stream.Stream[Arguments] = stream.Stream.of(Arguments.of("zk",
GroupProtocol.CLASSIC.name.toLowerCase(Locale.ROOT)))
-
- // The following parameter groups are to *temporarily* avoid bugs with the
CONSUMER group protocol Consumer
- // implementation that would otherwise cause tests to fail.
- def
getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034:
stream.Stream[Arguments] =
getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly
}