This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0d7c7659811 KAFKA-15561 [1/N]: Introduce new subscribe api for RE2J
regex (#17897)
0d7c7659811 is described below
commit 0d7c7659811d5edafa9d33bcce8984b08f1ecb91
Author: Lianet Magrans <[email protected]>
AuthorDate: Fri Nov 22 11:58:20 2024 -0500
KAFKA-15561 [1/N]: Introduce new subscribe api for RE2J regex (#17897)
Reviewers: David Jacot <[email protected]>
---
.../apache/kafka/clients/consumer/Consumer.java | 10 ++++
.../kafka/clients/consumer/KafkaConsumer.java | 50 ++++++++++++++++++
.../kafka/clients/consumer/MockConsumer.java | 10 ++++
.../clients/consumer/SubscriptionPattern.java | 59 ++++++++++++++++++++++
.../consumer/internals/AsyncKafkaConsumer.java | 34 +++++++++++++
.../consumer/internals/ClassicKafkaConsumer.java | 13 +++++
.../kafka/clients/consumer/KafkaConsumerTest.java | 12 +++++
.../consumer/internals/AsyncKafkaConsumerTest.java | 14 +++++
8 files changed, 202 insertions(+)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
index 42013955783..2c8376e5ccd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -73,6 +73,16 @@ public interface Consumer<K, V> extends Closeable {
*/
void subscribe(Pattern pattern);
+ /**
+ * @see KafkaConsumer#subscribe(SubscriptionPattern,
ConsumerRebalanceListener)
+ */
+ void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener
callback);
+
+ /**
+ * @see KafkaConsumer#subscribe(SubscriptionPattern)
+ */
+ void subscribe(SubscriptionPattern pattern);
+
/**
* @see KafkaConsumer#unsubscribe()
*/
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 8ba5cff7c08..92e4cf2a550 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.InvalidRegularExpression;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Deserializer;
@@ -755,6 +756,55 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
delegate.subscribe(pattern);
}
+ /**
+ * Subscribe to all topics matching the specified pattern, to get
dynamically assigned partitions.
+ * The pattern matching will be done periodically against all topics. This
is only supported under the
+ * CONSUMER group protocol (see {@link
ConsumerConfig#GROUP_PROTOCOL_CONFIG}).
+ * <p>
+ * If the provided pattern is not compatible with Google RE2/J, an {@link
InvalidRegularExpression} will be
+ * eventually thrown on a call to {@link #poll(Duration)} following this
call to subscribe.
+ * <p>
+ * See {@link #subscribe(Collection, ConsumerRebalanceListener)} for
details on the
+ * use of the {@link ConsumerRebalanceListener}. Generally, rebalances are
triggered when there
+ * is a change to the topics matching the provided pattern and when
consumer group membership changes.
+ * Group rebalances only take place during an active call to {@link
#poll(Duration)}.
+ *
+ * @param pattern Pattern to subscribe to, that must be compatible with
Google RE2/J.
+ * @param listener Non-null listener instance to get notifications on
partition assignment/revocation for the
+ * subscribed topics.
+ * @throws IllegalArgumentException If pattern is null or empty, or if the
listener is null.
+ * @throws IllegalStateException If {@code subscribe()} is called
previously with topics, or assign is called
+ * previously (without a subsequent call
to {@link #unsubscribe()}).
+ */
+ @Override
+ public void subscribe(SubscriptionPattern pattern,
ConsumerRebalanceListener listener) {
+ delegate.subscribe(pattern, listener);
+ }
+
+ /**
+ * Subscribe to all topics matching the specified pattern, to get
dynamically assigned partitions.
+ * The pattern matching will be done periodically against all topics. This is
only supported under the
+ * CONSUMER group protocol (see {@link
ConsumerConfig#GROUP_PROTOCOL_CONFIG})
+ * <p>
+ * If the provided pattern is not compatible with Google RE2/J, an {@link
InvalidRegularExpression} will be
+ * eventually thrown on a call to {@link #poll(Duration)} following this
call to subscribe.
+ * <p>
+ * This is a short-hand for {@link #subscribe(SubscriptionPattern,
ConsumerRebalanceListener)}, which
+ * uses a no-op listener. If you need the ability to seek to particular
offsets, you should prefer
+ * {@link #subscribe(SubscriptionPattern, ConsumerRebalanceListener)}, since
group rebalances will cause partition offsets
+ * to be reset. You should also provide your own listener if you are doing
your own offset
+ * management since the listener gives you an opportunity to commit
offsets before a rebalance finishes.
+ *
+ * @param pattern Pattern to subscribe to, that must be compatible with
Google RE2/J.
+ * @throws IllegalArgumentException If pattern is null or empty.
+ * @throws IllegalStateException If {@code subscribe()} is called
previously with topics, or assign is called
+ * previously (without a subsequent call
to {@link #unsubscribe()}).
+ */
+ @Override
+ public void subscribe(SubscriptionPattern pattern) {
+ delegate.subscribe(pattern);
+ }
+
/**
* Unsubscribe from topics currently subscribed with {@link
#subscribe(Collection)} or {@link #subscribe(Pattern)}.
* This also clears any partitions directly assigned through {@link
#assign(Collection)}.
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 3c29c749acf..4bdffc48c23 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -143,6 +143,16 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
subscribe(pattern, Optional.empty());
}
+ @Override
+ public void subscribe(SubscriptionPattern pattern,
ConsumerRebalanceListener callback) {
+ throw new UnsupportedOperationException("Subscribe to RE2/J regular
expression not supported in MockConsumer yet");
+ }
+
+ @Override
+ public void subscribe(SubscriptionPattern pattern) {
+ throw new UnsupportedOperationException("Subscribe to RE2/J regular
expression not supported in MockConsumer yet");
+ }
+
@Override
public void subscribe(Collection<String> topics, final
ConsumerRebalanceListener listener) {
if (listener == null)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java
new file mode 100644
index 00000000000..d6e168b8da1
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import java.util.Objects;
+
+/**
+ * Represents a regular expression compatible with Google RE2/J, used to
subscribe to topics.
+ * This just keeps the String representation of the pattern, and all
validations to ensure
+ * it is RE2/J compatible are delegated to the broker.
+ */
+public class SubscriptionPattern {
+
+ /**
+ * String representation of the regular expression, compatible with RE2/J.
+ */
+ private final String pattern;
+
+ public SubscriptionPattern(String pattern) {
+ this.pattern = pattern;
+ }
+
+ /**
+ * @return Regular expression pattern compatible with RE2/J.
+ */
+ public String pattern() {
+ return this.pattern;
+ }
+
+ @Override
+ public String toString() {
+ return pattern;
+ }
+
+ @Override
+ public int hashCode() {
+ return pattern.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj instanceof SubscriptionPattern &&
+ Objects.equals(pattern, ((SubscriptionPattern) obj).pattern);
+ }
+}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index ba7eed19f11..3ab84fd7aec 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -34,6 +34,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.consumer.SubscriptionPattern;
import
org.apache.kafka.clients.consumer.internals.events.AllTopicsMetadataEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import
org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
@@ -1795,6 +1796,16 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
subscribeInternal(pattern, Optional.empty());
}
+ @Override
+ public void subscribe(SubscriptionPattern pattern,
ConsumerRebalanceListener callback) {
+ subscribeToRegex(pattern, Optional.ofNullable(callback));
+ }
+
+ @Override
+ public void subscribe(SubscriptionPattern pattern) {
+ subscribeToRegex(pattern, Optional.empty());
+ }
+
@Override
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener)
{
if (listener == null)
@@ -1857,6 +1868,29 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
}
}
+ /**
+ * Subscribe to the RE2/J pattern. This will generate an event to update
the pattern in the
+ * subscription, so it's included in a next heartbeat request sent to the
broker. No validation of the pattern is
+ * performed by the client (other than null/empty checks).
+ */
+ private void subscribeToRegex(SubscriptionPattern pattern,
+ Optional<ConsumerRebalanceListener>
listener) {
+ maybeThrowInvalidGroupIdException();
+ throwIfSubscriptionPatternIsInvalid(pattern);
+ log.info("Subscribing to regular expression {}", pattern);
+
+ // TODO: generate event to update subscribed regex so it's included in
the next HB.
+ }
+
+ private void throwIfSubscriptionPatternIsInvalid(SubscriptionPattern
subscriptionPattern) {
+ if (subscriptionPattern == null) {
+ throw new IllegalArgumentException("Topic pattern to subscribe to
cannot be null");
+ }
+ if (subscriptionPattern.pattern().isEmpty()) {
+ throw new IllegalArgumentException("Topic pattern to subscribe to
cannot be empty");
+ }
+ }
+
private void subscribeInternal(Collection<String> topics,
Optional<ConsumerRebalanceListener> listener) {
acquireAndEnsureOpen();
try {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
index e423c261763..a3ac5e5698b 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
@@ -35,6 +35,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.consumer.SubscriptionPattern;
import
org.apache.kafka.clients.consumer.internals.metrics.KafkaConsumerMetrics;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.IsolationLevel;
@@ -520,6 +521,18 @@ public class ClassicKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
subscribeInternal(pattern, Optional.empty());
}
+ @Override
+ public void subscribe(SubscriptionPattern pattern,
ConsumerRebalanceListener callback) {
+ throw new UnsupportedOperationException(String.format("Subscribe to
RE2/J pattern is not supported when using " +
+ "the %s protocol defined in config %s", GroupProtocol.CLASSIC,
ConsumerConfig.GROUP_PROTOCOL_CONFIG));
+ }
+
+ @Override
+ public void subscribe(SubscriptionPattern pattern) {
+ throw new UnsupportedOperationException(String.format("Subscribe to
RE2/J pattern is not supported when using " +
+ "the %s protocol defined in config %s", GroupProtocol.CLASSIC,
ConsumerConfig.GROUP_PROTOCOL_CONFIG));
+ }
+
/**
* Internal helper method for {@link #subscribe(Pattern)} and
* {@link #subscribe(Pattern, ConsumerRebalanceListener)}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 7d122c2986c..2deaec2efea 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
import org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer;
@@ -3609,6 +3610,17 @@ public void
testClosingConsumerUnregistersConsumerMetrics(GroupProtocol groupPro
"Expected " + (groupProtocol == GroupProtocol.CLASSIC ?
"JoinGroup" : "Heartbeat") + " request");
}
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
+ public void
testSubscribeToRe2jPatternNotSupportedForClassicConsumer(GroupProtocol
groupProtocol) {
+ KafkaConsumer<String, String> consumer =
newConsumerNoAutoCommit(groupProtocol, time, mock(NetworkClient.class),
subscription,
+ mock(ConsumerMetadata.class));
+ assertThrows(UnsupportedOperationException.class, () ->
+ consumer.subscribe(new SubscriptionPattern("t*")));
+ assertThrows(UnsupportedOperationException.class, () ->
+ consumer.subscribe(new SubscriptionPattern("t*"),
mock(ConsumerRebalanceListener.class)));
+ }
+
private boolean requestGenerated(MockClient client, ApiKeys apiKey) {
return client.requests().stream().anyMatch(request ->
request.requestBuilder().apiKey().equals(apiKey));
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 8eb8ec4c85b..91e4cae0f98 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
+import org.apache.kafka.clients.consumer.SubscriptionPattern;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import
org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
import
org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
@@ -1829,6 +1830,19 @@ public class AsyncKafkaConsumerTest {
assertEquals(OffsetResetStrategy.LATEST,
resetOffsetEvent.offsetResetStrategy());
}
+ @Test
+ public void testSubscribeToRe2JPatternValidation() {
+ consumer = newConsumer();
+
+ Throwable t = assertThrows(IllegalArgumentException.class, () ->
consumer.subscribe((SubscriptionPattern) null));
+ assertEquals("Topic pattern to subscribe to cannot be null",
t.getMessage());
+
+ t = assertThrows(IllegalArgumentException.class, () ->
consumer.subscribe(new SubscriptionPattern("")));
+ assertEquals("Topic pattern to subscribe to cannot be empty",
t.getMessage());
+
+ assertDoesNotThrow(() -> consumer.subscribe(new
SubscriptionPattern("t*")));
+ }
+
private Map<TopicPartition, OffsetAndMetadata> mockTopicPartitionOffset() {
final TopicPartition t0 = new TopicPartition("t0", 2);
final TopicPartition t1 = new TopicPartition("t0", 3);