This is an automated email from the ASF dual-hosted git repository. lianetm pushed a commit to branch 4.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push: new 46b825c9e65 KAFKA-18034: CommitRequestManager should fail pending requests on fatal coordinator errors (#18050) 46b825c9e65 is described below commit 46b825c9e651ee3e901e97d43fa7bf2ba10ff749 Author: Ken Huang <s7133...@gmail.com> AuthorDate: Mon Jan 13 22:29:14 2025 +0800 KAFKA-18034: CommitRequestManager should fail pending requests on fatal coordinator errors (#18050) Reviewers: Kirk True <kt...@confluent.io>, Lianet Magrans <lmagr...@confluent.io> --- .../internals/AbstractHeartbeatRequestManager.java | 6 ++++ .../consumer/internals/CommitRequestManager.java | 16 +++++++-- .../internals/CoordinatorRequestManager.java | 24 ++++++++++---- .../consumer/internals/RequestManagers.java | 2 -- .../kafka/clients/consumer/KafkaConsumerTest.java | 2 +- .../internals/CommitRequestManagerTest.java | 18 ++++++++++ .../internals/CoordinatorRequestManagerTest.java | 38 +++++++++++----------- .../kafka/api/AuthorizerIntegrationTest.scala | 6 ++-- .../kafka/server/QuorumTestHarness.scala | 4 --- 9 files changed, 78 insertions(+), 38 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java index b260e5e5fbf..f0bd4f43fca 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java @@ -163,6 +163,7 @@ public abstract class AbstractHeartbeatRequestManager<R extends AbstractResponse public NetworkClientDelegate.PollResult poll(long currentTimeMs) { if (coordinatorRequestManager.coordinator().isEmpty() || membershipManager().shouldSkipHeartbeat()) { membershipManager().onHeartbeatRequestSkipped(); + maybePropagateCoordinatorFatalErrorEvent(); return NetworkClientDelegate.PollResult.EMPTY; } pollTimer.update(currentTimeMs); @@ -265,6 +266,11 @@ public abstract class AbstractHeartbeatRequestManager<R extends AbstractResponse pollTimer.reset(maxPollIntervalMs); } + private void maybePropagateCoordinatorFatalErrorEvent() { + coordinatorRequestManager.fatalError() + .ifPresent(fatalError -> backgroundEventHandler.add(new ErrorEvent(fatalError))); + } + private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final long currentTimeMs, final boolean ignoreResponse) { NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(ignoreResponse); heartbeatRequestState.onSendAttempt(currentTimeMs); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java index 4f0deef5bf8..1d3503886a9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java @@ -176,9 +176,11 @@ public class CommitRequestManager implements RequestManager, MemberStateListener */ @Override public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { - // poll only when the coordinator node is known. - if (coordinatorRequestManager.coordinator().isEmpty()) + // poll when the coordinator node is known and fatal error is not present + if (coordinatorRequestManager.coordinator().isEmpty()) { + pendingRequests.maybeFailOnCoordinatorFatalError(); return EMPTY; + } if (closing) { return drainPendingOffsetCommitRequests(); @@ -1246,6 +1248,16 @@ public class CommitRequestManager implements RequestManager, MemberStateListener clearAll(); return res; } + + private void maybeFailOnCoordinatorFatalError() { + coordinatorRequestManager.fatalError().ifPresent(error -> { + log.warn("Failing all unsent commit requests and offset fetches because of coordinator fatal error. ", error); + unsentOffsetCommits.forEach(request -> request.future.completeExceptionally(error)); + unsentOffsetFetches.forEach(request -> request.future.completeExceptionally(error)); + clearAll(); + } + ); + } } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java index 4664267a0e8..07c4fad45b1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java @@ -16,8 +16,6 @@ */ package org.apache.kafka.clients.consumer.internals; -import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; -import org.apache.kafka.clients.consumer.internals.events.ErrorEvent; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; import org.apache.kafka.common.errors.DisconnectException; @@ -53,24 +51,27 @@ import static org.apache.kafka.clients.consumer.internals.NetworkClientDelegate. public class CoordinatorRequestManager implements RequestManager { private static final long COORDINATOR_DISCONNECT_LOGGING_INTERVAL_MS = 60 * 1000; private final Logger log; - private final BackgroundEventHandler backgroundEventHandler; private final String groupId; private final RequestState coordinatorRequestState; private long timeMarkedUnknownMs = -1L; // starting logging a warning only after unable to connect for a while private long totalDisconnectedMin = 0; private Node coordinator; + // Hold the latest fatal error received. It is exposed so that managers requiring a coordinator can access it and take + // appropriate actions. + // For example: + // - AbstractHeartbeatRequestManager propagates the error event to the application thread. + // - CommitRequestManager fail pending requests. + private Optional<Throwable> fatalError = Optional.empty(); public CoordinatorRequestManager( final LogContext logContext, final long retryBackoffMs, final long retryBackoffMaxMs, - final BackgroundEventHandler errorHandler, final String groupId ) { Objects.requireNonNull(groupId); this.log = logContext.logger(this.getClass()); - this.backgroundEventHandler = errorHandler; this.groupId = groupId; this.coordinatorRequestState = new RequestState( logContext, @@ -114,6 +115,7 @@ public class CoordinatorRequestManager implements RequestManager { ); return unsentRequest.whenComplete((clientResponse, throwable) -> { + clearFatalError(); if (clientResponse != null) { FindCoordinatorResponse response = (FindCoordinatorResponse) clientResponse.responseBody(); onResponse(clientResponse.receivedTimeMs(), response); @@ -200,12 +202,12 @@ public class CoordinatorRequestManager implements RequestManager { if (exception == Errors.GROUP_AUTHORIZATION_FAILED.exception()) { log.debug("FindCoordinator request failed due to authorization error {}", exception.getMessage()); KafkaException groupAuthorizationException = GroupAuthorizationException.forGroupId(this.groupId); - backgroundEventHandler.add(new ErrorEvent(groupAuthorizationException)); + fatalError = Optional.of(groupAuthorizationException); return; } log.warn("FindCoordinator request failed due to fatal exception", exception); - backgroundEventHandler.add(new ErrorEvent(exception)); + fatalError = Optional.of(exception); } /** @@ -244,4 +246,12 @@ public class CoordinatorRequestManager implements RequestManager { public Optional<Node> coordinator() { return Optional.ofNullable(this.coordinator); } + + private void clearFatalError() { + this.fatalError = Optional.empty(); + } + + public Optional<Throwable> fatalError() { + return fatalError; + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java index 304f0fffd4a..960567ac96b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java @@ -194,7 +194,6 @@ public class RequestManagers implements Closeable { logContext, retryBackoffMs, retryBackoffMaxMs, - backgroundEventHandler, groupRebalanceConfig.groupId); commitRequestManager = new CommitRequestManager( time, @@ -295,7 +294,6 @@ public class RequestManagers implements Closeable { logContext, retryBackoffMs, retryBackoffMaxMs, - backgroundEventHandler, groupRebalanceConfig.groupId); ShareMembershipManager shareMembershipManager = new ShareMembershipManager( logContext, diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 33ca2844305..2749563df27 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -2239,7 +2239,7 @@ public class KafkaConsumerTest { // by the background thread, so it can realize there is authentication fail and then // throw the AuthenticationException assertPollEventuallyThrows(consumer, AuthenticationException.class, - "he consumer was not able to discover metadata errors during continuous polling."); + "this consumer was not able to discover metadata errors during continuous polling."); } else { assertThrows(AuthenticationException.class, () -> consumer.poll(Duration.ZERO)); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java index 8e98c14b598..0408b156a6d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.message.OffsetCommitRequestData; @@ -1481,6 +1482,23 @@ public class CommitRequestManagerTest { OffsetCommitRequestData data = (OffsetCommitRequestData) res.unsentRequests.get(0).requestBuilder().build().data(); assertEquals("topic", data.topics().get(0).name()); } + + @Test + public void testPollWithFatalErrorShouldFailAllUnsentRequests() { + CommitRequestManager commitRequestManager = create(true, 100); + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode)); + + commitRequestManager.fetchOffsets(Collections.singleton(new TopicPartition("test", 0)), 200); + assertEquals(1, commitRequestManager.pendingRequests.unsentOffsetFetches.size()); + + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty()); + when(coordinatorRequestManager.fatalError()) + .thenReturn(Optional.of(new GroupAuthorizationException("Group authorization exception"))); + + assertEquals(NetworkClientDelegate.PollResult.EMPTY, commitRequestManager.poll(200)); + + assertEmptyPendingRequests(commitRequestManager); + } private static void assertEmptyPendingRequests(CommitRequestManager commitRequestManager) { assertTrue(commitRequestManager.pendingRequests.inflightOffsetFetches.isEmpty()); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java index 7e805dc3cd3..72599100a36 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java @@ -18,9 +18,7 @@ package org.apache.kafka.clients.consumer.internals; import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; -import org.apache.kafka.clients.consumer.internals.events.ErrorEvent; import org.apache.kafka.common.Node; -import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; @@ -35,6 +33,8 @@ import org.apache.kafka.common.utils.MockTime; import org.apache.logging.log4j.Level; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import java.util.Collections; import java.util.List; @@ -49,9 +49,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; public class CoordinatorRequestManagerTest { @@ -191,23 +189,10 @@ public class CoordinatorRequestManagerTest { } @Test - public void testPropagateAndBackoffAfterFatalError() { + public void testBackoffAfterFatalError() { CoordinatorRequestManager coordinatorManager = setupCoordinatorManager(GROUP_ID); expectFindCoordinatorRequest(coordinatorManager, Errors.GROUP_AUTHORIZATION_FAILED); - verify(backgroundEventHandler).add(argThat(backgroundEvent -> { - if (!(backgroundEvent instanceof ErrorEvent)) - return false; - - RuntimeException exception = ((ErrorEvent) backgroundEvent).error(); - - if (!(exception instanceof GroupAuthorizationException)) - return false; - - GroupAuthorizationException groupAuthException = (GroupAuthorizationException) exception; - return groupAuthException.groupId().equals(GROUP_ID); - })); - time.sleep(RETRY_BACKOFF_MS - 1); assertEquals(Collections.emptyList(), coordinatorManager.poll(time.milliseconds()).unsentRequests); @@ -253,6 +238,22 @@ public class CoordinatorRequestManagerTest { res2 = coordinatorManager.poll(time.milliseconds()); assertEquals(1, res2.unsentRequests.size()); } + + @ParameterizedTest + @EnumSource(value = Errors.class, names = {"NONE", "COORDINATOR_NOT_AVAILABLE"}) + public void testClearFatalErrorWhenReceivingSuccessfulResponse(Errors error) { + CoordinatorRequestManager coordinatorManager = setupCoordinatorManager(GROUP_ID); + expectFindCoordinatorRequest(coordinatorManager, Errors.GROUP_AUTHORIZATION_FAILED); + assertTrue(coordinatorManager.fatalError().isPresent()); + + time.sleep(RETRY_BACKOFF_MS); + // there are no successful responses, so the fatal error should persist + assertTrue(coordinatorManager.fatalError().isPresent()); + + // receiving a successful response should clear the fatal error + expectFindCoordinatorRequest(coordinatorManager, error); + assertTrue(coordinatorManager.fatalError().isEmpty()); + } private void expectFindCoordinatorRequest( CoordinatorRequestManager coordinatorManager, @@ -273,7 +274,6 @@ public class CoordinatorRequestManagerTest { new LogContext(), RETRY_BACKOFF_MS, RETRY_BACKOFF_MS, - this.backgroundEventHandler, groupId ); } diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 3ee420f2c49..8ec6b3c5327 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -1272,7 +1272,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034")) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) def testCommitWithNoAccess(quorum: String, groupProtocol: String): Unit = { val consumer = createConsumer() assertThrows(classOf[GroupAuthorizationException], () => consumer.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)) @@ -1309,7 +1309,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034")) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) def testCommitWithNoGroupAccess(quorum: String, groupProtocol: String): Unit = { addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), topicResource) val consumer = createConsumer() @@ -1335,7 +1335,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034")) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) def testOffsetFetchWithNoGroupAccess(quorum: String, groupProtocol: String): Unit = { createTopicWithBrokerPrincipal(topic) addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), topicResource) diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala index a8a66dbd54f..d666bee6a41 100755 --- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala +++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala @@ -441,8 +441,4 @@ object QuorumTestHarness { // The following is for tests that only work with the classic group protocol because of relying on Zookeeper def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit: java.util.stream.Stream[Arguments] = stream.Stream.of(Arguments.of("zk", GroupProtocol.CLASSIC.name.toLowerCase(Locale.ROOT))) - - // The following parameter groups are to *temporarily* avoid bugs with the CONSUMER group protocol Consumer - // implementation that would otherwise cause tests to fail. - def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034: stream.Stream[Arguments] = getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly }