This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0d7c7659811 KAFKA-15561 [1/N]: Introduce new subscribe api for RE2J regex (#17897)
0d7c7659811 is described below

commit 0d7c7659811d5edafa9d33bcce8984b08f1ecb91
Author: Lianet Magrans <[email protected]>
AuthorDate: Fri Nov 22 11:58:20 2024 -0500

    KAFKA-15561 [1/N]: Introduce new subscribe api for RE2J regex (#17897)
    
    Reviewers: David Jacot <[email protected]>
---
 .../apache/kafka/clients/consumer/Consumer.java    | 10 ++++
 .../kafka/clients/consumer/KafkaConsumer.java      | 50 ++++++++++++++++++
 .../kafka/clients/consumer/MockConsumer.java       | 10 ++++
 .../clients/consumer/SubscriptionPattern.java      | 59 ++++++++++++++++++++++
 .../consumer/internals/AsyncKafkaConsumer.java     | 34 +++++++++++++
 .../consumer/internals/ClassicKafkaConsumer.java   | 13 +++++
 .../kafka/clients/consumer/KafkaConsumerTest.java  | 12 +++++
 .../consumer/internals/AsyncKafkaConsumerTest.java | 14 +++++
 8 files changed, 202 insertions(+)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
index 42013955783..2c8376e5ccd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -73,6 +73,16 @@ public interface Consumer<K, V> extends Closeable {
     */
     void subscribe(Pattern pattern);
 
+    /**
+     * @see KafkaConsumer#subscribe(SubscriptionPattern, ConsumerRebalanceListener)
+     */
+    void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback);
+
+    /**
+     * @see KafkaConsumer#subscribe(SubscriptionPattern)
+     */
+    void subscribe(SubscriptionPattern pattern);
+
     /**
      * @see KafkaConsumer#unsubscribe()
      */
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 8ba5cff7c08..92e4cf2a550 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.InvalidRegularExpression;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Deserializer;
@@ -755,6 +756,55 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         delegate.subscribe(pattern);
     }
 
+    /**
+     * Subscribe to all topics matching the specified pattern, to get dynamically assigned partitions.
+     * The pattern matching will be done periodically against all topics. This is only supported under the
+     * CONSUMER group protocol (see {@link ConsumerConfig#GROUP_PROTOCOL_CONFIG}).
+     * <p>
+     * If the provided pattern is not compatible with Google RE2/J, an {@link InvalidRegularExpression} will be
+     * eventually thrown on a call to {@link #poll(Duration)} following this call to subscribe.
+     * <p>
+     * See {@link #subscribe(Collection, ConsumerRebalanceListener)} for details on the
+     * use of the {@link ConsumerRebalanceListener}. Generally, rebalances are triggered when there
+     * is a change to the topics matching the provided pattern and when consumer group membership changes.
+     * Group rebalances only take place during an active call to {@link #poll(Duration)}.
+     *
+     * @param pattern  Pattern to subscribe to, that must be compatible with Google RE2/J.
+     * @param listener Non-null listener instance to get notifications on partition assignment/revocation for the
+     *                 subscribed topics.
+     * @throws IllegalArgumentException If pattern is null or empty, or if the listener is null.
+     * @throws IllegalStateException    If {@code subscribe()} is called previously with topics, or assign is called
+     *                                  previously (without a subsequent call to {@link #unsubscribe()}).
+     */
+    @Override
+    public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener listener) {
+        delegate.subscribe(pattern, listener);
+    }
+
+    /**
+     * Subscribe to all topics matching the specified pattern, to get dynamically assigned partitions.
+     * The pattern matching will be done periodically against all topics. This is only supported under the
+     * CONSUMER group protocol (see {@link ConsumerConfig#GROUP_PROTOCOL_CONFIG}).
+     * <p>
+     * If the provided pattern is not compatible with Google RE2/J, an {@link InvalidRegularExpression} will be
+     * eventually thrown on a call to {@link #poll(Duration)} following this call to subscribe.
+     * <p>
+     * This is a short-hand for {@link #subscribe(SubscriptionPattern, ConsumerRebalanceListener)}, which
+     * uses a no-op listener. If you need the ability to seek to particular offsets, you should prefer
+     * {@link #subscribe(SubscriptionPattern, ConsumerRebalanceListener)}, since group rebalances will cause
+     * partition offsets to be reset. You should also provide your own listener if you are doing your own
+     * offset management since the listener gives you an opportunity to commit offsets before a rebalance finishes.
+     *
+     * @param pattern Pattern to subscribe to, that must be compatible with Google RE2/J.
+     * @throws IllegalArgumentException If pattern is null or empty.
+     * @throws IllegalStateException    If {@code subscribe()} is called previously with topics, or assign is called
+     *                                  previously (without a subsequent call to {@link #unsubscribe()}).
+     */
+    @Override
+    public void subscribe(SubscriptionPattern pattern) {
+        delegate.subscribe(pattern);
+    }
+
     /**
      * Unsubscribe from topics currently subscribed with {@link #subscribe(Collection)} or {@link #subscribe(Pattern)}.
      * This also clears any partitions directly assigned through {@link #assign(Collection)}.
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 3c29c749acf..4bdffc48c23 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -143,6 +143,16 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
         subscribe(pattern, Optional.empty());
     }
 
+    @Override
+    public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback) {
+        throw new UnsupportedOperationException("Subscribe to RE2/J regular expression not supported in MockConsumer yet");
+    }
+
+    @Override
+    public void subscribe(SubscriptionPattern pattern) {
+        throw new UnsupportedOperationException("Subscribe to RE2/J regular expression not supported in MockConsumer yet");
+    }
+
     @Override
     public void subscribe(Collection<String> topics, final ConsumerRebalanceListener listener) {
         if (listener == null)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java b/clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java
new file mode 100644
index 00000000000..d6e168b8da1
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/SubscriptionPattern.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import java.util.Objects;
+
+/**
+ * Represents a regular expression compatible with Google RE2/J, used to subscribe to topics.
+ * This just keeps the String representation of the pattern, and all validations to ensure
+ * it is RE2/J compatible are delegated to the broker.
+ */
+public class SubscriptionPattern {
+
+    /**
+     * String representation of the regular expression, compatible with RE2/J.
+     */
+    private final String pattern;
+
+    public SubscriptionPattern(String pattern) {
+        this.pattern = pattern;
+    }
+
+    /**
+     * @return Regular expression pattern compatible with RE2/J.
+     */
+    public String pattern() {
+        return this.pattern;
+    }
+
+    @Override
+    public String toString() {
+        return pattern;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(pattern);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        return obj instanceof SubscriptionPattern &&
+            Objects.equals(pattern, ((SubscriptionPattern) obj).pattern);
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index ba7eed19f11..3ab84fd7aec 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -34,6 +34,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.consumer.SubscriptionPattern;
 import org.apache.kafka.clients.consumer.internals.events.AllTopicsMetadataEvent;
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
@@ -1795,6 +1796,18 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         subscribeInternal(pattern, Optional.empty());
     }
 
+    @Override
+    public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback) {
+        if (callback == null)
+            throw new IllegalArgumentException("RebalanceListener cannot be null");
+        subscribeToRegex(pattern, Optional.of(callback));
+    }
+
+    @Override
+    public void subscribe(SubscriptionPattern pattern) {
+        subscribeToRegex(pattern, Optional.empty());
+    }
+
     @Override
     public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
         if (listener == null)
