This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch 4.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push: new 21c4539dfe1 Revert "KAFKA-18034: CommitRequestManager should fail pending requests on fatal coordinator errors (#18050)" 21c4539dfe1 is described below commit 21c4539dfe1134e60a7d8680d9ea19ae48f569a3 Author: Lianet Magrans <lmagr...@confluent.io> AuthorDate: Wed Jan 15 07:30:46 2025 -0500 Revert "KAFKA-18034: CommitRequestManager should fail pending requests on fatal coordinator errors (#18050)" This reverts commit 46b825c9e651ee3e901e97d43fa7bf2ba10ff749. --- .../internals/AbstractHeartbeatRequestManager.java | 6 ---- .../consumer/internals/CommitRequestManager.java | 16 ++------- .../internals/CoordinatorRequestManager.java | 24 ++++---------- .../consumer/internals/RequestManagers.java | 2 ++ .../kafka/clients/consumer/KafkaConsumerTest.java | 2 +- .../internals/CommitRequestManagerTest.java | 18 ---------- .../internals/CoordinatorRequestManagerTest.java | 38 +++++++++++----------- .../kafka/api/AuthorizerIntegrationTest.scala | 6 ++-- .../kafka/server/QuorumTestHarness.scala | 4 +++ 9 files changed, 38 insertions(+), 78 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java index f0bd4f43fca..b260e5e5fbf 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java @@ -163,7 +163,6 @@ public abstract class AbstractHeartbeatRequestManager<R extends AbstractResponse public NetworkClientDelegate.PollResult poll(long currentTimeMs) { if (coordinatorRequestManager.coordinator().isEmpty() || membershipManager().shouldSkipHeartbeat()) { membershipManager().onHeartbeatRequestSkipped(); - maybePropagateCoordinatorFatalErrorEvent(); return 
NetworkClientDelegate.PollResult.EMPTY; } pollTimer.update(currentTimeMs); @@ -266,11 +265,6 @@ public abstract class AbstractHeartbeatRequestManager<R extends AbstractResponse pollTimer.reset(maxPollIntervalMs); } - private void maybePropagateCoordinatorFatalErrorEvent() { - coordinatorRequestManager.fatalError() - .ifPresent(fatalError -> backgroundEventHandler.add(new ErrorEvent(fatalError))); - } - private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final long currentTimeMs, final boolean ignoreResponse) { NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(ignoreResponse); heartbeatRequestState.onSendAttempt(currentTimeMs); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java index 1d3503886a9..4f0deef5bf8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java @@ -176,11 +176,9 @@ public class CommitRequestManager implements RequestManager, MemberStateListener */ @Override public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { - // poll when the coordinator node is known and fatal error is not present - if (coordinatorRequestManager.coordinator().isEmpty()) { - pendingRequests.maybeFailOnCoordinatorFatalError(); + // poll only when the coordinator node is known. 
+ if (coordinatorRequestManager.coordinator().isEmpty()) return EMPTY; - } if (closing) { return drainPendingOffsetCommitRequests(); @@ -1248,16 +1246,6 @@ public class CommitRequestManager implements RequestManager, MemberStateListener clearAll(); return res; } - - private void maybeFailOnCoordinatorFatalError() { - coordinatorRequestManager.fatalError().ifPresent(error -> { - log.warn("Failing all unsent commit requests and offset fetches because of coordinator fatal error. ", error); - unsentOffsetCommits.forEach(request -> request.future.completeExceptionally(error)); - unsentOffsetFetches.forEach(request -> request.future.completeExceptionally(error)); - clearAll(); - } - ); - } } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java index 07c4fad45b1..4664267a0e8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; +import org.apache.kafka.clients.consumer.internals.events.ErrorEvent; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; import org.apache.kafka.common.errors.DisconnectException; @@ -51,27 +53,24 @@ import static org.apache.kafka.clients.consumer.internals.NetworkClientDelegate. 
public class CoordinatorRequestManager implements RequestManager { private static final long COORDINATOR_DISCONNECT_LOGGING_INTERVAL_MS = 60 * 1000; private final Logger log; + private final BackgroundEventHandler backgroundEventHandler; private final String groupId; private final RequestState coordinatorRequestState; private long timeMarkedUnknownMs = -1L; // starting logging a warning only after unable to connect for a while private long totalDisconnectedMin = 0; private Node coordinator; - // Hold the latest fatal error received. It is exposed so that managers requiring a coordinator can access it and take - // appropriate actions. - // For example: - // - AbstractHeartbeatRequestManager propagates the error event to the application thread. - // - CommitRequestManager fail pending requests. - private Optional<Throwable> fatalError = Optional.empty(); public CoordinatorRequestManager( final LogContext logContext, final long retryBackoffMs, final long retryBackoffMaxMs, + final BackgroundEventHandler errorHandler, final String groupId ) { Objects.requireNonNull(groupId); this.log = logContext.logger(this.getClass()); + this.backgroundEventHandler = errorHandler; this.groupId = groupId; this.coordinatorRequestState = new RequestState( logContext, @@ -115,7 +114,6 @@ public class CoordinatorRequestManager implements RequestManager { ); return unsentRequest.whenComplete((clientResponse, throwable) -> { - clearFatalError(); if (clientResponse != null) { FindCoordinatorResponse response = (FindCoordinatorResponse) clientResponse.responseBody(); onResponse(clientResponse.receivedTimeMs(), response); @@ -202,12 +200,12 @@ public class CoordinatorRequestManager implements RequestManager { if (exception == Errors.GROUP_AUTHORIZATION_FAILED.exception()) { log.debug("FindCoordinator request failed due to authorization error {}", exception.getMessage()); KafkaException groupAuthorizationException = GroupAuthorizationException.forGroupId(this.groupId); - fatalError = 
Optional.of(groupAuthorizationException); + backgroundEventHandler.add(new ErrorEvent(groupAuthorizationException)); return; } log.warn("FindCoordinator request failed due to fatal exception", exception); - fatalError = Optional.of(exception); + backgroundEventHandler.add(new ErrorEvent(exception)); } /** @@ -246,12 +244,4 @@ public class CoordinatorRequestManager implements RequestManager { public Optional<Node> coordinator() { return Optional.ofNullable(this.coordinator); } - - private void clearFatalError() { - this.fatalError = Optional.empty(); - } - - public Optional<Throwable> fatalError() { - return fatalError; - } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java index 960567ac96b..304f0fffd4a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java @@ -194,6 +194,7 @@ public class RequestManagers implements Closeable { logContext, retryBackoffMs, retryBackoffMaxMs, + backgroundEventHandler, groupRebalanceConfig.groupId); commitRequestManager = new CommitRequestManager( time, @@ -294,6 +295,7 @@ public class RequestManagers implements Closeable { logContext, retryBackoffMs, retryBackoffMaxMs, + backgroundEventHandler, groupRebalanceConfig.groupId); ShareMembershipManager shareMembershipManager = new ShareMembershipManager( logContext, diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 2749563df27..33ca2844305 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -2239,7 +2239,7 @@ public class KafkaConsumerTest { // by the background 
thread, so it can realize there is authentication fail and then // throw the AuthenticationException assertPollEventuallyThrows(consumer, AuthenticationException.class, - "this consumer was not able to discover metadata errors during continuous polling."); + "he consumer was not able to discover metadata errors during continuous polling."); } else { assertThrows(AuthenticationException.class, () -> consumer.poll(Duration.ZERO)); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java index 0408b156a6d..8e98c14b598 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java @@ -25,7 +25,6 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; -import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.message.OffsetCommitRequestData; @@ -1482,23 +1481,6 @@ public class CommitRequestManagerTest { OffsetCommitRequestData data = (OffsetCommitRequestData) res.unsentRequests.get(0).requestBuilder().build().data(); assertEquals("topic", data.topics().get(0).name()); } - - @Test - public void testPollWithFatalErrorShouldFailAllUnsentRequests() { - CommitRequestManager commitRequestManager = create(true, 100); - when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode)); - - commitRequestManager.fetchOffsets(Collections.singleton(new TopicPartition("test", 0)), 200); - assertEquals(1, commitRequestManager.pendingRequests.unsentOffsetFetches.size()); - - 
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty()); - when(coordinatorRequestManager.fatalError()) - .thenReturn(Optional.of(new GroupAuthorizationException("Group authorization exception"))); - - assertEquals(NetworkClientDelegate.PollResult.EMPTY, commitRequestManager.poll(200)); - - assertEmptyPendingRequests(commitRequestManager); - } private static void assertEmptyPendingRequests(CommitRequestManager commitRequestManager) { assertTrue(commitRequestManager.pendingRequests.inflightOffsetFetches.isEmpty()); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java index 72599100a36..7e805dc3cd3 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java @@ -18,7 +18,9 @@ package org.apache.kafka.clients.consumer.internals; import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; +import org.apache.kafka.clients.consumer.internals.events.ErrorEvent; import org.apache.kafka.common.Node; +import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; @@ -33,8 +35,6 @@ import org.apache.kafka.common.utils.MockTime; import org.apache.logging.log4j.Level; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.EnumSource; import java.util.Collections; import java.util.List; @@ -49,7 +49,9 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static 
org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; public class CoordinatorRequestManagerTest { @@ -189,10 +191,23 @@ public class CoordinatorRequestManagerTest { } @Test - public void testBackoffAfterFatalError() { + public void testPropagateAndBackoffAfterFatalError() { CoordinatorRequestManager coordinatorManager = setupCoordinatorManager(GROUP_ID); expectFindCoordinatorRequest(coordinatorManager, Errors.GROUP_AUTHORIZATION_FAILED); + verify(backgroundEventHandler).add(argThat(backgroundEvent -> { + if (!(backgroundEvent instanceof ErrorEvent)) + return false; + + RuntimeException exception = ((ErrorEvent) backgroundEvent).error(); + + if (!(exception instanceof GroupAuthorizationException)) + return false; + + GroupAuthorizationException groupAuthException = (GroupAuthorizationException) exception; + return groupAuthException.groupId().equals(GROUP_ID); + })); + time.sleep(RETRY_BACKOFF_MS - 1); assertEquals(Collections.emptyList(), coordinatorManager.poll(time.milliseconds()).unsentRequests); @@ -238,22 +253,6 @@ public class CoordinatorRequestManagerTest { res2 = coordinatorManager.poll(time.milliseconds()); assertEquals(1, res2.unsentRequests.size()); } - - @ParameterizedTest - @EnumSource(value = Errors.class, names = {"NONE", "COORDINATOR_NOT_AVAILABLE"}) - public void testClearFatalErrorWhenReceivingSuccessfulResponse(Errors error) { - CoordinatorRequestManager coordinatorManager = setupCoordinatorManager(GROUP_ID); - expectFindCoordinatorRequest(coordinatorManager, Errors.GROUP_AUTHORIZATION_FAILED); - assertTrue(coordinatorManager.fatalError().isPresent()); - - time.sleep(RETRY_BACKOFF_MS); - // there are no successful responses, so the fatal 
error should persist - assertTrue(coordinatorManager.fatalError().isPresent()); - - // receiving a successful response should clear the fatal error - expectFindCoordinatorRequest(coordinatorManager, error); - assertTrue(coordinatorManager.fatalError().isEmpty()); - } private void expectFindCoordinatorRequest( CoordinatorRequestManager coordinatorManager, @@ -274,6 +273,7 @@ public class CoordinatorRequestManagerTest { new LogContext(), RETRY_BACKOFF_MS, RETRY_BACKOFF_MS, + this.backgroundEventHandler, groupId ); } diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 8ec6b3c5327..3ee420f2c49 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -1272,7 +1272,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034")) def testCommitWithNoAccess(quorum: String, groupProtocol: String): Unit = { val consumer = createConsumer() assertThrows(classOf[GroupAuthorizationException], () => consumer.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)) @@ -1309,7 +1309,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034")) def testCommitWithNoGroupAccess(quorum: String, groupProtocol: String): Unit = { addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, 
READ, ALLOW)), topicResource) val consumer = createConsumer() @@ -1335,7 +1335,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034")) def testOffsetFetchWithNoGroupAccess(quorum: String, groupProtocol: String): Unit = { createTopicWithBrokerPrincipal(topic) addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), topicResource) diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala index d666bee6a41..a8a66dbd54f 100755 --- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala +++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala @@ -441,4 +441,8 @@ object QuorumTestHarness { // The following is for tests that only work with the classic group protocol because of relying on Zookeeper def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit: java.util.stream.Stream[Arguments] = stream.Stream.of(Arguments.of("zk", GroupProtocol.CLASSIC.name.toLowerCase(Locale.ROOT))) + + // The following parameter groups are to *temporarily* avoid bugs with the CONSUMER group protocol Consumer + // implementation that would otherwise cause tests to fail. + def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034: stream.Stream[Arguments] = getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly }