This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fb65dfeb116 KAFKA-17726: New consumer subscribe/subscribeFromPattern 
in background thread (#17569)
fb65dfeb116 is described below

commit fb65dfeb116f86386b1932fb26f188deae67c089
Author: PoAn Yang <[email protected]>
AuthorDate: Thu Oct 31 05:15:13 2024 +0800

    KAFKA-17726: New consumer subscribe/subscribeFromPattern in background 
thread (#17569)
    
    Reviewers: Andrew Schofield <[email protected]>, Lianet Magrans 
<[email protected]>
---
 .../consumer/internals/AsyncKafkaConsumer.java     |  57 +++-------
 .../consumer/internals/SubscriptionState.java      |   4 +-
 .../internals/events/ApplicationEvent.java         |   3 +-
 .../events/ApplicationEventProcessor.java          |  93 ++++++++++++++--
 .../internals/events/SubscriptionChangeEvent.java  |  22 +++-
 .../TopicPatternSubscriptionChangeEvent.java       |  45 ++++++++
 .../events/TopicSubscriptionChangeEvent.java       |  44 ++++++++
 ...nt.java => UpdatePatternSubscriptionEvent.java} |  12 +--
 .../consumer/internals/AsyncKafkaConsumerTest.java |  79 +++++++-------
 .../events/ApplicationEventProcessorTest.java      | 120 ++++++++++++++++++++-
 10 files changed, 373 insertions(+), 106 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index dd652e3235e..d411aa4d770 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -58,10 +58,12 @@ import 
org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
 import org.apache.kafka.clients.consumer.internals.events.PollEvent;
 import org.apache.kafka.clients.consumer.internals.events.ResetOffsetEvent;
 import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.SubscriptionChangeEvent;
 import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
 import org.apache.kafka.clients.consumer.internals.events.TopicMetadataEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.TopicPatternSubscriptionChangeEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.TopicSubscriptionChangeEvent;
 import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.UpdatePatternSubscriptionEvent;
 import 
org.apache.kafka.clients.consumer.internals.metrics.KafkaConsumerMetrics;
 import 
org.apache.kafka.clients.consumer.internals.metrics.RebalanceCallbackMetricsManager;
 import org.apache.kafka.common.Cluster;
@@ -235,7 +237,6 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
 
     private final SubscriptionState subscriptions;
     private final ConsumerMetadata metadata;
-    private int metadataVersionSnapshot;
     private final Metrics metrics;
     private final long retryBackoffMs;
     private final int defaultApiTimeoutMs;
@@ -314,7 +315,6 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             this.metadata = metadataFactory.build(config, subscriptions, 
logContext, clusterResourceListeners);
             final List<InetSocketAddress> addresses = 
ClientUtils.parseAndValidateAddresses(config);
             metadata.bootstrap(addresses);
-            this.metadataVersionSnapshot = metadata.updateVersion();
 
             FetchMetricsManager fetchMetricsManager = 
createFetchMetricsManager(metrics);
             FetchConfig fetchConfig = new FetchConfig(config);
@@ -440,7 +440,6 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         this.metrics = metrics;
         this.groupMetadata.set(initializeGroupMetadata(groupId, 
Optional.empty()));
         this.metadata = metadata;
-        this.metadataVersionSnapshot = metadata.updateVersion();
         this.retryBackoffMs = retryBackoffMs;
         this.defaultApiTimeoutMs = defaultApiTimeoutMs;
         this.deserializers = deserializers;
@@ -470,7 +469,6 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         this.time = time;
         this.metrics = new Metrics(time);
         this.metadata = metadata;
-        this.metadataVersionSnapshot = metadata.updateVersion();
         this.retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
         this.defaultApiTimeoutMs = 
config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
         this.deserializers = new Deserializers<>(keyDeserializer, 
valueDeserializer);
@@ -1454,25 +1452,6 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         }
     }
 
-    /**
-     * <p>
-     *
-     * This function evaluates the regex that the consumer subscribed to
-     * against the list of topic names from metadata, and updates
-     * the list of topics in subscription state accordingly
-     *
-     * @param cluster Cluster from which we get the topics
-     */
-    private void updatePatternSubscription(Cluster cluster) {
-        final Set<String> topicsToSubscribe = cluster.topics().stream()
-                .filter(subscriptions::matchesSubscribedPattern)
-                .collect(Collectors.toSet());
-        if (subscriptions.subscribeFromPattern(topicsToSubscribe)) {
-            applicationEventHandler.add(new SubscriptionChangeEvent());
-            this.metadataVersionSnapshot = 
metadata.requestUpdateForNewTopics();
-        }
-    }
-
     @Override
     public void unsubscribe() {
         acquireAndEnsureOpen();
@@ -1679,7 +1658,13 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
     public boolean updateAssignmentMetadataIfNeeded(Timer timer) {
         maybeThrowFencedInstanceException();
         offsetCommitCallbackInvoker.executeCallbacks();
-        maybeUpdateSubscriptionMetadata();
+        try {
+            applicationEventHandler.addAndGet(new 
UpdatePatternSubscriptionEvent(calculateDeadlineMs(timer)));
+        } catch (TimeoutException e) {
+            return false;
+        } finally {
+            timer.update();
+        }
         processBackgroundEvents();
 
         return updateFetchPositions(timer);
@@ -1758,9 +1743,8 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                 throw new IllegalArgumentException("Topic pattern to subscribe 
to cannot be " + (pattern == null ?
                     "null" : "empty"));
             log.info("Subscribed to pattern: '{}'", pattern);
-            subscriptions.subscribe(pattern, listener);
-            metadata.requestUpdateForNewTopics();
-            updatePatternSubscription(metadata.fetch());
+            applicationEventHandler.addAndGet(new 
TopicPatternSubscriptionChangeEvent(
+                pattern, listener, 
calculateDeadlineMs(time.timer(defaultApiTimeoutMs))));
         } finally {
             release();
         }