@@ -1857,6 +1870,29 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         }
     }
 
+    /**
+     * Subscribe to the RE2/J pattern. This will generate an event to update the pattern in the
+     * subscription, so it's included in the next heartbeat request sent to the broker. No validation of the pattern is
+     * performed by the client (other than null/empty checks).
+     */
+    private void subscribeToRegex(SubscriptionPattern pattern,
+                                  Optional<ConsumerRebalanceListener> listener) {
+        maybeThrowInvalidGroupIdException();
+        throwIfSubscriptionPatternIsInvalid(pattern);
+        log.info("Subscribing to regular expression {}", pattern);
+
+        // TODO: generate event to update subscribed regex so it's included in the next HB.
+    }
+
+    private void throwIfSubscriptionPatternIsInvalid(SubscriptionPattern subscriptionPattern) {
+        if (subscriptionPattern == null || subscriptionPattern.pattern() == null) {
+            throw new IllegalArgumentException("Topic pattern to subscribe to cannot be null");
+        }
+        if (subscriptionPattern.pattern().isEmpty()) {
+            throw new IllegalArgumentException("Topic pattern to subscribe to cannot be empty");
+        }
+    }
+
     private void subscribeInternal(Collection<String> topics, Optional<ConsumerRebalanceListener> listener) {
         acquireAndEnsureOpen();
         try {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
index e423c261763..a3ac5e5698b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
@@ -35,6 +35,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.consumer.SubscriptionPattern;
 import org.apache.kafka.clients.consumer.internals.metrics.KafkaConsumerMetrics;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.IsolationLevel;
@@ -520,6 +521,18 @@ public class ClassicKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         subscribeInternal(pattern, Optional.empty());
     }
 
+    @Override
+    public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback) {
+        throw new UnsupportedOperationException(String.format("Subscribe to RE2/J pattern is not supported when using " +
+            "the %s protocol defined in config %s", GroupProtocol.CLASSIC, ConsumerConfig.GROUP_PROTOCOL_CONFIG));
+    }
+
+    @Override
+    public void subscribe(SubscriptionPattern pattern) {
+        throw new UnsupportedOperationException(String.format("Subscribe to RE2/J pattern is not supported when using " +
+            "the %s protocol defined in config %s", GroupProtocol.CLASSIC, ConsumerConfig.GROUP_PROTOCOL_CONFIG));
+    }
+
     /**
      * Internal helper method for {@link #subscribe(Pattern)} and
      * {@link #subscribe(Pattern, ConsumerRebalanceListener)}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 7d122c2986c..2deaec2efea 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
 import org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer;
@@ -3609,6 +3610,17 @@ public void testClosingConsumerUnregistersConsumerMetrics(GroupProtocol groupPro
                 "Expected " + (groupProtocol == GroupProtocol.CLASSIC ? "JoinGroup" : "Heartbeat") + " request");
     }
 
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
+    public void testSubscribeToRe2jPatternNotSupportedForClassicConsumer(GroupProtocol groupProtocol) {
+        KafkaConsumer<String, String> consumer = newConsumerNoAutoCommit(groupProtocol, time, mock(NetworkClient.class), subscription,
+            mock(ConsumerMetadata.class));
+        assertThrows(UnsupportedOperationException.class, () ->
+            consumer.subscribe(new SubscriptionPattern("t*")));
+        assertThrows(UnsupportedOperationException.class, () ->
+            consumer.subscribe(new SubscriptionPattern("t*"), mock(ConsumerRebalanceListener.class)));
+    }
+
     private boolean requestGenerated(MockClient client, ApiKeys apiKey) {
         return client.requests().stream().anyMatch(request -> request.requestBuilder().apiKey().equals(apiKey));
     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 8eb8ec4c85b..91e4cae0f98 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
+import org.apache.kafka.clients.consumer.SubscriptionPattern;
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
 import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
@@ -1829,6 +1830,19 @@ public class AsyncKafkaConsumerTest {
         assertEquals(OffsetResetStrategy.LATEST, resetOffsetEvent.offsetResetStrategy());
     }
 
+    @Test
+    public void testSubscribeToRe2JPatternValidation() {
+        consumer = newConsumer();
+
+        Throwable t = assertThrows(IllegalArgumentException.class, () -> consumer.subscribe((SubscriptionPattern) null));
+        assertEquals("Topic pattern to subscribe to cannot be null", t.getMessage());
+
+        t = assertThrows(IllegalArgumentException.class, () -> consumer.subscribe(new SubscriptionPattern("")));
+        assertEquals("Topic pattern to subscribe to cannot be empty", t.getMessage());
+
+        assertDoesNotThrow(() -> consumer.subscribe(new SubscriptionPattern("t*")));
+    }
+
     private Map<TopicPartition, OffsetAndMetadata> mockTopicPartitionOffset() {
         final TopicPartition t0 = new TopicPartition("t0", 2);
         final TopicPartition t1 = new TopicPartition("t0", 3);

Reply via email to