This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new fb65dfeb116 KAFKA-17726: New consumer subscribe/subscribeFromPattern
in background thread (#17569)
fb65dfeb116 is described below
commit fb65dfeb116f86386b1932fb26f188deae67c089
Author: PoAn Yang <[email protected]>
AuthorDate: Thu Oct 31 05:15:13 2024 +0800
KAFKA-17726: New consumer subscribe/subscribeFromPattern in background
thread (#17569)
Reviewers: Andrew Schofield <[email protected]>, Lianet Magrans
<[email protected]>
---
.../consumer/internals/AsyncKafkaConsumer.java | 57 +++-------
.../consumer/internals/SubscriptionState.java | 4 +-
.../internals/events/ApplicationEvent.java | 3 +-
.../events/ApplicationEventProcessor.java | 93 ++++++++++++++--
.../internals/events/SubscriptionChangeEvent.java | 22 +++-
.../TopicPatternSubscriptionChangeEvent.java | 45 ++++++++
.../events/TopicSubscriptionChangeEvent.java | 44 ++++++++
...nt.java => UpdatePatternSubscriptionEvent.java} | 12 +--
.../consumer/internals/AsyncKafkaConsumerTest.java | 79 +++++++-------
.../events/ApplicationEventProcessorTest.java | 120 ++++++++++++++++++++-
10 files changed, 373 insertions(+), 106 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index dd652e3235e..d411aa4d770 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -58,10 +58,12 @@ import
org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import org.apache.kafka.clients.consumer.internals.events.ResetOffsetEvent;
import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent;
-import
org.apache.kafka.clients.consumer.internals.events.SubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
import org.apache.kafka.clients.consumer.internals.events.TopicMetadataEvent;
+import
org.apache.kafka.clients.consumer.internals.events.TopicPatternSubscriptionChangeEvent;
+import
org.apache.kafka.clients.consumer.internals.events.TopicSubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
+import
org.apache.kafka.clients.consumer.internals.events.UpdatePatternSubscriptionEvent;
import
org.apache.kafka.clients.consumer.internals.metrics.KafkaConsumerMetrics;
import
org.apache.kafka.clients.consumer.internals.metrics.RebalanceCallbackMetricsManager;
import org.apache.kafka.common.Cluster;
@@ -235,7 +237,6 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
private final SubscriptionState subscriptions;
private final ConsumerMetadata metadata;
- private int metadataVersionSnapshot;
private final Metrics metrics;
private final long retryBackoffMs;
private final int defaultApiTimeoutMs;
@@ -314,7 +315,6 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
this.metadata = metadataFactory.build(config, subscriptions,
logContext, clusterResourceListeners);
final List<InetSocketAddress> addresses =
ClientUtils.parseAndValidateAddresses(config);
metadata.bootstrap(addresses);
- this.metadataVersionSnapshot = metadata.updateVersion();
FetchMetricsManager fetchMetricsManager =
createFetchMetricsManager(metrics);
FetchConfig fetchConfig = new FetchConfig(config);
@@ -440,7 +440,6 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
this.metrics = metrics;
this.groupMetadata.set(initializeGroupMetadata(groupId,
Optional.empty()));
this.metadata = metadata;
- this.metadataVersionSnapshot = metadata.updateVersion();
this.retryBackoffMs = retryBackoffMs;
this.defaultApiTimeoutMs = defaultApiTimeoutMs;
this.deserializers = deserializers;
@@ -470,7 +469,6 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
this.time = time;
this.metrics = new Metrics(time);
this.metadata = metadata;
- this.metadataVersionSnapshot = metadata.updateVersion();
this.retryBackoffMs =
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
this.defaultApiTimeoutMs =
config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
this.deserializers = new Deserializers<>(keyDeserializer,
valueDeserializer);
@@ -1454,25 +1452,6 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
}
}
- /**
- * <p>
- *
- * This function evaluates the regex that the consumer subscribed to
- * against the list of topic names from metadata, and updates
- * the list of topics in subscription state accordingly
- *
- * @param cluster Cluster from which we get the topics
- */
- private void updatePatternSubscription(Cluster cluster) {
- final Set<String> topicsToSubscribe = cluster.topics().stream()
- .filter(subscriptions::matchesSubscribedPattern)
- .collect(Collectors.toSet());
- if (subscriptions.subscribeFromPattern(topicsToSubscribe)) {
- applicationEventHandler.add(new SubscriptionChangeEvent());
- this.metadataVersionSnapshot =
metadata.requestUpdateForNewTopics();
- }
- }
-
@Override
public void unsubscribe() {
acquireAndEnsureOpen();
@@ -1679,7 +1658,13 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
public boolean updateAssignmentMetadataIfNeeded(Timer timer) {
maybeThrowFencedInstanceException();
offsetCommitCallbackInvoker.executeCallbacks();
- maybeUpdateSubscriptionMetadata();
+ try {
+ applicationEventHandler.addAndGet(new
UpdatePatternSubscriptionEvent(calculateDeadlineMs(timer)));
+ } catch (TimeoutException e) {
+ return false;
+ } finally {
+ timer.update();
+ }
processBackgroundEvents();
return updateFetchPositions(timer);
@@ -1758,9 +1743,8 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
throw new IllegalArgumentException("Topic pattern to subscribe
to cannot be " + (pattern == null ?
"null" : "empty"));
log.info("Subscribed to pattern: '{}'", pattern);
- subscriptions.subscribe(pattern, listener);
- metadata.requestUpdateForNewTopics();
- updatePatternSubscription(metadata.fetch());
+ applicationEventHandler.addAndGet(new
TopicPatternSubscriptionChangeEvent(
+ pattern, listener,
calculateDeadlineMs(time.timer(defaultApiTimeoutMs))));
} finally {
release();
}
@@ -1791,12 +1775,8 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
fetchBuffer.retainAll(currentTopicPartitions);
log.info("Subscribed to topic(s): {}", String.join(", ",
topics));
- if (subscriptions.subscribe(new HashSet<>(topics), listener))
- this.metadataVersionSnapshot =
metadata.requestUpdateForNewTopics();
-
- // Trigger subscribe event to effectively join the group if
not already part of it,
- // or just send the new subscription to the broker.
- applicationEventHandler.add(new SubscriptionChangeEvent());
+ applicationEventHandler.addAndGet(new
TopicSubscriptionChangeEvent(
+ new HashSet<>(topics), listener,
calculateDeadlineMs(time.timer(defaultApiTimeoutMs))));
}
} finally {
release();
@@ -1977,13 +1957,4 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
SubscriptionState subscriptions() {
return subscriptions;
}
-
- private void maybeUpdateSubscriptionMetadata() {
- if (this.metadataVersionSnapshot < metadata.updateVersion()) {
- this.metadataVersionSnapshot = metadata.updateVersion();
- if (subscriptions.hasPatternSubscription()) {
- updatePatternSubscription(metadata.fetch());
- }
- }
- }
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index b7d90431166..310c7a3b8b1 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -302,7 +302,7 @@ public class SubscriptionState {
* Check whether pattern subscription is in use.
*
*/
- synchronized boolean hasPatternSubscription() {
+ public synchronized boolean hasPatternSubscription() {
return this.subscriptionType == SubscriptionType.AUTO_PATTERN;
}
@@ -324,7 +324,7 @@ public class SubscriptionState {
*
* @return true if pattern subscription is in use and the topic matches
the subscribed pattern, false otherwise
*/
- synchronized boolean matchesSubscribedPattern(String topic) {
+ public synchronized boolean matchesSubscribedPattern(String topic) {
Pattern pattern = this.subscribedPattern;
if (hasPatternSubscription() && pattern != null)
return pattern.matcher(topic).matches();
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
index 0d258cda2b4..e11e702388c 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
@@ -30,7 +30,8 @@ public abstract class ApplicationEvent {
public enum Type {
COMMIT_ASYNC, COMMIT_SYNC, POLL, FETCH_COMMITTED_OFFSETS,
NEW_TOPICS_METADATA_UPDATE, ASSIGNMENT_CHANGE,
- LIST_OFFSETS, CHECK_AND_UPDATE_POSITIONS, RESET_OFFSET,
TOPIC_METADATA, ALL_TOPICS_METADATA, SUBSCRIPTION_CHANGE,
+ LIST_OFFSETS, CHECK_AND_UPDATE_POSITIONS, RESET_OFFSET,
TOPIC_METADATA, ALL_TOPICS_METADATA,
+ TOPIC_SUBSCRIPTION_CHANGE, TOPIC_PATTERN_SUBSCRIPTION_CHANGE,
UPDATE_SUBSCRIPTION_METADATA,
UNSUBSCRIBE, CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED,
COMMIT_ON_CLOSE, CREATE_FETCH_REQUESTS,
SHARE_FETCH, SHARE_ACKNOWLEDGE_ASYNC, SHARE_ACKNOWLEDGE_SYNC,
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index 2f6ca35feaf..2879e471c7a 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -26,6 +26,7 @@ import
org.apache.kafka.clients.consumer.internals.OffsetAndTimestampInternal;
import org.apache.kafka.clients.consumer.internals.RequestManagers;
import org.apache.kafka.clients.consumer.internals.ShareConsumeRequestManager;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
+import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicIdPartition;
@@ -38,6 +39,7 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
@@ -53,6 +55,7 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
private final ConsumerMetadata metadata;
private final SubscriptionState subscriptions;
private final RequestManagers requestManagers;
+ private int metadataVersionSnapshot;
public ApplicationEventProcessor(final LogContext logContext,
final RequestManagers requestManagers,
@@ -62,6 +65,7 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
this.requestManagers = requestManagers;
this.metadata = metadata;
this.subscriptions = subscriptions;
+ this.metadataVersionSnapshot = metadata.updateVersion();
}
@SuppressWarnings({"CyclomaticComplexity"})
@@ -108,8 +112,16 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
process((CheckAndUpdatePositionsEvent) event);
return;
- case SUBSCRIPTION_CHANGE:
- process((SubscriptionChangeEvent) event);
+ case TOPIC_SUBSCRIPTION_CHANGE:
+ process((TopicSubscriptionChangeEvent) event);
+ return;
+
+ case TOPIC_PATTERN_SUBSCRIPTION_CHANGE:
+ process((TopicPatternSubscriptionChangeEvent) event);
+ return;
+
+ case UPDATE_SUBSCRIPTION_METADATA:
+ process((UpdatePatternSubscriptionEvent) event);
return;
case UNSUBSCRIBE:
@@ -248,16 +260,57 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
}
/**
- * Process event that indicates that the subscription changed. This will
make the
+ * Process event that indicates that the subscription topics changed. This
will make the
* consumer join the group if it is not part of it yet, or send the
updated subscription if
- * it is already a member.
+ * it is already a member on the next poll.
*/
- private void process(final SubscriptionChangeEvent ignored) {
+ private void process(final TopicSubscriptionChangeEvent event) {
if (!requestManagers.consumerHeartbeatRequestManager.isPresent()) {
log.warn("Group membership manager not present when processing a
subscribe event");
+ event.future().complete(null);
return;
}
-
requestManagers.consumerHeartbeatRequestManager.get().membershipManager().onSubscriptionUpdated();
+
+ try {
+ if (subscriptions.subscribe(event.topics(), event.listener()))
+ this.metadataVersionSnapshot =
metadata.requestUpdateForNewTopics();
+
+ // Join the group if not already part of it, or just send the new
subscription to the broker on the next poll.
+
requestManagers.consumerHeartbeatRequestManager.get().membershipManager().onSubscriptionUpdated();
+ event.future().complete(null);
+ } catch (Exception e) {
+ event.future().completeExceptionally(e);
+ }
+ }
+
+ /**
+ * Process event that indicates that the subscription topic pattern
changed. This will make the
+ * consumer join the group if it is not part of it yet, or send the
updated subscription if
+ * it is already a member on the next poll.
+ */
+ private void process(final TopicPatternSubscriptionChangeEvent event) {
+ try {
+ subscriptions.subscribe(event.pattern(), event.listener());
+ metadata.requestUpdateForNewTopics();
+ updatePatternSubscription(metadata.fetch());
+ event.future().complete(null);
+ } catch (Exception e) {
+ event.future().completeExceptionally(e);
+ }
+ }
+
+ /**
+ * Process event that re-evaluates the subscribed regular expression using
the latest topics from metadata, only if metadata changed.
+ * This will make the consumer send the updated subscription on the next
poll.
+ */
+ private void process(final UpdatePatternSubscriptionEvent event) {
+ if (this.metadataVersionSnapshot < metadata.updateVersion()) {
+ this.metadataVersionSnapshot = metadata.updateVersion();
+ if (subscriptions.hasPatternSubscription()) {
+ updatePatternSubscription(metadata.fetch());
+ }
+ }
+ event.future().complete(null);
}
/**
@@ -481,4 +534,32 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
event.future().completeExceptionally(e);
}
}
+
+ /**
+ * This function evaluates the regex that the consumer subscribed to
+ * against the list of topic names from metadata, and updates
+ * the list of topics in subscription state accordingly
+ *
+ * @param cluster Cluster from which we get the topics
+ */
+ private void updatePatternSubscription(Cluster cluster) {
+ if (requestManagers.consumerHeartbeatRequestManager.isEmpty()) {
+ log.warn("Group membership manager not present when processing a
subscribe event");
+ return;
+ }
+ final Set<String> topicsToSubscribe = cluster.topics().stream()
+ .filter(subscriptions::matchesSubscribedPattern)
+ .collect(Collectors.toSet());
+ if (subscriptions.subscribeFromPattern(topicsToSubscribe)) {
+ this.metadataVersionSnapshot =
metadata.requestUpdateForNewTopics();
+
+ // Join the group if not already part of it, or just send the new
subscription to the broker on the next poll.
+
requestManagers.consumerHeartbeatRequestManager.get().membershipManager().onSubscriptionUpdated();
+ }
+ }
+
+ // Visible for testing
+ int metadataVersionSnapshot() {
+ return metadataVersionSnapshot;
+ }
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SubscriptionChangeEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SubscriptionChangeEvent.java
index ad5fd34c06f..bee9ed904d1 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SubscriptionChangeEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SubscriptionChangeEvent.java
@@ -17,14 +17,30 @@
package org.apache.kafka.clients.consumer.internals.events;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+
+import java.util.Optional;
+
/**
* Application event indicating that the subscription state has changed,
triggered when a user
* calls the subscribe API. This will make the consumer join a consumer group
if not part of it
* yet, or just send the updated subscription to the broker if it's already a
member of the group.
*/
-public class SubscriptionChangeEvent extends ApplicationEvent {
+public abstract class SubscriptionChangeEvent extends
CompletableApplicationEvent<Void> {
+
+ private final Optional<ConsumerRebalanceListener> listener;
+
+ public SubscriptionChangeEvent(final Type type, final
Optional<ConsumerRebalanceListener> listener, final long deadlineMs) {
+ super(type, deadlineMs);
+ this.listener = listener;
+ }
+
+ public Optional<ConsumerRebalanceListener> listener() {
+ return listener;
+ }
- public SubscriptionChangeEvent() {
- super(Type.SUBSCRIPTION_CHANGE);
+ @Override
+ protected String toStringBase() {
+ return super.toStringBase() + ", listener=" + listener;
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/TopicPatternSubscriptionChangeEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/TopicPatternSubscriptionChangeEvent.java
new file mode 100644
index 00000000000..cfdc0095284
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/TopicPatternSubscriptionChangeEvent.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+
+import java.util.Optional;
+import java.util.regex.Pattern;
+
+/**
+ * Application event indicating that a user calls the subscribe API for a new
pattern.
+ * This will make the consumer join a consumer group if not part of it yet,
+ * or just send the updated subscription to the broker on the next poll.
+ */
+public class TopicPatternSubscriptionChangeEvent extends
SubscriptionChangeEvent {
+ private final Pattern pattern;
+
+ public TopicPatternSubscriptionChangeEvent(final Pattern pattern, final
Optional<ConsumerRebalanceListener> listener, final long deadlineMs) {
+ super(Type.TOPIC_PATTERN_SUBSCRIPTION_CHANGE, listener, deadlineMs);
+ this.pattern = pattern;
+ }
+
+ public Pattern pattern() {
+ return pattern;
+ }
+
+ @Override
+ public String toStringBase() {
+ return super.toStringBase() + ", pattern=" + pattern;
+ }
+}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/TopicSubscriptionChangeEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/TopicSubscriptionChangeEvent.java
new file mode 100644
index 00000000000..c8b9bbfac5a
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/TopicSubscriptionChangeEvent.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Application event indicating that a user calls the subscribe API for new
topics.
+ * This will make the consumer join a consumer group if not part of it yet,
+ * or just send the updated subscription to the broker on the next poll.
+ */
+public class TopicSubscriptionChangeEvent extends SubscriptionChangeEvent {
+ private final Set<String> topics;
+ public TopicSubscriptionChangeEvent(final Set<String> topics, final
Optional<ConsumerRebalanceListener> listener, final long deadlineMs) {
+ super(Type.TOPIC_SUBSCRIPTION_CHANGE, listener, deadlineMs);
+ this.topics = topics;
+ }
+
+ public Set<String> topics() {
+ return topics;
+ }
+
+ @Override
+ public String toStringBase() {
+ return super.toStringBase() + ", topics=" + topics;
+ }
+}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SubscriptionChangeEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/UpdatePatternSubscriptionEvent.java
similarity index 66%
copy from
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SubscriptionChangeEvent.java
copy to
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/UpdatePatternSubscriptionEvent.java
index ad5fd34c06f..a6c5b5568f0 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SubscriptionChangeEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/UpdatePatternSubscriptionEvent.java
@@ -14,17 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.kafka.clients.consumer.internals.events;
/**
- * Application event indicating that the subscription state has changed,
triggered when a user
- * calls the subscribe API. This will make the consumer join a consumer group
if not part of it
- * yet, or just send the updated subscription to the broker if it's already a
member of the group.
+ * Application event which is triggered as part of the consumer poll loop to
update the pattern subscription
+ * if metadata changed.
*/
-public class SubscriptionChangeEvent extends ApplicationEvent {
+public class UpdatePatternSubscriptionEvent extends
CompletableApplicationEvent<Void> {
- public SubscriptionChangeEvent() {
- super(Type.SUBSCRIPTION_CHANGE);
+ public UpdatePatternSubscriptionEvent(final long deadlineMs) {
+ super(Type.UPDATE_SUBSCRIPTION_METADATA, deadlineMs);
}
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 7f78f6e8bba..069862cec68 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -48,10 +48,10 @@ import
org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import org.apache.kafka.clients.consumer.internals.events.ResetOffsetEvent;
import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent;
-import
org.apache.kafka.clients.consumer.internals.events.SubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
+import
org.apache.kafka.clients.consumer.internals.events.TopicPatternSubscriptionChangeEvent;
+import
org.apache.kafka.clients.consumer.internals.events.TopicSubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
-import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.Node;
@@ -136,7 +136,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.atLeast;
-import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
@@ -492,48 +491,12 @@ public class AsyncKafkaConsumerTest {
}
};
+ completeTopicSubscriptionChangeEventSuccessfully();
consumer.subscribe(Collections.singletonList(topicName), listener);
consumer.poll(Duration.ZERO);
assertTrue(callbackExecuted.get());
}
- @Test
- public void testSubscriptionRegexEvalOnPollOnlyIfMetadataChanges() {
- SubscriptionState subscriptions = mock(SubscriptionState.class);
- Cluster cluster = mock(Cluster.class);
-
- consumer = newConsumer(
- mock(FetchBuffer.class),
- mock(ConsumerInterceptors.class),
- mock(ConsumerRebalanceListenerInvoker.class),
- subscriptions,
- "group-id",
- "client-id");
-
- final String topicName = "foo";
- final int partition = 3;
- final TopicPartition tp = new TopicPartition(topicName, partition);
-
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
-
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
-
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
- doReturn(cluster).when(metadata).fetch();
- doReturn(Collections.singleton(topicName)).when(cluster).topics();
-
- consumer.subscribe(Pattern.compile("f*"));
- verify(metadata).requestUpdateForNewTopics();
- verify(subscriptions).matchesSubscribedPattern(topicName);
- clearInvocations(subscriptions);
-
- when(subscriptions.hasPatternSubscription()).thenReturn(true);
- consumer.poll(Duration.ZERO);
- verify(subscriptions, never()).matchesSubscribedPattern(topicName);
-
- when(metadata.updateVersion()).thenReturn(2);
- when(subscriptions.hasPatternSubscription()).thenReturn(true);
- consumer.poll(Duration.ZERO);
- verify(subscriptions).matchesSubscribedPattern(topicName);
- }
-
@Test
public void testClearWakeupTriggerAfterPoll() {
consumer = newConsumer();
@@ -882,6 +845,7 @@ public class AsyncKafkaConsumerTest {
subscriptions,
"group-id",
"client-id");
+ completeTopicSubscriptionChangeEventSuccessfully();
consumer.subscribe(singleton("topic"),
mock(ConsumerRebalanceListener.class));
subscriptions.assignFromSubscribed(singleton(new
TopicPartition("topic", 0)));
completeSeekUnvalidatedEventSuccessfully();
@@ -900,6 +864,7 @@ public class AsyncKafkaConsumerTest {
subscriptions,
"group-id",
"client-id");
+ completeTopicSubscriptionChangeEventSuccessfully();
consumer.subscribe(singleton("topic"),
mock(ConsumerRebalanceListener.class));
subscriptions.assignFromSubscribed(singleton(new
TopicPartition("topic", 0)));
completeSeekUnvalidatedEventSuccessfully();
@@ -1320,10 +1285,20 @@ public class AsyncKafkaConsumerTest {
public void testSubscribeGeneratesEvent() {
consumer = newConsumer();
String topic = "topic1";
+ completeTopicSubscriptionChangeEventSuccessfully();
consumer.subscribe(singletonList(topic));
assertEquals(singleton(topic), consumer.subscription());
assertTrue(consumer.assignment().isEmpty());
-
verify(applicationEventHandler).add(ArgumentMatchers.isA(SubscriptionChangeEvent.class));
+
verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(TopicSubscriptionChangeEvent.class));
+ }
+
+ @Test
+ public void testSubscribePatternGeneratesEvent() {
+ consumer = newConsumer();
+ Pattern pattern = Pattern.compile("topic.*");
+ completeTopicPatternSubscriptionChangeEventSuccessfully();
+ consumer.subscribe(pattern);
+
verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(TopicPatternSubscriptionChangeEvent.class));
}
@Test
@@ -1520,6 +1495,7 @@ public class AsyncKafkaConsumerTest {
lostError
);
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+ completeTopicSubscriptionChangeEventSuccessfully();
consumer.subscribe(Collections.singletonList("topic"),
consumerRebalanceListener);
SortedSet<TopicPartition> partitions = Collections.emptySortedSet();
@@ -1709,6 +1685,7 @@ public class AsyncKafkaConsumerTest {
.collectFetch(Mockito.any(FetchBuffer.class));
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
+ completeTopicSubscriptionChangeEventSuccessfully();
consumer.subscribe(singletonList("topic1"));
consumer.poll(Duration.ofMillis(100));
verify(applicationEventHandler).add(any(PollEvent.class));
@@ -1739,6 +1716,7 @@ public class AsyncKafkaConsumerTest {
public void testLongPollWaitIsLimited() {
consumer = newConsumer();
String topicName = "topic1";
+ completeTopicSubscriptionChangeEventSuccessfully();
consumer.subscribe(singletonList(topicName));
assertEquals(singleton(topicName), consumer.subscription());
@@ -1893,6 +1871,7 @@ public class AsyncKafkaConsumerTest {
void testReaperInvokedInPoll() {
consumer = newConsumer();
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+ completeTopicSubscriptionChangeEventSuccessfully();
consumer.subscribe(Collections.singletonList("topic"));
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
consumer.poll(Duration.ZERO);
@@ -2056,6 +2035,24 @@ public class AsyncKafkaConsumerTest {
}).when(applicationEventHandler).addAndGet(ArgumentMatchers.isA(AssignmentChangeEvent.class));
}
+ private void completeTopicSubscriptionChangeEventSuccessfully() {
+ doAnswer(invocation -> {
+ TopicSubscriptionChangeEvent event = invocation.getArgument(0);
+ consumer.subscriptions().subscribe(event.topics(),
event.listener());
+ event.future().complete(null);
+ return null;
+
}).when(applicationEventHandler).addAndGet(ArgumentMatchers.isA(TopicSubscriptionChangeEvent.class));
+ }
+
+ private void completeTopicPatternSubscriptionChangeEventSuccessfully() {
+ doAnswer(invocation -> {
+ TopicPatternSubscriptionChangeEvent event =
invocation.getArgument(0);
+ consumer.subscriptions().subscribe(event.pattern(),
event.listener());
+ event.future().complete(null);
+ return null;
+
}).when(applicationEventHandler).addAndGet(ArgumentMatchers.isA(TopicPatternSubscriptionChangeEvent.class));
+ }
+
private void completeSeekUnvalidatedEventSuccessfully() {
doAnswer(invocation -> {
SeekUnvalidatedEvent event = invocation.getArgument(0);
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
index 27f3ae46002..f1c05d2e6b9 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.clients.consumer.internals.events;
import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.CommitRequestManager;
import
org.apache.kafka.clients.consumer.internals.ConsumerHeartbeatRequestManager;
@@ -24,11 +25,13 @@ import
org.apache.kafka.clients.consumer.internals.ConsumerMembershipManager;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager;
import org.apache.kafka.clients.consumer.internals.FetchRequestManager;
+import org.apache.kafka.clients.consumer.internals.MockRebalanceListener;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import org.apache.kafka.clients.consumer.internals.OffsetsRequestManager;
import org.apache.kafka.clients.consumer.internals.RequestManagers;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.clients.consumer.internals.TopicMetadataRequestManager;
+import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
@@ -46,12 +49,15 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.regex.Pattern;
import java.util.stream.Stream;
import static
org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
@@ -62,6 +68,7 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -71,7 +78,7 @@ public class ApplicationEventProcessorTest {
private final ConsumerHeartbeatRequestManager heartbeatRequestManager =
mock(ConsumerHeartbeatRequestManager.class);
private final ConsumerMembershipManager membershipManager =
mock(ConsumerMembershipManager.class);
private final OffsetsRequestManager offsetsRequestManager =
mock(OffsetsRequestManager.class);
- private final SubscriptionState subscriptionState =
mock(SubscriptionState.class);
+ private SubscriptionState subscriptionState =
mock(SubscriptionState.class);
private final ConsumerMetadata metadata = mock(ConsumerMetadata.class);
private ApplicationEventProcessor processor;
@@ -235,15 +242,122 @@ public class ApplicationEventProcessorTest {
}
@Test
- public void testSubscriptionChangeEvent() {
- SubscriptionChangeEvent event = new SubscriptionChangeEvent();
+ public void testTopicSubscriptionChangeEvent() {
+ Set<String> topics = Set.of("topic1", "topic2");
+ Optional<ConsumerRebalanceListener> listener = Optional.of(new
MockRebalanceListener());
+ TopicSubscriptionChangeEvent event = new
TopicSubscriptionChangeEvent(topics, listener, 12345);
setupProcessor(true);
+ when(subscriptionState.subscribe(topics, listener)).thenReturn(true);
+ when(metadata.requestUpdateForNewTopics()).thenReturn(1);
when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager);
processor.process(event);
+
+ verify(subscriptionState).subscribe(topics, listener);
+ verify(metadata).requestUpdateForNewTopics();
+ assertEquals(1, processor.metadataVersionSnapshot());
+ verify(membershipManager).onSubscriptionUpdated();
+ // verify member state doesn't transition to JOINING.
+ verify(membershipManager, never()).onConsumerPoll();
+ assertDoesNotThrow(() -> event.future().get());
+ }
+
+ @Test
+ public void testTopicSubscriptionChangeEventWithIllegalSubscriptionState()
{
+ subscriptionState = new SubscriptionState(new LogContext(),
OffsetResetStrategy.EARLIEST);
+ Optional<ConsumerRebalanceListener> listener = Optional.of(new
MockRebalanceListener());
+ TopicSubscriptionChangeEvent event = new TopicSubscriptionChangeEvent(
+ Set.of("topic1", "topic2"), listener, 12345);
+
+ subscriptionState.subscribe(Pattern.compile("topic.*"), listener);
+ setupProcessor(true);
+ when(metadata.requestUpdateForNewTopics()).thenReturn(1);
+
when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager);
+ processor.process(event);
+
+ ExecutionException e = assertThrows(ExecutionException.class, () ->
event.future().get());
+ assertInstanceOf(IllegalStateException.class, e.getCause());
+ assertEquals("Subscription to topics, partitions and pattern are
mutually exclusive", e.getCause().getMessage());
+ }
+
+ @Test
+ public void testTopicPatternSubscriptionChangeEvent() {
+ Pattern pattern = Pattern.compile("topic.*");
+ Set<String> topics = Set.of("topic.1", "topic.2");
+ Optional<ConsumerRebalanceListener> listener = Optional.of(new
MockRebalanceListener());
+ TopicPatternSubscriptionChangeEvent event = new
TopicPatternSubscriptionChangeEvent(pattern, listener, 12345);
+
+ setupProcessor(true);
+
+ Cluster cluster = mock(Cluster.class);
+ when(metadata.fetch()).thenReturn(cluster);
+ when(cluster.topics()).thenReturn(topics);
+
when(subscriptionState.matchesSubscribedPattern("topic.1")).thenReturn(true);
+
when(subscriptionState.matchesSubscribedPattern("topic.2")).thenReturn(true);
+ when(subscriptionState.subscribeFromPattern(topics)).thenReturn(true);
+ when(metadata.requestUpdateForNewTopics()).thenReturn(1);
+
when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager);
+ processor.process(event);
+
+ verify(subscriptionState).subscribe(pattern, listener);
+ verify(subscriptionState).subscribeFromPattern(topics);
+ verify(metadata, times(2)).requestUpdateForNewTopics();
+ assertEquals(1, processor.metadataVersionSnapshot());
verify(membershipManager).onSubscriptionUpdated();
// verify member state doesn't transition to JOINING.
verify(membershipManager, never()).onConsumerPoll();
+ assertDoesNotThrow(() -> event.future().get());
+ }
+
+ @Test
+ public void
testTopicPatternSubscriptionChangeEventWithIllegalSubscriptionState() {
+ subscriptionState = new SubscriptionState(new LogContext(),
OffsetResetStrategy.EARLIEST);
+ Optional<ConsumerRebalanceListener> listener = Optional.of(new
MockRebalanceListener());
+ TopicPatternSubscriptionChangeEvent event = new
TopicPatternSubscriptionChangeEvent(
+ Pattern.compile("topic.*"), listener, 12345);
+
+ setupProcessor(true);
+
+ subscriptionState.subscribe(Set.of("topic.1", "topic.2"), listener);
+ processor.process(event);
+
+ ExecutionException e = assertThrows(ExecutionException.class, () ->
event.future().get());
+ assertInstanceOf(IllegalStateException.class, e.getCause());
+ assertEquals("Subscription to topics, partitions and pattern are
mutually exclusive", e.getCause().getMessage());
+ }
+
+ @Test
+ public void
testUpdatePatternSubscriptionEventOnlyTakesEffectWhenMetadataHasNewVersion() {
+ UpdatePatternSubscriptionEvent event1 = new
UpdatePatternSubscriptionEvent(12345);
+
+ setupProcessor(true);
+
+ when(metadata.updateVersion()).thenReturn(0);
+
+ processor.process(event1);
+ verify(subscriptionState, never()).hasPatternSubscription();
+ assertDoesNotThrow(() -> event1.future().get());
+
+ Cluster cluster = mock(Cluster.class);
+ Set<String> topics = Set.of("topic.1", "topic.2");
+ when(metadata.updateVersion()).thenReturn(1);
+ when(subscriptionState.hasPatternSubscription()).thenReturn(true);
+ when(metadata.fetch()).thenReturn(cluster);
+
when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager);
+ when(cluster.topics()).thenReturn(topics);
+
when(subscriptionState.matchesSubscribedPattern("topic.1")).thenReturn(true);
+
when(subscriptionState.matchesSubscribedPattern("topic.2")).thenReturn(true);
+ when(subscriptionState.subscribeFromPattern(topics)).thenReturn(true);
+ when(metadata.requestUpdateForNewTopics()).thenReturn(1);
+
+ UpdatePatternSubscriptionEvent event2 = new
UpdatePatternSubscriptionEvent(12345);
+ processor.process(event2);
+ verify(metadata).requestUpdateForNewTopics();
+ verify(subscriptionState).hasPatternSubscription();
+ verify(subscriptionState).subscribeFromPattern(topics);
+ assertEquals(1, processor.metadataVersionSnapshot());
+ verify(membershipManager).onSubscriptionUpdated();
+ assertDoesNotThrow(() -> event2.future().get());
}
private List<NetworkClientDelegate.UnsentRequest> mockCommitResults() {