This is an automated email from the ASF dual-hosted git repository.
cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6df192b6cb1 KAFKA-15281: Implement the groupMetadata Consumer API (#14879)
6df192b6cb1 is described below
commit 6df192b6cb1397a6e6173835bbbd8a3acb7e3988
Author: Bruno Cadonna <[email protected]>
AuthorDate: Wed Dec 6 08:46:57 2023 +0100
KAFKA-15281: Implement the groupMetadata Consumer API (#14879)
Implements the groupMetadata() API on the async consumer.
Reviewers: Kirk True <[email protected]>, Andrew Schofield <[email protected]>, Lucas Brutschy <[email protected]>
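
For context, a minimal sketch of how the now-implemented API is called from user code; the bootstrap server, group id, and property values below are illustrative, not part of this commit:

    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
        // Returns the consumer's current view of its group membership.
        // Without a configured group.id this throws InvalidGroupIdException.
        ConsumerGroupMetadata metadata = consumer.groupMetadata();
        System.out.println(metadata.groupId() + " / " + metadata.memberId());
    }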
---
.../consumer/internals/AsyncKafkaConsumer.java | 94 ++++++++++++-
.../internals/HeartbeatRequestManager.java | 19 +++
.../consumer/internals/events/BackgroundEvent.java | 3 +-
.../internals/events/BackgroundEventProcessor.java | 80 -----------
.../internals/events/GroupMetadataUpdateEvent.java | 79 +++++++++++
.../consumer/internals/AsyncKafkaConsumerTest.java | 148 ++++++++++++++++++++-
.../consumer/internals/ConsumerTestBuilder.java | 4 -
.../internals/HeartbeatRequestManagerTest.java | 122 ++++++++++++++++-
.../events/BackgroundEventHandlerTest.java | 141 --------------------
9 files changed, 452 insertions(+), 238 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 7733c45bc4a..6c67f0bffdc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -41,8 +41,10 @@ import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandle
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
-import org.apache.kafka.clients.consumer.internals.events.BackgroundEventProcessor;
import org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
+import org.apache.kafka.clients.consumer.internals.events.GroupMetadataUpdateEvent;
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
import org.apache.kafka.clients.consumer.internals.events.OffsetFetchApplicationEvent;
@@ -134,6 +136,77 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
private static final long NO_CURRENT_THREAD = -1L;
+ /**
+ * An {@link org.apache.kafka.clients.consumer.internals.events.EventProcessor} that is created and executes in the
+ * application thread for the purpose of processing {@link BackgroundEvent background events} generated by the
+ * {@link ConsumerNetworkThread network thread}.
+ * Those events are generally of two types:
+ *
+ * <ul>
+ * <li>Errors that occur in the network thread that need to be propagated to the application thread</li>
+ * <li>{@link ConsumerRebalanceListener} callbacks that are to be executed on the application thread</li>
+ * </ul>
+ */
+ public class BackgroundEventProcessor extends EventProcessor<BackgroundEvent> {
+
+ public BackgroundEventProcessor(final LogContext logContext,
+ final BlockingQueue<BackgroundEvent> backgroundEventQueue) {
+ super(logContext, backgroundEventQueue);
+ }
+
+ /**
+ * Process the events—if any—that were produced by the {@link ConsumerNetworkThread network thread}.
+ * It is possible that {@link org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent an error}
+ * could occur when processing the events. In such cases, the processor will take a reference to the first
+ * error, continue to process the remaining events, and then throw the first error that occurred.
+ */
+ @Override
+ public void process() {
+ AtomicReference<RuntimeException> firstError = new AtomicReference<>();
+ process((event, error) -> firstError.compareAndSet(null, error));
+
+ if (firstError.get() != null) {
+ throw firstError.get();
+ }
+ }
+
+ @Override
+ public void process(final BackgroundEvent event) {
+ switch (event.type()) {
+ case ERROR:
+ process((ErrorBackgroundEvent) event);
+ break;
+ case GROUP_METADATA_UPDATE:
+ process((GroupMetadataUpdateEvent) event);
+ break;
+ default:
+ throw new IllegalArgumentException("Background event type
" + event.type() + " was not expected");
+
+ }
+ }
+
+ @Override
+ protected Class<BackgroundEvent> getEventClass() {
+ return BackgroundEvent.class;
+ }
+
+ private void process(final ErrorBackgroundEvent event) {
+ throw event.error();
+ }
+
+ private void process(final GroupMetadataUpdateEvent event) {
+ if (AsyncKafkaConsumer.this.groupMetadata.isPresent()) {
+ final ConsumerGroupMetadata currentGroupMetadata = AsyncKafkaConsumer.this.groupMetadata.get();
+ AsyncKafkaConsumer.this.groupMetadata = Optional.of(new ConsumerGroupMetadata(
+ currentGroupMetadata.groupId(),
+ event.memberEpoch(),
+ event.memberId(),
+ currentGroupMetadata.groupInstanceId()
+ ));
+ }
+ }
+ }
+
private final ApplicationEventHandler applicationEventHandler;
private final Time time;
private Optional<ConsumerGroupMetadata> groupMetadata;
@@ -177,6 +250,13 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
AsyncKafkaConsumer(final ConsumerConfig config,
final Deserializer<K> keyDeserializer,
final Deserializer<V> valueDeserializer) {
+ this(config, keyDeserializer, valueDeserializer, new LinkedBlockingQueue<>());
+ }
+
+ AsyncKafkaConsumer(final ConsumerConfig config,
+ final Deserializer<K> keyDeserializer,
+ final Deserializer<V> valueDeserializer,
+ final LinkedBlockingQueue<BackgroundEvent> backgroundEventQueue) {
try {
GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(
config,
@@ -212,7 +292,6 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
ApiVersions apiVersions = new ApiVersions();
final BlockingQueue<ApplicationEvent> applicationEventQueue = new LinkedBlockingQueue<>();
- final BlockingQueue<BackgroundEvent> backgroundEventQueue = new LinkedBlockingQueue<>();
// This FetchBuffer is shared between the application and network threads.
this.fetchBuffer = new FetchBuffer(logContext);
@@ -413,7 +492,8 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
final Optional<String> groupInstanceId) {
if (groupId != null) {
if (groupId.isEmpty()) {
- throw new InvalidGroupIdException("The configured group.id should not be an empty string or whitespace.");
+ throw new InvalidGroupIdException("The configured " + ConsumerConfig.GROUP_ID_CONFIG
+ + " should not be an empty string or whitespace.");
} else {
return Optional.of(new ConsumerGroupMetadata(
groupId,
@@ -889,7 +969,13 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
@Override
public ConsumerGroupMetadata groupMetadata() {
- throw new KafkaException("method not implemented");
+ acquireAndEnsureOpen();
+ try {
+ maybeThrowInvalidGroupIdException();
+ return groupMetadata.get();
+ } finally {
+ release();
+ }
}
@Override
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
index e724735f535..3632098a09b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.GroupMetadataUpdateEvent;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.GroupAuthorizationException;
@@ -103,6 +104,8 @@ public class HeartbeatRequestManager implements RequestManager {
*/
private final BackgroundEventHandler backgroundEventHandler;
+ private GroupMetadataUpdateEvent previousGroupMetadataUpdateEvent = null;
+
public HeartbeatRequestManager(
final LogContext logContext,
final Time time,
@@ -232,11 +235,27 @@ public class HeartbeatRequestManager implements RequestManager {
this.heartbeatRequestState.onSuccessfulAttempt(currentTimeMs);
this.heartbeatRequestState.resetTimer();
this.membershipManager.onHeartbeatResponseReceived(response.data());
+ maybeSendGroupMetadataUpdateEvent();
return;
}
onErrorResponse(response, currentTimeMs);
}
+ private void maybeSendGroupMetadataUpdateEvent() {
+ if (previousGroupMetadataUpdateEvent == null ||
+ !previousGroupMetadataUpdateEvent.memberId().equals(membershipManager.memberId()) ||
+ previousGroupMetadataUpdateEvent.memberEpoch() != membershipManager.memberEpoch()) {
+
+ final GroupMetadataUpdateEvent currentGroupMetadataUpdateEvent = new GroupMetadataUpdateEvent(
+ membershipManager.memberEpoch(),
+ previousGroupMetadataUpdateEvent != null && membershipManager.memberId() == null ?
+ previousGroupMetadataUpdateEvent.memberId() : membershipManager.memberId()
+ );
+ this.backgroundEventHandler.add(currentGroupMetadataUpdateEvent);
+ previousGroupMetadataUpdateEvent = currentGroupMetadataUpdateEvent;
+ }
+ }
+
private void onErrorResponse(final ConsumerGroupHeartbeatResponse response,
final long currentTimeMs) {
Errors error = Errors.forCode(response.data().errorCode());
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
index 9ce43be7802..0e44fe032fe 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
@@ -26,7 +26,8 @@ import java.util.Objects;
public abstract class BackgroundEvent {
public enum Type {
- ERROR
+ ERROR,
+ GROUP_METADATA_UPDATE
}
protected final Type type;
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventProcessor.java
deleted file mode 100644
index fbc28d51bfd..00000000000
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventProcessor.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.consumer.internals.events;
-
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.utils.LogContext;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * An {@link EventProcessor} that is created and executes in the application thread for the purpose of processing
- * {@link BackgroundEvent background events} generated by the {@link ConsumerNetworkThread network thread}.
- * Those events are generally of two types:
- *
- * <ul>
- * <li>Errors that occur in the network thread that need to be propagated to the application thread</li>
- * <li>{@link ConsumerRebalanceListener} callbacks that are to be executed on the application thread</li>
- * </ul>
- */
-public class BackgroundEventProcessor extends EventProcessor<BackgroundEvent> {
-
- public BackgroundEventProcessor(final LogContext logContext,
- final BlockingQueue<BackgroundEvent> backgroundEventQueue) {
- super(logContext, backgroundEventQueue);
- }
-
- /**
- * Process the events—if any—that were produced by the {@link ConsumerNetworkThread network thread}.
- * It is possible that {@link ErrorBackgroundEvent an error} could occur when processing the events.
- * In such cases, the processor will take a reference to the first error, continue to process the
- * remaining events, and then throw the first error that occurred.
- */
- @Override
- public void process() {
- AtomicReference<KafkaException> firstError = new AtomicReference<>();
- process((event, error) -> firstError.compareAndSet(null, error));
-
- if (firstError.get() != null)
- throw firstError.get();
- }
-
- @Override
- public void process(final BackgroundEvent event) {
- switch (event.type()) {
- case ERROR:
- process((ErrorBackgroundEvent) event);
- break;
- default:
- throw new IllegalArgumentException("Background event type " + event.type() + " was not expected");
-
- }
- }
-
- @Override
- protected Class<BackgroundEvent> getEventClass() {
- return BackgroundEvent.class;
- }
-
- private void process(final ErrorBackgroundEvent event) {
- throw event.error();
- }
-
-}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/GroupMetadataUpdateEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/GroupMetadataUpdateEvent.java
new file mode 100644
index 00000000000..120e6717242
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/GroupMetadataUpdateEvent.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
+
+import java.util.Objects;
+
+/**
+ * This event is sent by the {@link ConsumerNetworkThread consumer's network thread} to the application thread
+ * so that when the user calls the {@link Consumer#groupMetadata()} API, the information is up-to-date. The
+ * information for the current state of the group member is managed on the consumer network thread and thus
+ * requires this interplay between threads.
+ */
+public class GroupMetadataUpdateEvent extends BackgroundEvent {
+
+ final private int memberEpoch;
+ final private String memberId;
+
+ public GroupMetadataUpdateEvent(final int memberEpoch,
+ final String memberId) {
+ super(Type.GROUP_METADATA_UPDATE);
+ this.memberEpoch = memberEpoch;
+ this.memberId = memberId;
+ }
+
+ public int memberEpoch() {
+ return memberEpoch;
+ }
+
+ public String memberId() {
+ return memberId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ if (!super.equals(o)) return false;
+ GroupMetadataUpdateEvent that = (GroupMetadataUpdateEvent) o;
+ return memberEpoch == that.memberEpoch &&
+ Objects.equals(memberId, that.memberId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), memberEpoch, memberId);
+ }
+
+ @Override
+ public String toStringBase() {
+ return super.toStringBase() +
+ ", memberEpoch=" + memberEpoch +
+ ", memberId='" + memberId + '\'';
+ }
+
+ @Override
+ public String toString() {
+ return "GroupMetadataUpdateEvent{" +
+ toStringBase() +
+ '}';
+ }
+
+}
\ No newline at end of file
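
For orientation, a condensed, illustrative sketch of the hand-off this event enables. The wiring is simplified here: in the real consumer, the HeartbeatRequestManager enqueues the event via the BackgroundEventHandler on the network thread, and poll() drains it via the BackgroundEventProcessor shown earlier in this diff:

    // Network thread side (simplified): a heartbeat response carrying a new
    // member id or epoch produces an update event.
    BlockingQueue<BackgroundEvent> queue = new LinkedBlockingQueue<>();
    queue.add(new GroupMetadataUpdateEvent(1, "member-1"));

    // Application thread side (simplified): processing the event refreshes
    // the cached ConsumerGroupMetadata, so a later Consumer#groupMetadata()
    // call reflects the new member id and epoch.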
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 6015994bfc8..12b833109ec 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -27,7 +28,10 @@ import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.GroupMetadataUpdateEvent;
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
import org.apache.kafka.clients.consumer.internals.events.OffsetFetchApplicationEvent;
@@ -48,6 +52,7 @@ import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.RequestHeader;
@@ -83,6 +88,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -760,6 +766,132 @@ public class AsyncKafkaConsumerTest {
assertThrows(IllegalArgumentException.class, () -> consumer.subscribe(singletonList(emptyTopic)));
}
+ @Test
+ public void testGroupMetadataAfterCreationWithGroupIdIsNull() {
+ final Properties props = requiredConsumerProperties();
+ final ConsumerConfig config = new ConsumerConfig(props);
+ try (final AsyncKafkaConsumer<String, String> consumer =
+ new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) {
+
+ assertFalse(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
+ assertFalse(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
+ final Throwable exception = assertThrows(InvalidGroupIdException.class, consumer::groupMetadata);
+ assertEquals(
+ "To use the group management or offset commit APIs, you must "
+ + "provide a valid " + ConsumerConfig.GROUP_ID_CONFIG + " in the consumer configuration.",
+ exception.getMessage()
+ );
+ }
+ }
+
+ @Test
+ public void testGroupMetadataAfterCreationWithGroupIdIsNotNull() {
+ final String groupId = "consumerGroupA";
+ final ConsumerConfig config = new ConsumerConfig(requiredConsumerPropertiesAndGroupId(groupId));
+ try (final AsyncKafkaConsumer<String, String> consumer =
+ new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) {
+
+ final ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();
+
+ assertEquals(groupId, groupMetadata.groupId());
+ assertEquals(Optional.empty(), groupMetadata.groupInstanceId());
+ assertEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, groupMetadata.generationId());
+ assertEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, groupMetadata.memberId());
+ }
+ }
+
+ @Test
+ public void testGroupMetadataAfterCreationWithGroupIdIsNotNullAndGroupInstanceIdSet() {
+ final String groupId = "consumerGroupA";
+ final String groupInstanceId = "groupInstanceId1";
+ final Properties props = requiredConsumerPropertiesAndGroupId(groupId);
+ props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId);
+ final ConsumerConfig config = new ConsumerConfig(props);
+ try (final AsyncKafkaConsumer<String, String> consumer =
+ new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) {
+
+ final ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();
+
+ assertEquals(groupId, groupMetadata.groupId());
+ assertEquals(Optional.of(groupInstanceId), groupMetadata.groupInstanceId());
+ assertEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, groupMetadata.generationId());
+ assertEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, groupMetadata.memberId());
+ }
+ }
+
+ @Test
+ public void testGroupMetadataUpdateSingleCall() {
+ final String groupId = "consumerGroupA";
+ final ConsumerConfig config = new ConsumerConfig(requiredConsumerPropertiesAndGroupId(groupId));
+ final LinkedBlockingQueue<BackgroundEvent> backgroundEventQueue = new LinkedBlockingQueue<>();
+ try (final AsyncKafkaConsumer<String, String> consumer =
+ new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer(), backgroundEventQueue)) {
+ final int generation = 1;
+ final String memberId = "newMemberId";
+ final ConsumerGroupMetadata expectedGroupMetadata = new ConsumerGroupMetadata(
+ groupId,
+ generation,
+ memberId,
+ Optional.empty()
+ );
+ final GroupMetadataUpdateEvent groupMetadataUpdateEvent = new GroupMetadataUpdateEvent(
+ generation,
+ memberId
+ );
+ backgroundEventQueue.add(groupMetadataUpdateEvent);
+ consumer.assign(singletonList(new TopicPartition("topic", 0)));
+ consumer.poll(Duration.ZERO);
+
+ final ConsumerGroupMetadata actualGroupMetadata = consumer.groupMetadata();
+
+ assertEquals(expectedGroupMetadata, actualGroupMetadata);
+
+ final ConsumerGroupMetadata secondActualGroupMetadataWithoutUpdate = consumer.groupMetadata();
+
+ assertEquals(expectedGroupMetadata, secondActualGroupMetadataWithoutUpdate);
+ }
+ }
+
+ @Test
+ public void testBackgroundError() {
+ final String groupId = "consumerGroupA";
+ final ConsumerConfig config = new ConsumerConfig(requiredConsumerPropertiesAndGroupId(groupId));
+ final LinkedBlockingQueue<BackgroundEvent> backgroundEventQueue = new LinkedBlockingQueue<>();
+ try (final AsyncKafkaConsumer<String, String> consumer =
+ new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer(), backgroundEventQueue)) {
+ final KafkaException expectedException = new KafkaException("Nobody expects the Spanish Inquisition");
+ final ErrorBackgroundEvent errorBackgroundEvent = new ErrorBackgroundEvent(expectedException);
+ backgroundEventQueue.add(errorBackgroundEvent);
+ consumer.assign(singletonList(new TopicPartition("topic", 0)));
+
+ final KafkaException exception = assertThrows(KafkaException.class, () -> consumer.poll(Duration.ZERO));
+
+ assertEquals(expectedException.getMessage(), exception.getMessage());
+ }
+ }
+
+ @Test
+ public void testMultipleBackgroundErrors() {
+ final String groupId = "consumerGroupA";
+ final ConsumerConfig config = new ConsumerConfig(requiredConsumerPropertiesAndGroupId(groupId));
+ final LinkedBlockingQueue<BackgroundEvent> backgroundEventQueue = new LinkedBlockingQueue<>();
+ try (final AsyncKafkaConsumer<String, String> consumer =
+ new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer(), backgroundEventQueue)) {
+ final KafkaException expectedException1 = new KafkaException("Nobody expects the Spanish Inquisition");
+ final ErrorBackgroundEvent errorBackgroundEvent1 = new ErrorBackgroundEvent(expectedException1);
+ backgroundEventQueue.add(errorBackgroundEvent1);
+ final KafkaException expectedException2 = new KafkaException("Spam, Spam, Spam");
+ final ErrorBackgroundEvent errorBackgroundEvent2 = new ErrorBackgroundEvent(expectedException2);
+ backgroundEventQueue.add(errorBackgroundEvent2);
+ consumer.assign(singletonList(new TopicPartition("topic", 0)));
+
+ final KafkaException exception = assertThrows(KafkaException.class, () -> consumer.poll(Duration.ZERO));
+
+ assertEquals(expectedException1.getMessage(), exception.getMessage());
+ assertTrue(backgroundEventQueue.isEmpty());
+ }
+ }
+
@Test
public void testGroupIdNull() {
final Properties props = requiredConsumerProperties();
@@ -767,7 +899,7 @@ public class AsyncKafkaConsumerTest {
props.put(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, true);
final ConsumerConfig config = new ConsumerConfig(props);
- try (AsyncKafkaConsumer<String, String> consumer =
+ try (final AsyncKafkaConsumer<String, String> consumer =
new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) {
assertFalse(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
assertFalse(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
@@ -778,13 +910,12 @@ public class AsyncKafkaConsumerTest {
@Test
public void testGroupIdNotNullAndValid() {
- final Properties props = requiredConsumerProperties();
- props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupA");
+ final Properties props = requiredConsumerPropertiesAndGroupId("consumerGroupA");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000);
props.put(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, true);
final ConsumerConfig config = new ConsumerConfig(props);
- try (AsyncKafkaConsumer<String, String> consumer =
+ try (final AsyncKafkaConsumer<String, String> consumer =
new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) {
assertTrue(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
assertTrue(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
@@ -804,8 +935,7 @@ public class AsyncKafkaConsumerTest {
}
private void testInvalidGroupId(final String groupId) {
- final Properties props = requiredConsumerProperties();
- props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+ final Properties props = requiredConsumerPropertiesAndGroupId(groupId);
final ConsumerConfig config = new ConsumerConfig(props);
final Exception exception = assertThrows(
@@ -816,6 +946,12 @@ public class AsyncKafkaConsumerTest {
assertEquals("Failed to construct kafka consumer",
exception.getMessage());
}
+ private Properties requiredConsumerPropertiesAndGroupId(final String groupId) {
+ final Properties props = requiredConsumerProperties();
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+ return props;
+ }
+
private Properties requiredConsumerProperties() {
final Properties props = new Properties();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
index 254a39f3944..53917f6ff17 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
@@ -27,7 +27,6 @@ import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandle
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
-import org.apache.kafka.clients.consumer.internals.events.BackgroundEventProcessor;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.requests.MetadataResponse;
@@ -93,7 +92,6 @@ public class ConsumerTestBuilder implements Closeable {
final FetchRequestManager fetchRequestManager;
final RequestManagers requestManagers;
public final ApplicationEventProcessor applicationEventProcessor;
- public final BackgroundEventProcessor backgroundEventProcessor;
public final BackgroundEventHandler backgroundEventHandler;
final MockClient client;
final Optional<GroupInformation> groupInfo;
@@ -265,14 +263,12 @@ public class ConsumerTestBuilder implements Closeable {
requestManagers,
metadata)
);
- this.backgroundEventProcessor = spy(new BackgroundEventProcessor(logContext, backgroundEventQueue));
}
@Override
public void close() {
closeQuietly(requestManagers, RequestManagers.class.getSimpleName());
closeQuietly(applicationEventProcessor, ApplicationEventProcessor.class.getSimpleName());
- closeQuietly(backgroundEventProcessor, BackgroundEventProcessor.class.getSimpleName());
}
public static class ConsumerNetworkThreadTestBuilder extends ConsumerTestBuilder {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
index 7bdee9650a8..b584a5fde5c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
@@ -17,7 +17,9 @@
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
+import org.apache.kafka.clients.consumer.internals.events.GroupMetadataUpdateEvent;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.Uuid;
@@ -45,6 +47,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_GROUP_ID;
import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_GROUP_INSTANCE_ID;
@@ -74,6 +77,7 @@ public class HeartbeatRequestManagerTest {
private final String memberId = "member-id";
private final int memberEpoch = 1;
private BackgroundEventHandler backgroundEventHandler;
+ private BlockingQueue<BackgroundEvent> backgroundEventQueue;
@BeforeEach
public void setUp() {
@@ -88,6 +92,7 @@ public class HeartbeatRequestManagerTest {
heartbeatRequestState = testBuilder.heartbeatRequestState.orElseThrow(IllegalStateException::new);
heartbeatState = testBuilder.heartbeatState.orElseThrow(IllegalStateException::new);
backgroundEventHandler = testBuilder.backgroundEventHandler;
+ backgroundEventQueue = testBuilder.backgroundEventQueue;
subscriptions = testBuilder.subscriptions;
membershipManager = testBuilder.membershipManager.orElseThrow(IllegalStateException::new);
@@ -292,6 +297,101 @@ public class HeartbeatRequestManagerTest {
assertNull(heartbeatRequest.data().subscribedTopicRegex());
}
+ @Test
+ public void testConsumerGroupMetadataFirstUpdate() {
+ final GroupMetadataUpdateEvent groupMetadataUpdateEvent = makeFirstGroupMetadataUpdate(memberId, memberEpoch);
+
+ final GroupMetadataUpdateEvent expectedGroupMetadataUpdateEvent = new GroupMetadataUpdateEvent(
+ memberEpoch,
+ memberId
+ );
+ assertEquals(expectedGroupMetadataUpdateEvent, groupMetadataUpdateEvent);
+ }
+
+ @Test
+ public void testConsumerGroupMetadataUpdateWithSameUpdate() {
+ makeFirstGroupMetadataUpdate(memberId, memberEpoch);
+
+ time.sleep(2000);
+ NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+
+ assertEquals(1, result.unsentRequests.size());
+ NetworkClientDelegate.UnsentRequest request = result.unsentRequests.get(0);
+ ClientResponse responseWithSameUpdate = createHeartbeatResponse(request, Errors.NONE);
+ request.handler().onComplete(responseWithSameUpdate);
+ assertEquals(0, backgroundEventQueue.size());
+ }
+
+ @Test
+ public void testConsumerGroupMetadataUpdateWithMemberIdNullButMemberEpochUpdated() {
+ makeFirstGroupMetadataUpdate(memberId, memberEpoch);
+
+ time.sleep(2000);
+ NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+
+ assertEquals(1, result.unsentRequests.size());
+ NetworkClientDelegate.UnsentRequest request = result.unsentRequests.get(0);
+ final int updatedMemberEpoch = 2;
+ ClientResponse responseWithMemberEpochUpdate = createHeartbeatResponseWithMemberIdNull(
+ request,
+ Errors.NONE,
+ updatedMemberEpoch
+ );
+ request.handler().onComplete(responseWithMemberEpochUpdate);
+ assertEquals(1, backgroundEventQueue.size());
+ final BackgroundEvent eventWithUpdatedMemberEpoch = backgroundEventQueue.poll();
+ assertEquals(BackgroundEvent.Type.GROUP_METADATA_UPDATE, eventWithUpdatedMemberEpoch.type());
+ final GroupMetadataUpdateEvent groupMetadataUpdateEvent = (GroupMetadataUpdateEvent) eventWithUpdatedMemberEpoch;
+ final GroupMetadataUpdateEvent expectedGroupMetadataUpdateEvent = new GroupMetadataUpdateEvent(
+ updatedMemberEpoch,
+ memberId
+ );
+ assertEquals(expectedGroupMetadataUpdateEvent, groupMetadataUpdateEvent);
+ }
+
+ @Test
+ public void testConsumerGroupMetadataUpdateWithMemberIdUpdatedAndMemberEpochSame() {
+ makeFirstGroupMetadataUpdate(memberId, memberEpoch);
+
+ time.sleep(2000);
+ NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+
+ assertEquals(1, result.unsentRequests.size());
+ NetworkClientDelegate.UnsentRequest request = result.unsentRequests.get(0);
+ final String updatedMemberId = "updatedMemberId";
+ ClientResponse responseWithMemberIdUpdate = createHeartbeatResponse(
+ request,
+ Errors.NONE,
+ updatedMemberId,
+ memberEpoch
+ );
+ request.handler().onComplete(responseWithMemberIdUpdate);
+ assertEquals(1, backgroundEventQueue.size());
+ final BackgroundEvent eventWithUpdatedMemberEpoch = backgroundEventQueue.poll();
+ assertEquals(BackgroundEvent.Type.GROUP_METADATA_UPDATE, eventWithUpdatedMemberEpoch.type());
+ final GroupMetadataUpdateEvent groupMetadataUpdateEvent = (GroupMetadataUpdateEvent) eventWithUpdatedMemberEpoch;
+ final GroupMetadataUpdateEvent expectedGroupMetadataUpdateEvent = new GroupMetadataUpdateEvent(
+ memberEpoch,
+ updatedMemberId
+ );
+ assertEquals(expectedGroupMetadataUpdateEvent, groupMetadataUpdateEvent);
+ }
+
+ private GroupMetadataUpdateEvent makeFirstGroupMetadataUpdate(final String memberId, final int memberEpoch) {
+ resetWithZeroHeartbeatInterval(Optional.empty());
+ mockStableMember();
+ when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", 9999)));
+ NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+ assertEquals(1, result.unsentRequests.size());
+ NetworkClientDelegate.UnsentRequest request = result.unsentRequests.get(0);
+ ClientResponse firstResponse = createHeartbeatResponse(request, Errors.NONE, memberId, memberEpoch);
+ request.handler().onComplete(firstResponse);
+ assertEquals(1, backgroundEventQueue.size());
+ final BackgroundEvent event = backgroundEventQueue.poll();
+ assertEquals(BackgroundEvent.Type.GROUP_METADATA_UPDATE, event.type());
+ return (GroupMetadataUpdateEvent) event;
+ }
+
@ParameterizedTest
@MethodSource("errorProvider")
public void testHeartbeatResponseOnErrorHandling(final Errors error, final boolean isFatal) {
@@ -312,7 +412,7 @@ public class HeartbeatRequestManagerTest {
switch (error) {
case NONE:
- verify(backgroundEventHandler, never()).add(any());
+ verify(backgroundEventHandler).add(any(GroupMetadataUpdateEvent.class));
verify(membershipManager, times(2)).onHeartbeatResponseReceived(mockResponse.data());
assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, heartbeatRequestState.nextHeartbeatMs(time.milliseconds()));
break;
@@ -450,7 +550,25 @@ public class HeartbeatRequestManagerTest {
private ClientResponse createHeartbeatResponse(
final NetworkClientDelegate.UnsentRequest request,
- final Errors error) {
+ final Errors error
+ ) {
+ return createHeartbeatResponse(request, error, memberId, memberEpoch);
+ }
+
+ private ClientResponse createHeartbeatResponseWithMemberIdNull(
+ final NetworkClientDelegate.UnsentRequest request,
+ final Errors error,
+ final int memberEpoch
+ ) {
+ return createHeartbeatResponse(request, error, null, memberEpoch);
+ }
+
+ private ClientResponse createHeartbeatResponse(
+ final NetworkClientDelegate.UnsentRequest request,
+ final Errors error,
+ final String memberId,
+ final int memberEpoch
+ ) {
ConsumerGroupHeartbeatResponseData data = new ConsumerGroupHeartbeatResponseData()
.setErrorCode(error.code())
.setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS)
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandlerTest.java
deleted file mode 100644
index 0670b8bdb7c..00000000000
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandlerTest.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.consumer.internals.events;
-
-import org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder;
-import org.apache.kafka.common.KafkaException;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import java.util.concurrent.BlockingQueue;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
-
-public class BackgroundEventHandlerTest {
-
- private ConsumerTestBuilder testBuilder;
- private BlockingQueue<BackgroundEvent> backgroundEventQueue;
- private BackgroundEventHandler backgroundEventHandler;
- private BackgroundEventProcessor backgroundEventProcessor;
-
- @BeforeEach
- public void setup() {
- testBuilder = new ConsumerTestBuilder();
- backgroundEventQueue = testBuilder.backgroundEventQueue;
- backgroundEventHandler = testBuilder.backgroundEventHandler;
- backgroundEventProcessor = testBuilder.backgroundEventProcessor;
- }
-
- @AfterEach
- public void tearDown() {
- if (testBuilder != null)
- testBuilder.close();
- }
-
- @Test
- public void testNoEvents() {
- assertTrue(backgroundEventQueue.isEmpty());
- backgroundEventProcessor.process((event, error) -> { });
- assertTrue(backgroundEventQueue.isEmpty());
- }
-
- @Test
- public void testSingleEvent() {
- BackgroundEvent event = new ErrorBackgroundEvent(new RuntimeException("A"));
- backgroundEventQueue.add(event);
- assertPeeked(event);
- backgroundEventProcessor.process((e, error) -> { });
- assertTrue(backgroundEventQueue.isEmpty());
- }
-
- @Test
- public void testSingleErrorEvent() {
- KafkaException error = new KafkaException("error");
- BackgroundEvent event = new ErrorBackgroundEvent(error);
- backgroundEventHandler.add(new ErrorBackgroundEvent(error));
- assertPeeked(event);
- assertProcessThrows(error);
- }
-
- @Test
- public void testMultipleEvents() {
- BackgroundEvent event1 = new ErrorBackgroundEvent(new RuntimeException("A"));
- backgroundEventQueue.add(event1);
- backgroundEventQueue.add(new ErrorBackgroundEvent(new RuntimeException("B")));
- backgroundEventQueue.add(new ErrorBackgroundEvent(new RuntimeException("C")));
-
- assertPeeked(event1);
- backgroundEventProcessor.process((event, error) -> { });
- assertTrue(backgroundEventQueue.isEmpty());
- }
-
- @Test
- public void testMultipleErrorEvents() {
- Throwable error1 = new Throwable("error1");
- KafkaException error2 = new KafkaException("error2");
- KafkaException error3 = new KafkaException("error3");
-
- backgroundEventHandler.add(new ErrorBackgroundEvent(error1));
- backgroundEventHandler.add(new ErrorBackgroundEvent(error2));
- backgroundEventHandler.add(new ErrorBackgroundEvent(error3));
-
- assertProcessThrows(new KafkaException(error1));
- }
-
- @Test
- public void testMixedEventsWithErrorEvents() {
- Throwable error1 = new Throwable("error1");
- KafkaException error2 = new KafkaException("error2");
- KafkaException error3 = new KafkaException("error3");
-
- RuntimeException errorToCheck = new RuntimeException("A");
- backgroundEventQueue.add(new ErrorBackgroundEvent(errorToCheck));
- backgroundEventHandler.add(new ErrorBackgroundEvent(error1));
- backgroundEventQueue.add(new ErrorBackgroundEvent(new RuntimeException("B")));
- backgroundEventHandler.add(new ErrorBackgroundEvent(error2));
- backgroundEventQueue.add(new ErrorBackgroundEvent(new RuntimeException("C")));
- backgroundEventHandler.add(new ErrorBackgroundEvent(error3));
- backgroundEventQueue.add(new ErrorBackgroundEvent(new RuntimeException("D")));
-
- assertProcessThrows(new KafkaException(errorToCheck));
- }
-
- private void assertPeeked(BackgroundEvent event) {
- BackgroundEvent peekEvent = backgroundEventQueue.peek();
- assertNotNull(peekEvent);
- assertEquals(event, peekEvent);
- }
-
- private void assertProcessThrows(Throwable error) {
- assertFalse(backgroundEventQueue.isEmpty());
-
- try {
- backgroundEventProcessor.process();
- fail("Should have thrown error: " + error);
- } catch (Throwable t) {
- assertEquals(error.getClass(), t.getClass());
- assertEquals(error.getMessage(), t.getMessage());
- }
-
- assertTrue(backgroundEventQueue.isEmpty());
- }
-}