This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 36b48536f62 MINOR: Fix broken test (#18062)
36b48536f62 is described below

commit 36b48536f62698a5cc6be3c24bb68da6e2602b57
Author: Lianet Magrans <[email protected]>
AuthorDate: Thu Dec 5 21:31:52 2024 -0500

    MINOR: Fix broken test (#18062)
    
    Reviewers: David Jacot <[email protected]>, Chia-Ping Tsai 
<[email protected]>, TaiJuWu <[email protected]>
---
 .../requests/ConsumerGroupHeartbeatRequest.java    |  8 +--
 .../ConsumerHeartbeatRequestManagerTest.java       | 61 +++++++++++++++-------
 2 files changed, 46 insertions(+), 23 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatRequest.java
index 4f977a99fbe..5b09131d494 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatRequest.java
@@ -46,6 +46,10 @@ public class ConsumerGroupHeartbeatRequest extends 
AbstractRequest {
      */
     public static final int CONSUMER_GENERATED_MEMBER_ID_REQUIRED_VERSION = 1;
 
+    public static final String REGEX_RESOLUTION_NOT_SUPPORTED_MSG = "The 
cluster does not support " +
+        "regular expressions resolution on ConsumerGroupHeartbeat API version 
0. It must be upgraded to use " +
+        "ConsumerGroupHeartbeat API version >= 1 to allow to subscribe to a 
SubscriptionPattern.";
+
     public static class Builder extends 
AbstractRequest.Builder<ConsumerGroupHeartbeatRequest> {
         private final ConsumerGroupHeartbeatRequestData data;
 
@@ -61,9 +65,7 @@ public class ConsumerGroupHeartbeatRequest extends 
AbstractRequest {
         @Override
         public ConsumerGroupHeartbeatRequest build(short version) {
             if (version == 0 && data.subscribedTopicRegex() != null) {
-                throw new UnsupportedVersionException("The cluster does not 
support regular expressions resolution " +
-                    "on ConsumerGroupHeartbeat API version " + version + ". It 
must be upgraded to use " +
-                    "ConsumerGroupHeartbeat API version >= 1 to allow to 
subscribe to a SubscriptionPattern.");
+                throw new 
UnsupportedVersionException(REGEX_RESOLUTION_NOT_SUPPORTED_MSG);
             }
             return new ConsumerGroupHeartbeatRequest(data, version);
         }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
index 415a84ebbb9..8fc23f6255a 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
 import org.apache.kafka.common.metrics.Metrics;
@@ -65,6 +66,8 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.SortedSet;
 
+import static 
org.apache.kafka.clients.consumer.internals.AbstractHeartbeatRequestManager.CONSUMER_PROTOCOL_NOT_SUPPORTED_MSG;
+import static 
org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.REGEX_RESOLUTION_NOT_SUPPORTED_MSG;
 import static org.apache.kafka.common.utils.Utils.mkSortedSet;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -602,30 +605,21 @@ public class ConsumerHeartbeatRequestManagerTest {
         }
     }
 
-    @Test
-    public void testUnsupportedVersion() {
-        mockErrorResponse(Errors.UNSUPPORTED_VERSION, null);
+    /**
+     * This validates the UnsupportedApiVersion the client generates while 
building a HB if:
+     * 1. HB API is not supported.
+     * 2. Required HB API version is not available.
+     */
+    @ParameterizedTest
+    @ValueSource(strings = {CONSUMER_PROTOCOL_NOT_SUPPORTED_MSG, 
REGEX_RESOLUTION_NOT_SUPPORTED_MSG})
+    public void testUnsupportedVersion(String errorMsg) {
+        mockResponseWithException(new UnsupportedVersionException(errorMsg));
         ArgumentCaptor<ErrorEvent> errorEventArgumentCaptor = 
ArgumentCaptor.forClass(ErrorEvent.class);
         verify(backgroundEventHandler).add(errorEventArgumentCaptor.capture());
         ErrorEvent errorEvent = errorEventArgumentCaptor.getValue();
-
-        // UnsupportedApiVersion in HB response without any custom message. 
It's considered as new protocol not supported.
-        String hbNotSupportedMsg = "The cluster does not support the new 
consumer group protocol. Set group" +
-            ".protocol=classic on the consumer configs to revert to the 
classic protocol until the cluster is upgraded.";
         assertInstanceOf(Errors.UNSUPPORTED_VERSION.exception().getClass(), 
errorEvent.error());
-        assertEquals(hbNotSupportedMsg, errorEvent.error().getMessage());
+        assertEquals(errorMsg, errorEvent.error().getMessage());
         clearInvocations(backgroundEventHandler);
-
-        // UnsupportedApiVersion in HB response with custom message. Specific 
to required version not present, should
-        // keep the custom message.
-        String hbVersionNotSupportedMsg = "The cluster does not support 
resolution of SubscriptionPattern on version 0. " +
-            "It must be upgraded to version >= 1 to allow to subscribe to a 
SubscriptionPattern.";
-        mockErrorResponse(Errors.UNSUPPORTED_VERSION, 
hbVersionNotSupportedMsg);
-        errorEventArgumentCaptor = ArgumentCaptor.forClass(ErrorEvent.class);
-        verify(backgroundEventHandler).add(errorEventArgumentCaptor.capture());
-        errorEvent = errorEventArgumentCaptor.getValue();
-        assertInstanceOf(Errors.UNSUPPORTED_VERSION.exception().getClass(), 
errorEvent.error());
-        assertEquals(hbVersionNotSupportedMsg, 
errorEvent.error().getMessage());
     }
 
     private void mockErrorResponse(Errors error, String exceptionCustomMsg) {
@@ -637,7 +631,17 @@ public class ConsumerHeartbeatRequestManagerTest {
         ClientResponse response = createHeartbeatResponse(
             result.unsentRequests.get(0), error, exceptionCustomMsg);
         result.unsentRequests.get(0).handler().onComplete(response);
-        ConsumerGroupHeartbeatResponse mockResponse = 
(ConsumerGroupHeartbeatResponse) response.responseBody();
+    }
+
+    private void mockResponseWithException(UnsupportedVersionException 
exception) {
+        time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
+        NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(1, result.unsentRequests.size());
+
+        when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true);
+        ClientResponse response = createHeartbeatResponseWithException(
+            result.unsentRequests.get(0), exception);
+        result.unsentRequests.get(0).handler().onComplete(response);
     }
 
     private void assertNextHeartbeatTiming(long expectedTimeToNextHeartbeatMs) 
{
@@ -1038,6 +1042,23 @@ public class ConsumerHeartbeatRequestManagerTest {
             response);
     }
 
+    private ClientResponse createHeartbeatResponseWithException(
+        final NetworkClientDelegate.UnsentRequest request,
+        final UnsupportedVersionException exception
+    ) {
+        ConsumerGroupHeartbeatResponse response = new 
ConsumerGroupHeartbeatResponse(null);
+        return new ClientResponse(
+            new RequestHeader(ApiKeys.CONSUMER_GROUP_HEARTBEAT, 
ApiKeys.CONSUMER_GROUP_HEARTBEAT.latestVersion(), "client-id", 1),
+            request.handler(),
+            "0",
+            time.milliseconds(),
+            time.milliseconds(),
+            false,
+            exception,
+            null,
+            response);
+    }
+
     private ConsumerConfig config() {
         Properties prop = new Properties();
         prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);

Reply via email to