@@ -1791,12 +1775,8 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
 
                 fetchBuffer.retainAll(currentTopicPartitions);
                 log.info("Subscribed to topic(s): {}", String.join(", ", 
topics));
-                if (subscriptions.subscribe(new HashSet<>(topics), listener))
-                    this.metadataVersionSnapshot = 
metadata.requestUpdateForNewTopics();
-
-                // Trigger subscribe event to effectively join the group if 
not already part of it,
-                // or just send the new subscription to the broker.
-                applicationEventHandler.add(new SubscriptionChangeEvent());
+                applicationEventHandler.addAndGet(new 
TopicSubscriptionChangeEvent(
+                    new HashSet<>(topics), listener, 
calculateDeadlineMs(time.timer(defaultApiTimeoutMs))));
             }
         } finally {
             release();
@@ -1977,13 +1957,4 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
     SubscriptionState subscriptions() {
         return subscriptions;
     }
-
-    private void maybeUpdateSubscriptionMetadata() {
-        if (this.metadataVersionSnapshot < metadata.updateVersion()) {
-            this.metadataVersionSnapshot = metadata.updateVersion();
-            if (subscriptions.hasPatternSubscription()) {
-                updatePatternSubscription(metadata.fetch());
-            }
-        }
-    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index b7d90431166..310c7a3b8b1 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -302,7 +302,7 @@ public class SubscriptionState {
      * Check whether pattern subscription is in use.
      *
      */
-    synchronized boolean hasPatternSubscription() {
+    public synchronized boolean hasPatternSubscription() {
         return this.subscriptionType == SubscriptionType.AUTO_PATTERN;
     }
 
@@ -324,7 +324,7 @@ public class SubscriptionState {
      *
      * @return true if pattern subscription is in use and the topic matches 
the subscribed pattern, false otherwise
      */
-    synchronized boolean matchesSubscribedPattern(String topic) {
+    public synchronized boolean matchesSubscribedPattern(String topic) {
         Pattern pattern = this.subscribedPattern;
         if (hasPatternSubscription() && pattern != null)
             return pattern.matcher(topic).matches();
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
index 0d258cda2b4..e11e702388c 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
@@ -30,7 +30,8 @@ public abstract class ApplicationEvent {
 
     public enum Type {
         COMMIT_ASYNC, COMMIT_SYNC, POLL, FETCH_COMMITTED_OFFSETS, 
NEW_TOPICS_METADATA_UPDATE, ASSIGNMENT_CHANGE,
-        LIST_OFFSETS, CHECK_AND_UPDATE_POSITIONS, RESET_OFFSET, 
TOPIC_METADATA, ALL_TOPICS_METADATA, SUBSCRIPTION_CHANGE,
+        LIST_OFFSETS, CHECK_AND_UPDATE_POSITIONS, RESET_OFFSET, 
TOPIC_METADATA, ALL_TOPICS_METADATA,
+        TOPIC_SUBSCRIPTION_CHANGE, TOPIC_PATTERN_SUBSCRIPTION_CHANGE, 
UPDATE_SUBSCRIPTION_METADATA,
         UNSUBSCRIBE, CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED,
         COMMIT_ON_CLOSE, CREATE_FETCH_REQUESTS,
         SHARE_FETCH, SHARE_ACKNOWLEDGE_ASYNC, SHARE_ACKNOWLEDGE_SYNC,
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index 2f6ca35feaf..2879e471c7a 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -26,6 +26,7 @@ import 
org.apache.kafka.clients.consumer.internals.OffsetAndTimestampInternal;
 import org.apache.kafka.clients.consumer.internals.RequestManagers;
 import org.apache.kafka.clients.consumer.internals.ShareConsumeRequestManager;
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
+import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicIdPartition;
@@ -38,6 +39,7 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.BiConsumer;
 import java.util.function.Supplier;
@@ -53,6 +55,7 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
     private final ConsumerMetadata metadata;
     private final SubscriptionState subscriptions;
     private final RequestManagers requestManagers;
+    private int metadataVersionSnapshot;
 
     public ApplicationEventProcessor(final LogContext logContext,
                                      final RequestManagers requestManagers,
@@ -62,6 +65,7 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
         this.requestManagers = requestManagers;
         this.metadata = metadata;
         this.subscriptions = subscriptions;
+        this.metadataVersionSnapshot = metadata.updateVersion();
     }
 
     @SuppressWarnings({"CyclomaticComplexity"})
@@ -108,8 +112,16 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
                 process((CheckAndUpdatePositionsEvent) event);
                 return;
 
-            case SUBSCRIPTION_CHANGE:
-                process((SubscriptionChangeEvent) event);
+            case TOPIC_SUBSCRIPTION_CHANGE:
+                process((TopicSubscriptionChangeEvent) event);
+                return;
+
+            case TOPIC_PATTERN_SUBSCRIPTION_CHANGE:
+                process((TopicPatternSubscriptionChangeEvent) event);
+                return;
+
+            case UPDATE_SUBSCRIPTION_METADATA:
+                process((UpdatePatternSubscriptionEvent) event);
                 return;
 
             case UNSUBSCRIBE:
@@ -248,16 +260,57 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
     }
 
     /**
-     * Process event that indicates that the subscription changed. This will 
make the
+     * Process event that indicates that the subscription topics changed. This 
will make the
      * consumer join the group if it is not part of it yet, or send the 
updated subscription if
-     * it is already a member.
+     * it is already a member (the updated subscription is sent on the next poll).
      */
-    private void process(final SubscriptionChangeEvent ignored) {
+    private void process(final TopicSubscriptionChangeEvent event) {
         if (!requestManagers.consumerHeartbeatRequestManager.isPresent()) {
             log.warn("Group membership manager not present when processing a 
subscribe event");
+            event.future().complete(null);
             return;
         }
-        
requestManagers.consumerHeartbeatRequestManager.get().membershipManager().onSubscriptionUpdated();
+
+        try {
+            if (subscriptions.subscribe(event.topics(), event.listener()))
+                this.metadataVersionSnapshot = 
metadata.requestUpdateForNewTopics();
+
+            // Join the group if not already part of it, or just send the new 
subscription to the broker on the next poll.
+            
requestManagers.consumerHeartbeatRequestManager.get().membershipManager().onSubscriptionUpdated();
+            event.future().complete(null);
+        } catch (Exception e) {
+            event.future().completeExceptionally(e);
+        }
+    }
+
+    /**
+     * Process event that indicates that the subscription topic pattern 
changed. This will make the
+     * consumer join the group if it is not part of it yet, or send the 
updated subscription on
+     * the next poll if it is already a member.
+     */
+    private void process(final TopicPatternSubscriptionChangeEvent event) {
+        try {
+            subscriptions.subscribe(event.pattern(), event.listener());
+            metadata.requestUpdateForNewTopics();
+            updatePatternSubscription(metadata.fetch());
+            event.future().complete(null);
+        } catch (Exception e) {
+            event.future().completeExceptionally(e);
+        }
+    }
+
+    /**
+     * Process event that re-evaluates the subscribed regular expression using 
the latest topics from metadata, only if metadata changed.
+     * This will make the consumer send the updated subscription on the next 
poll.
+     */
+    private void process(final UpdatePatternSubscriptionEvent event) {
+        if (this.metadataVersionSnapshot < metadata.updateVersion()) {
+            this.metadataVersionSnapshot = metadata.updateVersion();
+            if (subscriptions.hasPatternSubscription()) {
+                updatePatternSubscription(metadata.fetch());
+            }
+        }
+        event.future().complete(null);
     }
 
     /**
@@ -481,4 +534,32 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
             event.future().completeExceptionally(e);
         }
     }
+
+    /**
+     * This function evaluates the regex that the consumer subscribed to
+     * against the list of topic names from metadata, and updates
+     * the list of topics in subscription state accordingly
+     *
+     * @param cluster Cluster from which we get the topics
+     */
+    private void updatePatternSubscription(Cluster cluster) {
+        if (requestManagers.consumerHeartbeatRequestManager.isEmpty()) {
+            log.warn("Group membership manager not present when processing a 
subscribe event");
+            return;
+        }
+        final Set<String> topicsToSubscribe = cluster.topics().stream()
+            .filter(subscriptions::matchesSubscribedPattern)
+            .collect(Collectors.toSet());
+        if (subscriptions.subscribeFromPattern(topicsToSubscribe)) {
+            this.metadataVersionSnapshot = 
metadata.requestUpdateForNewTopics();
+
+            // Join the group if not already part of it, or just send the new 
subscription to the broker on the next poll.
+            
requestManagers.consumerHeartbeatRequestManager.get().membershipManager().onSubscriptionUpdated();
+        }
+    }
+
+    // Visible for testing
+    int metadataVersionSnapshot() {
+        return metadataVersionSnapshot;
+    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SubscriptionChangeEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SubscriptionChangeEvent.java
index ad5fd34c06f..bee9ed904d1 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SubscriptionChangeEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SubscriptionChangeEvent.java
@@ -17,14 +17,30 @@
 
 package org.apache.kafka.clients.consumer.internals.events;
 
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+
+import java.util.Optional;
+
 /**
  * Application event indicating that the subscription state has changed, 
triggered when a user
  * calls the subscribe API. This will make the consumer join a consumer group 
if not part of it
  * yet, or just send the updated subscription to the broker if it's already a 
member of the group.
  */
-public class SubscriptionChangeEvent extends ApplicationEvent {
+public abstract class SubscriptionChangeEvent extends 
CompletableApplicationEvent<Void> {
+
+    private final Optional<ConsumerRebalanceListener> listener;
+
+    public SubscriptionChangeEvent(final Type type, final 
Optional<ConsumerRebalanceListener> listener, final long deadlineMs) {
+        super(type, deadlineMs);
+        this.listener = listener;
+    }
+
+    public Optional<ConsumerRebalanceListener> listener() {
+        return listener;
+    }
 
-    public SubscriptionChangeEvent() {
-        super(Type.SUBSCRIPTION_CHANGE);
+    @Override
+    protected String toStringBase() {
+        return super.toStringBase() + ", listener=" + listener;
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/TopicPatternSubscriptionChangeEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/TopicPatternSubscriptionChangeEvent.java
new file mode 100644
index 00000000000..cfdc0095284
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/TopicPatternSubscriptionChangeEvent.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+
+import java.util.Optional;
+import java.util.regex.Pattern;
+
+/**
+ * Application event indicating that a user calls the subscribe API for a new 
pattern.
+ * This will make the consumer join a consumer group if not part of it yet,
+ * or just send the updated subscription to the broker on the next poll.
+ */
+public class TopicPatternSubscriptionChangeEvent extends 
SubscriptionChangeEvent {
+    private final Pattern pattern;
+
+    public TopicPatternSubscriptionChangeEvent(final Pattern pattern, final 
Optional<ConsumerRebalanceListener> listener, final long deadlineMs) {
+        super(Type.TOPIC_PATTERN_SUBSCRIPTION_CHANGE, listener, deadlineMs);
+        this.pattern = pattern;
+    }
+
+    public Pattern pattern() {
+        return pattern;
+    }
+
+    @Override
+    public String toStringBase() {
+        return super.toStringBase() + ", pattern=" + pattern;
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/TopicSubscriptionChangeEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/TopicSubscriptionChangeEvent.java
new file mode 100644
index 00000000000..c8b9bbfac5a
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/TopicSubscriptionChangeEvent.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Application event indicating that a user calls the subscribe API for new 
topics.
+ * This will make the consumer join a consumer group if not part of it yet,
+ * or just send the updated subscription to the broker on the next poll.
+ */
+public class TopicSubscriptionChangeEvent extends SubscriptionChangeEvent {
+    private final Set<String> topics;
+    public TopicSubscriptionChangeEvent(final Set<String> topics, final 
Optional<ConsumerRebalanceListener> listener, final long deadlineMs) {
+        super(Type.TOPIC_SUBSCRIPTION_CHANGE, listener, deadlineMs);
+        this.topics = topics;
+    }
+
+    public Set<String> topics() {
+        return topics;
+    }
+
+    @Override
+    public String toStringBase() {
+        return super.toStringBase() + ", topics=" + topics;
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SubscriptionChangeEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/UpdatePatternSubscriptionEvent.java
similarity index 66%
copy from 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SubscriptionChangeEvent.java
copy to 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/UpdatePatternSubscriptionEvent.java
index ad5fd34c06f..a6c5b5568f0 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SubscriptionChangeEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/UpdatePatternSubscriptionEvent.java
@@ -14,17 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.clients.consumer.internals.events;
 
 /**
- * Application event indicating that the subscription state has changed, 
triggered when a user
- * calls the subscribe API. This will make the consumer join a consumer group 
if not part of it
- * yet, or just send the updated subscription to the broker if it's already a 
member of the group.
+ * Application event which is triggered as part of the consumer poll loop to 
update the pattern subscription
+ * if metadata changed.
  */
-public class SubscriptionChangeEvent extends ApplicationEvent {
+public class UpdatePatternSubscriptionEvent extends 
CompletableApplicationEvent<Void> {
 
-    public SubscriptionChangeEvent() {
-        super(Type.SUBSCRIPTION_CHANGE);
+    public UpdatePatternSubscriptionEvent(final long deadlineMs) {
+        super(Type.UPDATE_SUBSCRIPTION_METADATA, deadlineMs);
     }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 7f78f6e8bba..069862cec68 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -48,10 +48,10 @@ import 
org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
 import org.apache.kafka.clients.consumer.internals.events.PollEvent;
 import org.apache.kafka.clients.consumer.internals.events.ResetOffsetEvent;
 import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.SubscriptionChangeEvent;
 import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.TopicPatternSubscriptionChangeEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.TopicSubscriptionChangeEvent;
 import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
-import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.Node;
@@ -136,7 +136,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.atLeast;
-import static org.mockito.Mockito.clearInvocations;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
@@ -492,48 +491,12 @@ public class AsyncKafkaConsumerTest {
             }
         };
 
+        completeTopicSubscriptionChangeEventSuccessfully();
         consumer.subscribe(Collections.singletonList(topicName), listener);
         consumer.poll(Duration.ZERO);
         assertTrue(callbackExecuted.get());
     }
 
-    @Test
-    public void testSubscriptionRegexEvalOnPollOnlyIfMetadataChanges() {
-        SubscriptionState subscriptions = mock(SubscriptionState.class);
-        Cluster cluster = mock(Cluster.class);
-
-        consumer = newConsumer(
-                mock(FetchBuffer.class),
-                mock(ConsumerInterceptors.class),
-                mock(ConsumerRebalanceListenerInvoker.class),
-                subscriptions,
-                "group-id",
-                "client-id");
-
-        final String topicName = "foo";
-        final int partition = 3;
-        final TopicPartition tp = new TopicPartition(topicName, partition);
-        
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
-        
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
-        
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
-        doReturn(cluster).when(metadata).fetch();
-        doReturn(Collections.singleton(topicName)).when(cluster).topics();
-
-        consumer.subscribe(Pattern.compile("f*"));
-        verify(metadata).requestUpdateForNewTopics();
-        verify(subscriptions).matchesSubscribedPattern(topicName);
-        clearInvocations(subscriptions);
-
-        when(subscriptions.hasPatternSubscription()).thenReturn(true);
-        consumer.poll(Duration.ZERO);
-        verify(subscriptions, never()).matchesSubscribedPattern(topicName);
-
-        when(metadata.updateVersion()).thenReturn(2);
-        when(subscriptions.hasPatternSubscription()).thenReturn(true);
-        consumer.poll(Duration.ZERO);
-        verify(subscriptions).matchesSubscribedPattern(topicName);
-    }
-
     @Test
     public void testClearWakeupTriggerAfterPoll() {
         consumer = newConsumer();
@@ -882,6 +845,7 @@ public class AsyncKafkaConsumerTest {
             subscriptions,
             "group-id",
             "client-id");
+        completeTopicSubscriptionChangeEventSuccessfully();
         consumer.subscribe(singleton("topic"), 
mock(ConsumerRebalanceListener.class));
         subscriptions.assignFromSubscribed(singleton(new 
TopicPartition("topic", 0)));
         completeSeekUnvalidatedEventSuccessfully();
@@ -900,6 +864,7 @@ public class AsyncKafkaConsumerTest {
             subscriptions,
             "group-id",
             "client-id");
+        completeTopicSubscriptionChangeEventSuccessfully();
         consumer.subscribe(singleton("topic"), 
mock(ConsumerRebalanceListener.class));
         subscriptions.assignFromSubscribed(singleton(new 
TopicPartition("topic", 0)));
         completeSeekUnvalidatedEventSuccessfully();
@@ -1320,10 +1285,20 @@ public class AsyncKafkaConsumerTest {
     public void testSubscribeGeneratesEvent() {
         consumer = newConsumer();
         String topic = "topic1";
+        completeTopicSubscriptionChangeEventSuccessfully();
         consumer.subscribe(singletonList(topic));
         assertEquals(singleton(topic), consumer.subscription());
         assertTrue(consumer.assignment().isEmpty());
-        
verify(applicationEventHandler).add(ArgumentMatchers.isA(SubscriptionChangeEvent.class));
+        
verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(TopicSubscriptionChangeEvent.class));
+    }
+
+    @Test
+    public void testSubscribePatternGeneratesEvent() {
+        consumer = newConsumer();
+        Pattern pattern = Pattern.compile("topic.*");
+        completeTopicPatternSubscriptionChangeEventSuccessfully();
+        consumer.subscribe(pattern);
+        
verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(TopicPatternSubscriptionChangeEvent.class));
     }
 
     @Test
@@ -1520,6 +1495,7 @@ public class AsyncKafkaConsumerTest {
                 lostError
         );
         doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        completeTopicSubscriptionChangeEventSuccessfully();
         consumer.subscribe(Collections.singletonList("topic"), consumerRebalanceListener);
         SortedSet<TopicPartition> partitions = Collections.emptySortedSet();
 
@@ -1709,6 +1685,7 @@ public class AsyncKafkaConsumerTest {
                 .collectFetch(Mockito.any(FetchBuffer.class));
         when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
 
+        completeTopicSubscriptionChangeEventSuccessfully();
         consumer.subscribe(singletonList("topic1"));
         consumer.poll(Duration.ofMillis(100));
         verify(applicationEventHandler).add(any(PollEvent.class));
@@ -1739,6 +1716,7 @@ public class AsyncKafkaConsumerTest {
     public void testLongPollWaitIsLimited() {
         consumer = newConsumer();
         String topicName = "topic1";
+        completeTopicSubscriptionChangeEventSuccessfully();
         consumer.subscribe(singletonList(topicName));
 
         assertEquals(singleton(topicName), consumer.subscription());
@@ -1893,6 +1871,7 @@ public class AsyncKafkaConsumerTest {
     void testReaperInvokedInPoll() {
         consumer = newConsumer();
         doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        completeTopicSubscriptionChangeEventSuccessfully();
         consumer.subscribe(Collections.singletonList("topic"));
         when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
         consumer.poll(Duration.ZERO);
@@ -2056,6 +2035,24 @@ public class AsyncKafkaConsumerTest {
         }).when(applicationEventHandler).addAndGet(ArgumentMatchers.isA(AssignmentChangeEvent.class));
     }
 
+    private void completeTopicSubscriptionChangeEventSuccessfully() {
+        doAnswer(invocation -> {
+            TopicSubscriptionChangeEvent event = invocation.getArgument(0);
+            consumer.subscriptions().subscribe(event.topics(), event.listener());
+            event.future().complete(null);
+            return null;
+        }).when(applicationEventHandler).addAndGet(ArgumentMatchers.isA(TopicSubscriptionChangeEvent.class));
+    }
+
+    private void completeTopicPatternSubscriptionChangeEventSuccessfully() {
+        doAnswer(invocation -> {
+            TopicPatternSubscriptionChangeEvent event = invocation.getArgument(0);
+            consumer.subscriptions().subscribe(event.pattern(), event.listener());
+            event.future().complete(null);
+            return null;
+        }).when(applicationEventHandler).addAndGet(ArgumentMatchers.isA(TopicPatternSubscriptionChangeEvent.class));
+    }
+
     private void completeSeekUnvalidatedEventSuccessfully() {
         doAnswer(invocation -> {
             SeekUnvalidatedEvent event = invocation.getArgument(0);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
index 27f3ae46002..f1c05d2e6b9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.clients.consumer.internals.events;
 
 import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.clients.consumer.internals.CommitRequestManager;
 import org.apache.kafka.clients.consumer.internals.ConsumerHeartbeatRequestManager;
@@ -24,11 +25,13 @@ import org.apache.kafka.clients.consumer.internals.ConsumerMembershipManager;
 import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
 import org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager;
 import org.apache.kafka.clients.consumer.internals.FetchRequestManager;
+import org.apache.kafka.clients.consumer.internals.MockRebalanceListener;
 import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
 import org.apache.kafka.clients.consumer.internals.OffsetsRequestManager;
 import org.apache.kafka.clients.consumer.internals.RequestManagers;
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
 import org.apache.kafka.clients.consumer.internals.TopicMetadataRequestManager;
+import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
@@ -46,12 +49,15 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.regex.Pattern;
 import java.util.stream.Stream;
 
 import static org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.ArgumentMatchers.any;
@@ -62,6 +68,7 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -71,7 +78,7 @@ public class ApplicationEventProcessorTest {
     private final ConsumerHeartbeatRequestManager heartbeatRequestManager = mock(ConsumerHeartbeatRequestManager.class);
     private final ConsumerMembershipManager membershipManager = mock(ConsumerMembershipManager.class);
     private final OffsetsRequestManager offsetsRequestManager = mock(OffsetsRequestManager.class);
-    private final SubscriptionState subscriptionState = mock(SubscriptionState.class);
+    private SubscriptionState subscriptionState = mock(SubscriptionState.class);
     private final ConsumerMetadata metadata = mock(ConsumerMetadata.class);
     private ApplicationEventProcessor processor;
 
@@ -235,15 +242,122 @@ public class ApplicationEventProcessorTest {
     }
 
     @Test
-    public void testSubscriptionChangeEvent() {
-        SubscriptionChangeEvent event = new SubscriptionChangeEvent();
+    public void testTopicSubscriptionChangeEvent() {
+        Set<String> topics = Set.of("topic1", "topic2");
+        Optional<ConsumerRebalanceListener> listener = Optional.of(new MockRebalanceListener());
+        TopicSubscriptionChangeEvent event = new TopicSubscriptionChangeEvent(topics, listener, 12345);
 
         setupProcessor(true);
+        when(subscriptionState.subscribe(topics, listener)).thenReturn(true);
+        when(metadata.requestUpdateForNewTopics()).thenReturn(1);
         when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager);
         processor.process(event);
+
+        verify(subscriptionState).subscribe(topics, listener);
+        verify(metadata).requestUpdateForNewTopics();
+        assertEquals(1, processor.metadataVersionSnapshot());
+        verify(membershipManager).onSubscriptionUpdated();
+        // verify member state doesn't transition to JOINING.
+        verify(membershipManager, never()).onConsumerPoll();
+        assertDoesNotThrow(() -> event.future().get());
+    }
+
+    @Test
+    public void testTopicSubscriptionChangeEventWithIllegalSubscriptionState() {
+        subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
+        Optional<ConsumerRebalanceListener> listener = Optional.of(new MockRebalanceListener());
+        TopicSubscriptionChangeEvent event = new TopicSubscriptionChangeEvent(
+            Set.of("topic1", "topic2"), listener, 12345);
+
+        subscriptionState.subscribe(Pattern.compile("topic.*"), listener);
+        setupProcessor(true);
+        when(metadata.requestUpdateForNewTopics()).thenReturn(1);
+        when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager);
+        processor.process(event);
+
+        ExecutionException e = assertThrows(ExecutionException.class, () -> event.future().get());
+        assertInstanceOf(IllegalStateException.class, e.getCause());
+        assertEquals("Subscription to topics, partitions and pattern are mutually exclusive", e.getCause().getMessage());
+    }
+
+    @Test
+    public void testTopicPatternSubscriptionChangeEvent() {
+        Pattern pattern = Pattern.compile("topic.*");
+        Set<String> topics = Set.of("topic.1", "topic.2");
+        Optional<ConsumerRebalanceListener> listener = Optional.of(new MockRebalanceListener());
+        TopicPatternSubscriptionChangeEvent event = new TopicPatternSubscriptionChangeEvent(pattern, listener, 12345);
+
+        setupProcessor(true);
+
+        Cluster cluster = mock(Cluster.class);
+        when(metadata.fetch()).thenReturn(cluster);
+        when(cluster.topics()).thenReturn(topics);
+        when(subscriptionState.matchesSubscribedPattern("topic.1")).thenReturn(true);
+        when(subscriptionState.matchesSubscribedPattern("topic.2")).thenReturn(true);
+        when(subscriptionState.subscribeFromPattern(topics)).thenReturn(true);
+        when(metadata.requestUpdateForNewTopics()).thenReturn(1);
+        when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager);
+        processor.process(event);
+
+        verify(subscriptionState).subscribe(pattern, listener);
+        verify(subscriptionState).subscribeFromPattern(topics);
+        verify(metadata, times(2)).requestUpdateForNewTopics();
+        assertEquals(1, processor.metadataVersionSnapshot());
         verify(membershipManager).onSubscriptionUpdated();
         // verify member state doesn't transition to JOINING.
         verify(membershipManager, never()).onConsumerPoll();
+        assertDoesNotThrow(() -> event.future().get());
+    }
+
+    @Test
+    public void testTopicPatternSubscriptionChangeEventWithIllegalSubscriptionState() {
+        subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
+        Optional<ConsumerRebalanceListener> listener = Optional.of(new MockRebalanceListener());
+        TopicPatternSubscriptionChangeEvent event = new TopicPatternSubscriptionChangeEvent(
+            Pattern.compile("topic.*"), listener, 12345);
+
+        setupProcessor(true);
+
+        subscriptionState.subscribe(Set.of("topic.1", "topic.2"), listener);
+        processor.process(event);
+
+        ExecutionException e = assertThrows(ExecutionException.class, () -> event.future().get());
+        assertInstanceOf(IllegalStateException.class, e.getCause());
+        assertEquals("Subscription to topics, partitions and pattern are mutually exclusive", e.getCause().getMessage());
+    }
+
+    @Test
+    public void testUpdatePatternSubscriptionEventOnlyTakesEffectWhenMetadataHasNewVersion() {
+        UpdatePatternSubscriptionEvent event1 = new UpdatePatternSubscriptionEvent(12345);
+
+        setupProcessor(true);
+
+        when(metadata.updateVersion()).thenReturn(0);
+
+        processor.process(event1);
+        verify(subscriptionState, never()).hasPatternSubscription();
+        assertDoesNotThrow(() -> event1.future().get());
+
+        Cluster cluster = mock(Cluster.class);
+        Set<String> topics = Set.of("topic.1", "topic.2");
+        when(metadata.updateVersion()).thenReturn(1);
+        when(subscriptionState.hasPatternSubscription()).thenReturn(true);
+        when(metadata.fetch()).thenReturn(cluster);
+        when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager);
+        when(cluster.topics()).thenReturn(topics);
+        when(subscriptionState.matchesSubscribedPattern("topic.1")).thenReturn(true);
+        when(subscriptionState.matchesSubscribedPattern("topic.2")).thenReturn(true);
+        when(subscriptionState.subscribeFromPattern(topics)).thenReturn(true);
+        when(metadata.requestUpdateForNewTopics()).thenReturn(1);
+
+        UpdatePatternSubscriptionEvent event2 = new UpdatePatternSubscriptionEvent(12345);
+        processor.process(event2);
+        verify(metadata).requestUpdateForNewTopics();
+        verify(subscriptionState).hasPatternSubscription();
+        verify(subscriptionState).subscribeFromPattern(topics);
+        assertEquals(1, processor.metadataVersionSnapshot());
+        verify(membershipManager).onSubscriptionUpdated();
+        assertDoesNotThrow(() -> event2.future().get());
     }
 
     private List<NetworkClientDelegate.UnsentRequest> mockCommitResults() {


Reply via email to