This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ab0df20489a KAFKA-17592; Support for SubscribedTopicsRegex in
ConsumerGroupHeartbeat RPC (#17257)
ab0df20489a is described below
commit ab0df20489a8d6079cc576f4d9f7326db4b333c9
Author: Lianet Magrans <[email protected]>
AuthorDate: Wed Sep 25 03:52:05 2024 -0400
KAFKA-17592; Support for SubscribedTopicsRegex in ConsumerGroupHeartbeat
RPC (#17257)
This patch includes:
- Bump ConsumerGroupHeartbeatRequest version to include the new
SubscribedTopicRegex field
- Introduce new error code for InvalidRegularExpression
- Bump ConsumerGroupHeartbeatResponse version to support new regex error
- Wire the new field into the GroupMetadataManager when processing heartbeats
Reviewers: David Jacot <[email protected]>
---
.../common/errors/InvalidRegularExpression.java | 27 ++++++++++++++++++++++
.../org/apache/kafka/common/protocol/Errors.java | 4 +++-
.../requests/ConsumerGroupHeartbeatResponse.java | 1 +
.../message/ConsumerGroupHeartbeatRequest.json | 6 ++++-
.../message/ConsumerGroupHeartbeatResponse.json | 3 ++-
.../coordinator/group/GroupMetadataManager.java | 8 ++++++-
.../group/GroupMetadataManagerTest.java | 17 ++++++++++++++
7 files changed, 62 insertions(+), 4 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/errors/InvalidRegularExpression.java
b/clients/src/main/java/org/apache/kafka/common/errors/InvalidRegularExpression.java
new file mode 100644
index 00000000000..f1ea4d19abc
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/common/errors/InvalidRegularExpression.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.errors;
+
+/**
+ * Thrown when a regular expression received in a request is not valid.
+ */
+public class InvalidRegularExpression extends ApiException {
+ public InvalidRegularExpression(String message) {
+ super(message);
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index b304d7e08ff..a80ec308ebb 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -67,6 +67,7 @@ import
org.apache.kafka.common.errors.InvalidPrincipalTypeException;
import org.apache.kafka.common.errors.InvalidProducerEpochException;
import org.apache.kafka.common.errors.InvalidRecordStateException;
import org.apache.kafka.common.errors.InvalidRegistrationException;
+import org.apache.kafka.common.errors.InvalidRegularExpression;
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
import org.apache.kafka.common.errors.InvalidRequestException;
@@ -409,7 +410,8 @@ public enum Errors {
FENCED_STATE_EPOCH(124, "The share coordinator rejected the request
because the share-group state epoch did not match.",
FencedStateEpochException::new),
INVALID_VOTER_KEY(125, "The voter key doesn't match the receiving
replica's key.", InvalidVoterKeyException::new),
DUPLICATE_VOTER(126, "The voter is already part of the set of voters.",
DuplicateVoterException::new),
- VOTER_NOT_FOUND(127, "The voter is not part of the set of voters.",
VoterNotFoundException::new);
+ VOTER_NOT_FOUND(127, "The voter is not part of the set of voters.",
VoterNotFoundException::new),
+ INVALID_REGULAR_EXPRESSION(128, "The regular expression is not valid.",
InvalidRegularExpression::new);
private static final Logger log = LoggerFactory.getLogger(Errors.class);
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatResponse.java
index 6f881cc8efa..76f89ed4df8 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatResponse.java
@@ -38,6 +38,7 @@ import java.util.Map;
* - {@link Errors#UNSUPPORTED_ASSIGNOR}
* - {@link Errors#UNRELEASED_INSTANCE_ID}
* - {@link Errors#GROUP_MAX_SIZE_REACHED}
+ * - {@link Errors#INVALID_REGULAR_EXPRESSION}
*/
public class ConsumerGroupHeartbeatResponse extends AbstractResponse {
private final ConsumerGroupHeartbeatResponseData data;
diff --git
a/clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json
b/clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json
index 71c6e2e2502..669e2547758 100644
---
a/clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json
+++
b/clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json
@@ -18,8 +18,10 @@
"type": "request",
"listeners": ["zkBroker", "broker"],
"name": "ConsumerGroupHeartbeatRequest",
- "validVersions": "0",
+ // Version 1 adds SubscribedTopicRegex (KIP-848).
+ "validVersions": "0-1",
"flexibleVersions": "0+",
+ "latestVersionUnstable": true,
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+", "entityType":
"groupId",
"about": "The group identifier." },
@@ -35,6 +37,8 @@
"about": "-1 if it didn't change since the last heartbeat; the maximum
time in milliseconds that the coordinator will wait on the member to revoke its
partitions otherwise." },
{ "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+",
"nullableVersions": "0+", "default": "null", "entityType": "topicName",
"about": "null if it didn't change since the last heartbeat; the
subscribed topic names otherwise." },
+ { "name": "SubscribedTopicRegex", "type": "string", "versions": "1+",
"nullableVersions": "1+", "default": "null",
+ "about": "null if it didn't change since the last heartbeat; the
subscribed topic regex otherwise." },
{ "name": "ServerAssignor", "type": "string", "versions": "0+",
"nullableVersions": "0+", "default": "null",
"about": "null if not used or if it didn't change since the last
heartbeat; the server side assignor to use otherwise." },
{ "name": "TopicPartitions", "type": "[]TopicPartitions", "versions":
"0+", "nullableVersions": "0+", "default": "null",
diff --git
a/clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json
b/clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json
index fb55f80bd40..cda757cb32f 100644
---
a/clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json
+++
b/clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json
@@ -17,7 +17,7 @@
"apiKey": 68,
"type": "response",
"name": "ConsumerGroupHeartbeatResponse",
- "validVersions": "0",
+ "validVersions": "0-1",
"flexibleVersions": "0+",
// Supported errors:
// - GROUP_AUTHORIZATION_FAILED (version 0+)
@@ -30,6 +30,7 @@
// - UNSUPPORTED_ASSIGNOR (version 0+)
// - UNRELEASED_INSTANCE_ID (version 0+)
// - GROUP_MAX_SIZE_REACHED (version 0+)
+ // - INVALID_REGULAR_EXPRESSION (version 1+)
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was
throttled due to a quota violation, or zero if the request did not violate any
quota." },
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 2a48c87c32d..ecf7efaad49 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -1280,6 +1280,7 @@ public class GroupMetadataManager {
throwIfEmptyString(request.groupId(), "GroupId can't be empty.");
throwIfEmptyString(request.instanceId(), "InstanceId can't be empty.");
throwIfEmptyString(request.rackId(), "RackId can't be empty.");
+ throwIfNotNull(request.subscribedTopicRegex(), "SubscribedTopicRegex
is not supported yet.");
if (request.memberEpoch() > 0 || request.memberEpoch() ==
LEAVE_GROUP_MEMBER_EPOCH) {
throwIfEmptyString(request.memberId(), "MemberId can't be empty.");
@@ -1685,7 +1686,9 @@ public class GroupMetadataManager {
* @param clientId The client id.
* @param clientHost The client host.
* @param subscribedTopicNames The list of subscribed topic names from
the request
- * of null.
+ * or null.
+ * @param subscribedTopicRegex The regular expression based subscription
from the request
+ * or null.
* @param assignorName The assignor name from the request or null.
* @param ownedTopicPartitions The list of owned partitions from the
request or null.
*
@@ -1702,6 +1705,7 @@ public class GroupMetadataManager {
String clientId,
String clientHost,
List<String> subscribedTopicNames,
+ String subscribedTopicRegex,
String assignorName,
List<ConsumerGroupHeartbeatRequestData.TopicPartitions>
ownedTopicPartitions
) throws ApiException {
@@ -1749,6 +1753,7 @@ public class GroupMetadataManager {
.maybeUpdateRebalanceTimeoutMs(ofSentinel(rebalanceTimeoutMs))
.maybeUpdateServerAssignorName(Optional.ofNullable(assignorName))
.maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscribedTopicNames))
+
.maybeUpdateSubscribedTopicRegex(Optional.ofNullable(subscribedTopicRegex))
.setClientId(clientId)
.setClientHost(clientHost)
.setClassicMemberMetadata(null)
@@ -3159,6 +3164,7 @@ public class GroupMetadataManager {
context.clientId(),
context.clientAddress.toString(),
request.subscribedTopicNames(),
+ request.subscribedTopicRegex(),
request.serverAssignor(),
request.topicPartitions()
);
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 2a56cd912cd..653662b859b 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -248,6 +248,23 @@ public class GroupMetadataManagerTest {
assertEquals("InstanceId can't be null.", ex.getMessage());
}
+ @Test
+ public void testConsumerHeartbeatRegexValidation() {
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withConsumerGroupAssignors(Collections.singletonList(assignor))
+ .build();
+ Exception ex;
+ // Regex not supported for now. This test will evolve to actually
validate the regex when it's supported.
+ ex = assertThrows(InvalidRequestException.class, () ->
context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("foo")
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicRegex("t*")));
+ assertEquals("SubscribedTopicRegex is not supported yet.",
ex.getMessage());
+ }
+
@Test
public void testMemberIdGeneration() {
MockPartitionAssignor assignor = new MockPartitionAssignor("range");