This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 36b48536f62 MINOR: Fix broken test (#18062)
36b48536f62 is described below
commit 36b48536f62698a5cc6be3c24bb68da6e2602b57
Author: Lianet Magrans <[email protected]>
AuthorDate: Thu Dec 5 21:31:52 2024 -0500
MINOR: Fix broken test (#18062)
Reviewers: David Jacot <[email protected]>, Chia-Ping Tsai
<[email protected]>, TaiJuWu <[email protected]>
---
.../requests/ConsumerGroupHeartbeatRequest.java | 8 +--
.../ConsumerHeartbeatRequestManagerTest.java | 61 +++++++++++++++-------
2 files changed, 46 insertions(+), 23 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatRequest.java
index 4f977a99fbe..5b09131d494 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatRequest.java
@@ -46,6 +46,10 @@ public class ConsumerGroupHeartbeatRequest extends
AbstractRequest {
*/
public static final int CONSUMER_GENERATED_MEMBER_ID_REQUIRED_VERSION = 1;
+ public static final String REGEX_RESOLUTION_NOT_SUPPORTED_MSG = "The
cluster does not support " +
+ "regular expressions resolution on ConsumerGroupHeartbeat API version
0. It must be upgraded to use " +
+ "ConsumerGroupHeartbeat API version >= 1 to allow to subscribe to a
SubscriptionPattern.";
+
public static class Builder extends
AbstractRequest.Builder<ConsumerGroupHeartbeatRequest> {
private final ConsumerGroupHeartbeatRequestData data;
@@ -61,9 +65,7 @@ public class ConsumerGroupHeartbeatRequest extends
AbstractRequest {
@Override
public ConsumerGroupHeartbeatRequest build(short version) {
if (version == 0 && data.subscribedTopicRegex() != null) {
- throw new UnsupportedVersionException("The cluster does not
support regular expressions resolution " +
- "on ConsumerGroupHeartbeat API version " + version + ". It
must be upgraded to use " +
- "ConsumerGroupHeartbeat API version >= 1 to allow to
subscribe to a SubscriptionPattern.");
+ throw new
UnsupportedVersionException(REGEX_RESOLUTION_NOT_SUPPORTED_MSG);
}
return new ConsumerGroupHeartbeatRequest(data, version);
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
index 415a84ebbb9..8fc23f6255a 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.metrics.Metrics;
@@ -65,6 +66,8 @@ import java.util.Properties;
import java.util.Set;
import java.util.SortedSet;
+import static
org.apache.kafka.clients.consumer.internals.AbstractHeartbeatRequestManager.CONSUMER_PROTOCOL_NOT_SUPPORTED_MSG;
+import static
org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.REGEX_RESOLUTION_NOT_SUPPORTED_MSG;
import static org.apache.kafka.common.utils.Utils.mkSortedSet;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -602,30 +605,21 @@ public class ConsumerHeartbeatRequestManagerTest {
}
}
- @Test
- public void testUnsupportedVersion() {
- mockErrorResponse(Errors.UNSUPPORTED_VERSION, null);
+ /**
+ * This validates the UnsupportedApiVersion the client generates while
building a HB if:
+ * 1. HB API is not supported.
+ * 2. Required HB API version is not available.
+ */
+ @ParameterizedTest
+ @ValueSource(strings = {CONSUMER_PROTOCOL_NOT_SUPPORTED_MSG,
REGEX_RESOLUTION_NOT_SUPPORTED_MSG})
+ public void testUnsupportedVersion(String errorMsg) {
+ mockResponseWithException(new UnsupportedVersionException(errorMsg));
ArgumentCaptor<ErrorEvent> errorEventArgumentCaptor =
ArgumentCaptor.forClass(ErrorEvent.class);
verify(backgroundEventHandler).add(errorEventArgumentCaptor.capture());
ErrorEvent errorEvent = errorEventArgumentCaptor.getValue();
-
- // UnsupportedApiVersion in HB response without any custom message.
It's considered as new protocol not supported.
- String hbNotSupportedMsg = "The cluster does not support the new
consumer group protocol. Set group" +
- ".protocol=classic on the consumer configs to revert to the
classic protocol until the cluster is upgraded.";
assertInstanceOf(Errors.UNSUPPORTED_VERSION.exception().getClass(),
errorEvent.error());
- assertEquals(hbNotSupportedMsg, errorEvent.error().getMessage());
+ assertEquals(errorMsg, errorEvent.error().getMessage());
clearInvocations(backgroundEventHandler);
-
- // UnsupportedApiVersion in HB response with custom message. Specific
to required version not present, should
- // keep the custom message.
- String hbVersionNotSupportedMsg = "The cluster does not support
resolution of SubscriptionPattern on version 0. " +
- "It must be upgraded to version >= 1 to allow to subscribe to a
SubscriptionPattern.";
- mockErrorResponse(Errors.UNSUPPORTED_VERSION,
hbVersionNotSupportedMsg);
- errorEventArgumentCaptor = ArgumentCaptor.forClass(ErrorEvent.class);
- verify(backgroundEventHandler).add(errorEventArgumentCaptor.capture());
- errorEvent = errorEventArgumentCaptor.getValue();
- assertInstanceOf(Errors.UNSUPPORTED_VERSION.exception().getClass(),
errorEvent.error());
- assertEquals(hbVersionNotSupportedMsg,
errorEvent.error().getMessage());
}
private void mockErrorResponse(Errors error, String exceptionCustomMsg) {
@@ -637,7 +631,17 @@ public class ConsumerHeartbeatRequestManagerTest {
ClientResponse response = createHeartbeatResponse(
result.unsentRequests.get(0), error, exceptionCustomMsg);
result.unsentRequests.get(0).handler().onComplete(response);
- ConsumerGroupHeartbeatResponse mockResponse =
(ConsumerGroupHeartbeatResponse) response.responseBody();
+ }
+
+ private void mockResponseWithException(UnsupportedVersionException
exception) {
+ time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
+ NetworkClientDelegate.PollResult result =
heartbeatRequestManager.poll(time.milliseconds());
+ assertEquals(1, result.unsentRequests.size());
+
+ when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true);
+ ClientResponse response = createHeartbeatResponseWithException(
+ result.unsentRequests.get(0), exception);
+ result.unsentRequests.get(0).handler().onComplete(response);
}
private void assertNextHeartbeatTiming(long expectedTimeToNextHeartbeatMs)
{
@@ -1038,6 +1042,23 @@ public class ConsumerHeartbeatRequestManagerTest {
response);
}
+ private ClientResponse createHeartbeatResponseWithException(
+ final NetworkClientDelegate.UnsentRequest request,
+ final UnsupportedVersionException exception
+ ) {
+ ConsumerGroupHeartbeatResponse response = new
ConsumerGroupHeartbeatResponse(null);
+ return new ClientResponse(
+ new RequestHeader(ApiKeys.CONSUMER_GROUP_HEARTBEAT,
ApiKeys.CONSUMER_GROUP_HEARTBEAT.latestVersion(), "client-id", 1),
+ request.handler(),
+ "0",
+ time.milliseconds(),
+ time.milliseconds(),
+ false,
+ exception,
+ null,
+ response);
+ }
+
private ConsumerConfig config() {
Properties prop = new Properties();
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);