This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch lm-commit-test
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 36a43c139b79cb2737099edddd2e60da5c0fe7cc
Author: Ken Huang <[email protected]>
AuthorDate: Fri Jan 31 00:22:54 2025 +0800

    KAFKA-18034: CommitRequestManager should fail pending requests on fatal 
coordinator errors (#18548)
    
    Reviewers: Lianet Magrans <[email protected]>, Kirk True 
<[email protected]>
---
 .../internals/AbstractHeartbeatRequestManager.java |  6 +++
 .../consumer/internals/CommitRequestManager.java   | 16 +++++++-
 .../internals/CoordinatorRequestManager.java       | 26 +++++++++----
 .../consumer/internals/RequestManagers.java        |  2 -
 .../kafka/clients/consumer/KafkaConsumerTest.java  |  2 +-
 .../internals/CommitRequestManagerTest.java        | 17 ++++++++
 .../internals/CoordinatorRequestManagerTest.java   | 45 ++++++++--------------
 .../kafka/api/AuthorizerIntegrationTest.scala      |  6 +--
 .../kafka/server/QuorumTestHarness.scala           |  4 --
 9 files changed, 75 insertions(+), 49 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
index b260e5e5fbf..fe8926abbf8 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
@@ -163,6 +163,7 @@ public abstract class AbstractHeartbeatRequestManager<R 
extends AbstractResponse
     public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
         if (coordinatorRequestManager.coordinator().isEmpty() || 
membershipManager().shouldSkipHeartbeat()) {
             membershipManager().onHeartbeatRequestSkipped();
+            maybePropagateCoordinatorFatalErrorEvent();
             return NetworkClientDelegate.PollResult.EMPTY;
         }
         pollTimer.update(currentTimeMs);
@@ -265,6 +266,11 @@ public abstract class AbstractHeartbeatRequestManager<R 
extends AbstractResponse
         pollTimer.reset(maxPollIntervalMs);
     }
 
+    private void maybePropagateCoordinatorFatalErrorEvent() {
+        coordinatorRequestManager.getAndClearFatalError()
+                .ifPresent(fatalError -> backgroundEventHandler.add(new 
ErrorEvent(fatalError)));
+    }
+
     private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final 
