This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 64f3ee4c336 KAFKA-17593; [2/N] Update request validation & validate
regex (#17651)
64f3ee4c336 is described below
commit 64f3ee4c336339ee17163cc742442fef0db7b62c
Author: David Jacot <[email protected]>
AuthorDate: Mon Nov 4 15:38:09 2024 +0100
KAFKA-17593; [2/N] Update request validation & validate regex (#17651)
This patch does two things:
1) Change the validation of the ConsumerGroupHeartbeat request to accept
subscribed topic names and/or subscribed topic regex. At least one of them must be
set in the first request with epoch 0.
2) Validate the provided regular expression by compiling it.
Co-authored-by: Lianet Magrans <[email protected]>
Reviewers: Jeff Kim <[email protected]>, Lianet Magrans
<[email protected]>
---
checkstyle/import-control-group-coordinator.xml | 1 +
.../server/ConsumerGroupHeartbeatRequestTest.scala | 78 ++++++++++++++++++++++
.../coordinator/group/GroupMetadataManager.java | 37 ++++++++--
.../group/GroupMetadataManagerTest.java | 52 +++++++++++++--
4 files changed, 157 insertions(+), 11 deletions(-)
diff --git a/checkstyle/import-control-group-coordinator.xml
b/checkstyle/import-control-group-coordinator.xml
index 55d265343af..0619ea444d5 100644
--- a/checkstyle/import-control-group-coordinator.xml
+++ b/checkstyle/import-control-group-coordinator.xml
@@ -70,6 +70,7 @@
<allow pkg="org.apache.kafka.timeline" />
<allow pkg="org.apache.kafka.coordinator.common" />
<allow pkg="org.apache.kafka.coordinator.common.runtime" />
+ <allow pkg="com.google.re2j" />
<subpackage name="metrics">
<allow pkg="com.yammer.metrics"/>
<allow pkg="org.HdrHistogram" />
diff --git
a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
index d869109ab73..63004b99d2a 100644
---
a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
+++
b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
@@ -164,6 +164,84 @@ class ConsumerGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
assertEquals(-1, consumerGroupHeartbeatResponse.data.memberEpoch)
}
+ @ClusterTest
+ def testConsumerGroupHeartbeatWithRegularExpression(): Unit = {
+ val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
+ val admin = cluster.createAdminClient()
+
+ // Creates the __consumer_offsets topic because it won't be created
automatically
+ // in this test because it does not use the FindCoordinator API.
+ TestUtils.createOffsetsTopicWithAdmin(
+ admin = admin,
+ brokers = raftCluster.brokers.values().asScala.toSeq,
+ controllers = raftCluster.controllers().values().asScala.toSeq
+ )
+
+ // Heartbeat request to join the group. Note that the member subscribes
+ // with the regular expression "foo"; this test creates no topic matching it.
+ val consumerGroupHeartbeatRequest = new
ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("grp")
+ .setMemberId(Uuid.randomUuid().toString)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5 * 60 * 1000)
+ .setSubscribedTopicRegex("foo")
+ .setTopicPartitions(List.empty.asJava),
+ true
+ ).build()
+
+ // Send the request until receiving a successful response. There is a delay
+ // here because the group coordinator is loaded in the background.
+ var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
+ TestUtils.waitUntilTrue(() => {
+ consumerGroupHeartbeatResponse =
connectAndReceive(consumerGroupHeartbeatRequest)
+ consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
+ }, msg = s"Could not join the group successfully. Last response
$consumerGroupHeartbeatResponse.")
+
+ // Verify the response: the member joined at epoch 1 with an empty assignment.
+ assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
+ assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(),
consumerGroupHeartbeatResponse.data.assignment)
+ }
+
+ @ClusterTest
+ def testConsumerGroupHeartbeatWithInvalidRegularExpression(): Unit = {
+ val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
+ val admin = cluster.createAdminClient()
+
+ // Creates the __consumer_offsets topic because it won't be created
automatically
+ // in this test because it does not use the FindCoordinator API.
+ TestUtils.createOffsetsTopicWithAdmin(
+ admin = admin,
+ brokers = raftCluster.brokers.values().asScala.toSeq,
+ controllers = raftCluster.controllers().values().asScala.toSeq
+ )
+
+ // Heartbeat request to join the group. Note that the member subscribes
+ // with "[", which is not a valid regular expression.
+ val consumerGroupHeartbeatRequest = new
ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("grp")
+ .setMemberId(Uuid.randomUuid().toString)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5 * 60 * 1000)
+ .setSubscribedTopicRegex("[")
+ .setTopicPartitions(List.empty.asJava),
+ true
+ ).build()
+
+ // Send the request until receiving the expected error. There is a delay
+ // here because the group coordinator is loaded in the background.
+ var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
+ TestUtils.waitUntilTrue(() => {
+ consumerGroupHeartbeatResponse =
connectAndReceive(consumerGroupHeartbeatRequest)
+ consumerGroupHeartbeatResponse.data.errorCode ==
Errors.INVALID_REGULAR_EXPRESSION.code
+ }, msg = s"Did not receive the expected error. Last response
$consumerGroupHeartbeatResponse.")
+
+ // Verify that the last response carries INVALID_REGULAR_EXPRESSION.
+ assertEquals(Errors.INVALID_REGULAR_EXPRESSION.code,
consumerGroupHeartbeatResponse.data.errorCode)
+ }
+
@ClusterTest
def
testRejoiningStaticMemberGetsAssignmentsBackWhenNewGroupCoordinatorIsEnabled():
Unit = {
val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 9a5c517a956..3435f3b3d2f 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -27,6 +27,7 @@ import
org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
import org.apache.kafka.common.errors.IllegalGenerationException;
import org.apache.kafka.common.errors.InconsistentGroupProtocolException;
+import org.apache.kafka.common.errors.InvalidRegularExpression;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
@@ -118,6 +119,9 @@ import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineHashSet;
+import com.google.re2j.Pattern;
+import com.google.re2j.PatternSyntaxException;
+
import org.slf4j.Logger;
import java.nio.ByteBuffer;
@@ -1312,7 +1316,6 @@ public class GroupMetadataManager {
throwIfEmptyString(request.groupId(), "GroupId can't be empty.");
throwIfEmptyString(request.instanceId(), "InstanceId can't be empty.");
throwIfEmptyString(request.rackId(), "RackId can't be empty.");
- throwIfNotNull(request.subscribedTopicRegex(), "SubscribedTopicRegex
is not supported yet.");
if (request.memberEpoch() == 0) {
if (request.rebalanceTimeoutMs() == -1) {
@@ -1321,8 +1324,10 @@ public class GroupMetadataManager {
if (request.topicPartitions() == null ||
!request.topicPartitions().isEmpty()) {
throw new InvalidRequestException("TopicPartitions must be
empty when (re-)joining.");
}
- if (request.subscribedTopicNames() == null ||
request.subscribedTopicNames().isEmpty()) {
- throw new InvalidRequestException("SubscribedTopicNames must
be set in first request.");
+ boolean hasSubscribedTopicNames = request.subscribedTopicNames()
!= null && !request.subscribedTopicNames().isEmpty();
+ boolean hasSubscribedTopicRegex = request.subscribedTopicRegex()
!= null && !request.subscribedTopicRegex().isEmpty();
+ if (!hasSubscribedTopicNames && !hasSubscribedTopicRegex) {
+ throw new InvalidRequestException("SubscribedTopicNames or
SubscribedTopicRegex must be set in first request.");
}
} else if (request.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
throwIfNull(request.instanceId(), "InstanceId can't be null.");
@@ -1642,6 +1647,24 @@ public class GroupMetadataManager {
}
}
+ /**
+ * Validates the provided regular expression by compiling it with RE2/J.
+ *
+ * @param regex The regular expression to validate.
+ * @throws InvalidRegularExpression if the regular expression fails to compile.
+ */
+ private static void throwIfRegularExpressionIsInvalid(
+ String regex
+ ) throws InvalidRegularExpression {
+ try {
+ Pattern.compile(regex);
+ } catch (PatternSyntaxException ex) {
+ throw new InvalidRegularExpression(
+ String.format("SubscribedTopicRegex `%s` is not a valid
regular expression: %s.",
+ regex, ex.getDescription()));
+ }
+ }
+
/**
* Deserialize the subscription in JoinGroupRequestProtocolCollection.
* All the protocols have the same subscription, so the method picks a
random one.
@@ -2390,13 +2413,14 @@ public class GroupMetadataManager {
* @param records The list to accumulate any new records.
* @return A boolean indicating whether the updatedMember has a different
* subscribedTopicNames/subscribedTopicRegex from the old member.
+ * @throws InvalidRegularExpression if the regular expression is invalid.
*/
private boolean hasMemberSubscriptionChanged(
String groupId,
ConsumerGroupMember member,
ConsumerGroupMember updatedMember,
List<CoordinatorRecord> records
- ) {
+ ) throws InvalidRegularExpression {
String memberId = updatedMember.memberId();
if (!updatedMember.equals(member)) {
records.add(newConsumerGroupMemberSubscriptionRecord(groupId,
updatedMember));
@@ -2410,6 +2434,11 @@ public class GroupMetadataManager {
if
(!updatedMember.subscribedTopicRegex().equals(member.subscribedTopicRegex())) {
log.debug("[GroupId {}] Member {} updated its subscribed regex
to: {}.",
groupId, memberId, updatedMember.subscribedTopicRegex());
+ // If the regular expression has changed, we compile it to
ensure that
+ // its syntax is valid.
+ if (updatedMember.subscribedTopicRegex() != null) {
+
throwIfRegularExpressionIsInvalid(updatedMember.subscribedTopicRegex());
+ }
return true;
}
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 5d1962da233..b912b4f18de 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -27,6 +27,7 @@ import
org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
import org.apache.kafka.common.errors.IllegalGenerationException;
import org.apache.kafka.common.errors.InconsistentGroupProtocolException;
+import org.apache.kafka.common.errors.InvalidRegularExpression;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
@@ -206,7 +207,7 @@ public class GroupMetadataManagerTest {
.setRebalanceTimeoutMs(5000)));
assertEquals("TopicPartitions must be empty when (re-)joining.",
ex.getMessage());
- // SubscribedTopicNames must be present and empty in the first request
(epoch == 0).
+ // SubscribedTopicNames or SubscribedTopicRegex must be present and
non-empty in the first request (epoch == 0).
ex = assertThrows(InvalidRequestException.class, () ->
context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setMemberId(memberId)
@@ -214,7 +215,7 @@ public class GroupMetadataManagerTest {
.setMemberEpoch(0)
.setRebalanceTimeoutMs(5000)
.setTopicPartitions(Collections.emptyList())));
- assertEquals("SubscribedTopicNames must be set in first request.",
ex.getMessage());
+ assertEquals("SubscribedTopicNames or SubscribedTopicRegex must be set
in first request.", ex.getMessage());
// InstanceId must be non-empty if provided in all requests.
ex = assertThrows(InvalidRequestException.class, () ->
context.consumerGroupHeartbeat(
@@ -257,20 +258,57 @@ public class GroupMetadataManagerTest {
@Test
public void testConsumerHeartbeatRegexValidation() {
+ String memberId = Uuid.randomUuid().toString();
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ assignor.prepareGroupAssignment(new
GroupAssignment(Collections.emptyMap()));
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
.withConsumerGroupAssignors(Collections.singletonList(assignor))
.build();
- Exception ex;
- // Regex not supported for now. This test will evolve to actually
validate the regex when it's supported
- ex = assertThrows(InvalidRequestException.class, () ->
context.consumerGroupHeartbeat(
+
+ // Subscribing with an invalid regular expression fails.
+ Exception ex = assertThrows(InvalidRegularExpression.class, () ->
context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setMemberId(Uuid.randomUuid().toString())
.setGroupId("foo")
+ .setMemberId(memberId)
.setMemberEpoch(0)
.setRebalanceTimeoutMs(5000)
- .setSubscribedTopicRegex("t*")));
- assertEquals("SubscribedTopicRegex is not supported yet.",
ex.getMessage());
+ .setSubscribedTopicRegex("[")
+ .setTopicPartitions(Collections.emptyList())));
+ assertEquals("SubscribedTopicRegex `[` is not a valid regular
expression: missing closing ].", ex.getMessage());
+
+ // Subscribing with a valid regular expression succeeds.
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> result = context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("foo")
+ .setMemberId(memberId)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicRegex(".*")
+ .setTopicPartitions(Collections.emptyList()));
+ assertEquals(1, result.response().memberEpoch());
+
+ // Updating the subscription to an invalid regular expression fails.
+ assertThrows(InvalidRegularExpression.class, () ->
context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("foo")
+ .setMemberId(memberId)
+ .setMemberEpoch(1)
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicRegex("[")
+ .setTopicPartitions(Collections.emptyList())));
+ assertEquals("SubscribedTopicRegex `[` is not a valid regular
expression: missing closing ].", ex.getMessage());
+
+ // Updating the subscription to topic names succeeds (checking when
the regex becomes null).
+ result = context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("foo")
+ .setMemberId(memberId)
+ .setMemberEpoch(1)
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(Collections.singletonList("foo"))
+ .setTopicPartitions(Collections.emptyList()));
+ assertEquals(2, result.response().memberEpoch());
}
@Test