long currentTimeMs, final boolean ignoreResponse) {
         NetworkClientDelegate.UnsentRequest request = 
makeHeartbeatRequest(ignoreResponse);
         heartbeatRequestState.onSendAttempt(currentTimeMs);
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
index 4f0deef5bf8..1d3503886a9 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
@@ -176,9 +176,11 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
      */
     @Override
     public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
-        // poll only when the coordinator node is known.
-        if (coordinatorRequestManager.coordinator().isEmpty())
+        // poll only when the coordinator node is known; otherwise fail pending 
requests if a fatal coordinator error is present
+        if (coordinatorRequestManager.coordinator().isEmpty()) {
+            pendingRequests.maybeFailOnCoordinatorFatalError();
             return EMPTY;
+        }
 
         if (closing) {
             return drainPendingOffsetCommitRequests();
@@ -1246,6 +1248,16 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
             clearAll();
             return res;
         }
+
+        private void maybeFailOnCoordinatorFatalError() {
+            coordinatorRequestManager.fatalError().ifPresent(error -> {
+                    log.warn("Failing all unsent commit requests and offset 
fetches because of coordinator fatal error. ", error);
+                    unsentOffsetCommits.forEach(request -> 
request.future.completeExceptionally(error));
+                    unsentOffsetFetches.forEach(request -> 
request.future.completeExceptionally(error));
+                    clearAll();
+                }
+            );
+        }
     }
 
     /**
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
index 0f1650d0e67..dd53ae11790 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
@@ -16,8 +16,6 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
-import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
-import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.DisconnectException;
@@ -53,7 +51,6 @@ import static 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.
 public class CoordinatorRequestManager implements RequestManager {
     private static final long COORDINATOR_DISCONNECT_LOGGING_INTERVAL_MS = 60 
* 1000;
     private final Logger log;
-    private final BackgroundEventHandler backgroundEventHandler;
     private final String groupId;
 
     private final RequestState coordinatorRequestState;
@@ -61,17 +58,21 @@ public class CoordinatorRequestManager implements 
RequestManager {
     private long totalDisconnectedMin = 0;
     private boolean closing = false;
     private Node coordinator;
+    // Hold the latest fatal error received. It is exposed so that managers 
requiring a coordinator can access it and take 
+    // appropriate actions. 
+    // For example:
+    // - AbstractHeartbeatRequestManager propagates the error event to the 
application thread.
+    // - CommitRequestManager fails pending requests.
+    private Optional<Throwable> fatalError = Optional.empty();
 
     public CoordinatorRequestManager(
         final LogContext logContext,
         final long retryBackoffMs,
         final long retryBackoffMaxMs,
-        final BackgroundEventHandler errorHandler,
         final String groupId
     ) {
         Objects.requireNonNull(groupId);
         this.log = logContext.logger(this.getClass());
-        this.backgroundEventHandler = errorHandler;
         this.groupId = groupId;
         this.coordinatorRequestState = new RequestState(
                 logContext,
@@ -120,6 +121,7 @@ public class CoordinatorRequestManager implements 
RequestManager {
         );
 
         return unsentRequest.whenComplete((clientResponse, throwable) -> {
+            getAndClearFatalError();
             if (clientResponse != null) {
                 FindCoordinatorResponse response = (FindCoordinatorResponse) 
clientResponse.responseBody();
                 onResponse(clientResponse.receivedTimeMs(), response);
@@ -206,12 +208,12 @@ public class CoordinatorRequestManager implements 
RequestManager {
         if (exception == Errors.GROUP_AUTHORIZATION_FAILED.exception()) {
             log.debug("FindCoordinator request failed due to authorization 
error {}", exception.getMessage());
             KafkaException groupAuthorizationException = 
GroupAuthorizationException.forGroupId(this.groupId);
-            backgroundEventHandler.add(new 
ErrorEvent(groupAuthorizationException));
+            fatalError = Optional.of(groupAuthorizationException);
             return;
         }
 
         log.warn("FindCoordinator request failed due to fatal exception", 
exception);
-        backgroundEventHandler.add(new ErrorEvent(exception));
+        fatalError = Optional.of(exception);
     }
 
     /**
@@ -250,4 +252,14 @@ public class CoordinatorRequestManager implements 
RequestManager {
     public Optional<Node> coordinator() {
         return Optional.ofNullable(this.coordinator);
     }
+    
+    public Optional<Throwable> getAndClearFatalError() {
+        Optional<Throwable> fatalError = this.fatalError;
+        this.fatalError = Optional.empty();
+        return fatalError;
+    }
+
+    public Optional<Throwable> fatalError() {
+        return fatalError;
+    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
index 304f0fffd4a..960567ac96b 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
@@ -194,7 +194,6 @@ public class RequestManagers implements Closeable {
                             logContext,
                             retryBackoffMs,
                             retryBackoffMaxMs,
-                            backgroundEventHandler,
                             groupRebalanceConfig.groupId);
                     commitRequestManager = new CommitRequestManager(
                             time,
@@ -295,7 +294,6 @@ public class RequestManagers implements Closeable {
                         logContext,
                         retryBackoffMs,
                         retryBackoffMaxMs,
-                        backgroundEventHandler,
                         groupRebalanceConfig.groupId);
                 ShareMembershipManager shareMembershipManager = new 
ShareMembershipManager(
                         logContext,
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 33ca2844305..2749563df27 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -2239,7 +2239,7 @@ public class KafkaConsumerTest {
             // by the background thread, so it can realize there is 
authentication fail  and then
             // throw the AuthenticationException
             assertPollEventuallyThrows(consumer, AuthenticationException.class,
-                    "he consumer was not able to discover metadata errors 
during continuous polling.");
+                    "this consumer was not able to discover metadata errors 
during continuous polling.");
         } else {
             assertThrows(AuthenticationException.class, () -> 
consumer.poll(Duration.ZERO));
         }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
index 8e98c14b598..adaae1e35d4 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
@@ -1482,6 +1482,23 @@ public class CommitRequestManagerTest {
         assertEquals("topic", data.topics().get(0).name());
     }
 
+    @Test
+    public void testPollWithFatalErrorShouldFailAllUnsentRequests() {
+        CommitRequestManager commitRequestManager = create(true, 100);
+        
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
+
+        commitRequestManager.fetchOffsets(Collections.singleton(new 
TopicPartition("test", 0)), 200);
+        assertEquals(1, 
commitRequestManager.pendingRequests.unsentOffsetFetches.size());
+
+        
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());
+        when(coordinatorRequestManager.fatalError())
+                .thenReturn(Optional.of(new GroupAuthorizationException("Group 
authorization exception")));
+
+        assertEquals(NetworkClientDelegate.PollResult.EMPTY, 
commitRequestManager.poll(200));
+
+        assertEmptyPendingRequests(commitRequestManager);
+    }
+    
     private static void assertEmptyPendingRequests(CommitRequestManager 
commitRequestManager) {
         
assertTrue(commitRequestManager.pendingRequests.inflightOffsetFetches.isEmpty());
         
assertTrue(commitRequestManager.pendingRequests.unsentOffsetFetches.isEmpty());
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java
index 2165cb814ed..0ed902d7f27 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java
@@ -18,9 +18,7 @@ package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.ClientResponse;
 import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
-import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
 import org.apache.kafka.common.Node;
-import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
@@ -35,6 +33,8 @@ import org.apache.kafka.common.utils.MockTime;
 import org.apache.logging.log4j.Level;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 
 import java.util.Collections;
 import java.util.List;
@@ -49,9 +49,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoInteractions;
 
 public class CoordinatorRequestManagerTest {
@@ -191,23 +189,10 @@ public class CoordinatorRequestManagerTest {
     }
 
     @Test
-    public void testPropagateAndBackoffAfterFatalError() {
+    public void testBackoffAfterFatalError() {
         CoordinatorRequestManager coordinatorManager = 
setupCoordinatorManager(GROUP_ID);
         expectFindCoordinatorRequest(coordinatorManager, 
Errors.GROUP_AUTHORIZATION_FAILED);
 
-        verify(backgroundEventHandler).add(argThat(backgroundEvent -> {
-            if (!(backgroundEvent instanceof ErrorEvent))
-                return false;
-
-            RuntimeException exception = ((ErrorEvent) 
backgroundEvent).error();
-
-            if (!(exception instanceof GroupAuthorizationException))
-                return false;
-
-            GroupAuthorizationException groupAuthException = 
(GroupAuthorizationException) exception;
-            return groupAuthException.groupId().equals(GROUP_ID);
-        }));
-
         time.sleep(RETRY_BACKOFF_MS - 1);
         assertEquals(Collections.emptyList(), 
coordinatorManager.poll(time.milliseconds()).unsentRequests);
 
@@ -254,19 +239,20 @@ public class CoordinatorRequestManagerTest {
         assertEquals(1, res2.unsentRequests.size());
     }
 
-    @Test
-    public void testSignalOnClose() {
+    @ParameterizedTest
+    @EnumSource(value = Errors.class, names = {"NONE", 
"COORDINATOR_NOT_AVAILABLE"})
+    public void testClearFatalErrorWhenReceivingSuccessfulResponse(Errors 
error) {
         CoordinatorRequestManager coordinatorManager = 
setupCoordinatorManager(GROUP_ID);
-        expectFindCoordinatorRequest(coordinatorManager, Errors.NONE);
-        assertTrue(coordinatorManager.coordinator().isPresent());
-        coordinatorManager.markCoordinatorUnknown("coordinator changed", 
time.milliseconds());
-        assertEquals(Collections.emptyList(), 
coordinatorManager.poll(time.milliseconds()).unsentRequests);
-        coordinatorManager.signalClose();
-        time.sleep(RETRY_BACKOFF_MS - 1);
-        assertEquals(Collections.emptyList(), 
coordinatorManager.poll(time.milliseconds()).unsentRequests);
+        expectFindCoordinatorRequest(coordinatorManager, 
Errors.GROUP_AUTHORIZATION_FAILED);
+        assertTrue(coordinatorManager.fatalError().isPresent());
+
         time.sleep(RETRY_BACKOFF_MS);
-        assertEquals(Collections.emptyList(), 
coordinatorManager.poll(time.milliseconds()).unsentRequests,
-            "Should not generate find coordinator request during close");
+        // there are no successful responses, so the fatal error should persist
+        assertTrue(coordinatorManager.fatalError().isPresent());
+
+        // receiving a successful response should clear the fatal error
+        expectFindCoordinatorRequest(coordinatorManager, error);
+        assertTrue(coordinatorManager.fatalError().isEmpty());
     }
 
     private void expectFindCoordinatorRequest(
@@ -288,7 +274,6 @@ public class CoordinatorRequestManagerTest {
             new LogContext(),
             RETRY_BACKOFF_MS,
             RETRY_BACKOFF_MS,
-            this.backgroundEventHandler,
             groupId
         );
     }
diff --git 
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 4256d40da4a..f8e3e1fc54a 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -1294,7 +1294,7 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
   }
 
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034"))
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
   def testCommitWithNoAccess(quorum: String, groupProtocol: String): Unit = {
     val consumer = createConsumer()
     assertThrows(classOf[GroupAuthorizationException], () => 
consumer.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava))
@@ -1331,7 +1331,7 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
   }
 
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034"))
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
   def testCommitWithNoGroupAccess(quorum: String, groupProtocol: String): Unit 
= {
     addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, 
WILDCARD_HOST, READ, ALLOW)), topicResource)
     val consumer = createConsumer()
@@ -1357,7 +1357,7 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
   }
 
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034"))
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
   def testOffsetFetchWithNoGroupAccess(quorum: String, groupProtocol: String): 
Unit = {
     createTopicWithBrokerPrincipal(topic)
     addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, 
WILDCARD_HOST, READ, ALLOW)), topicResource)
diff --git 
a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala 
b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index 72b76cd152d..dac38f2de26 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -439,8 +439,4 @@ object QuorumTestHarness {
 
   // The following is for tests that only work with the classic group protocol 
because of relying on Zookeeper
   def 
getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit: 
java.util.stream.Stream[Arguments] = stream.Stream.of(Arguments.of("zk", 
GroupProtocol.CLASSIC.name.toLowerCase(Locale.ROOT)))
-
-  // The following parameter groups are to *temporarily* avoid bugs with the 
CONSUMER group protocol Consumer
-  // implementation that would otherwise cause tests to fail.
-  def 
getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034: 
stream.Stream[Arguments] = 
getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly
 }

Reply via email to