This is an automated email from the ASF dual-hosted git repository.
cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new aa0443eb607 KAFKA-16285: Make group metadata available when a new
assignment is set (#15426)
aa0443eb607 is described below
commit aa0443eb607c2d1d3004312f55f7583102127cb8
Author: Bruno Cadonna <[email protected]>
AuthorDate: Mon Mar 4 12:42:24 2024 +0100
KAFKA-16285: Make group metadata available when a new assignment is set
(#15426)
Currently, in the async Kafka consumer, updates to the group metadata
that are received via the heartbeat are propagated to the application thread
in the form of an event. Group metadata is updated when a new assignment is
received. However, the new assignment is set directly in the subscription
state without sending an update event from the background thread to the
application thread.
That means that there might be a delay between the application thread
becoming aware of the update to the assignment and the application thread
becoming aware of the update to the group metadata. This delay can cause
the application thread to return stale group metadata, which then causes
issues when offsets for the new assignment are committed. A concrete
example is
producer.sendOffsetsToTransaction(offsetsToCommit, groupMetadata)
The offsets to commit might already stem from the new assignment,
but the group metadata might still relate to the previous assignment.
Reviewers: Kirk True <[email protected]>, Andrew Schofield
<[email protected]>, Lucas Brutschy <[email protected]>
---
.../consumer/internals/AsyncKafkaConsumer.java | 118 ++++-----
.../internals/HeartbeatRequestManager.java | 18 --
.../consumer/internals/MembershipManagerImpl.java | 6 +
.../consumer/internals/RequestManagers.java | 4 +-
.../consumer/internals/events/BackgroundEvent.java | 2 +-
.../internals/events/GroupMetadataUpdateEvent.java | 53 ----
.../consumer/internals/AsyncKafkaConsumerTest.java | 271 ++++++---------------
.../internals/HeartbeatRequestManagerTest.java | 87 -------
.../consumer/internals/RequestManagersTest.java | 70 ++++++
.../test/java/org/apache/kafka/test/TestUtils.java | 9 +
10 files changed, 227 insertions(+), 411 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index e706898b70a..dafd3293c74 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -52,7 +52,6 @@ import
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListe
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
import
org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
-import
org.apache.kafka.clients.consumer.internals.events.GroupMetadataUpdateEvent;
import org.apache.kafka.clients.consumer.internals.events.LeaveOnCloseEvent;
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
import
org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
@@ -213,10 +212,6 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
process((ErrorEvent) event);
break;
- case GROUP_METADATA_UPDATE:
- process((GroupMetadataUpdateEvent) event);
- break;
-
case CONSUMER_REBALANCE_LISTENER_CALLBACK_NEEDED:
process((ConsumerRebalanceListenerCallbackNeededEvent)
event);
break;
@@ -231,18 +226,6 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
throw event.error();
}
- private void process(final GroupMetadataUpdateEvent event) {
- if (AsyncKafkaConsumer.this.groupMetadata.isPresent()) {
- final ConsumerGroupMetadata currentGroupMetadata =
AsyncKafkaConsumer.this.groupMetadata.get();
- AsyncKafkaConsumer.this.groupMetadata = Optional.of(new
ConsumerGroupMetadata(
- currentGroupMetadata.groupId(),
- event.memberEpoch(),
- event.memberId(),
- currentGroupMetadata.groupInstanceId()
- ));
- }
- }
-
private void process(final
ConsumerRebalanceListenerCallbackNeededEvent event) {
ApplicationEvent invokedEvent = invokeRebalanceCallbacks(
rebalanceListenerInvoker,
@@ -256,7 +239,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
private final ApplicationEventHandler applicationEventHandler;
private final Time time;
- private Optional<ConsumerGroupMetadata> groupMetadata = Optional.empty();
+ private final AtomicReference<Optional<ConsumerGroupMetadata>>
groupMetadata = new AtomicReference<>(Optional.empty());
private final KafkaConsumerMetrics kafkaConsumerMetrics;
private Logger log;
private final String clientId;
@@ -370,6 +353,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
fetchMetricsManager,
clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null));
this.offsetCommitCallbackInvoker = new
OffsetCommitCallbackInvoker(interceptors);
+ this.groupMetadata.set(initializeGroupMetadata(config,
groupRebalanceConfig));
final Supplier<RequestManagers> requestManagersSupplier =
RequestManagers.supplier(time,
logContext,
backgroundEventHandler,
@@ -383,7 +367,9 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
networkClientDelegateSupplier,
clientTelemetryReporter,
metrics,
- offsetCommitCallbackInvoker);
+ offsetCommitCallbackInvoker,
+ this::updateGroupMetadata
+ );
final Supplier<ApplicationEventProcessor>
applicationEventProcessorSupplier =
ApplicationEventProcessor.supplier(logContext,
metadata,
applicationEventQueue,
@@ -413,8 +399,6 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG,
clientId))
);
- this.groupMetadata = initializeGroupMetadata(config,
groupRebalanceConfig);
-
// The FetchCollector is only used on the application thread.
this.fetchCollector = fetchCollectorFactory.build(logContext,
metadata,
@@ -426,7 +410,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics,
CONSUMER_METRIC_GROUP_PREFIX);
- if (groupMetadata.isPresent() &&
+ if (groupMetadata.get().isPresent() &&
GroupProtocol.of(config.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG)) ==
GroupProtocol.CONSUMER) {
config.ignore(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG); //
Used by background thread
}
@@ -478,7 +462,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
rebalanceListenerInvoker
);
this.metrics = metrics;
- this.groupMetadata = initializeGroupMetadata(groupId,
Optional.empty());
+ this.groupMetadata.set(initializeGroupMetadata(groupId,
Optional.empty()));
this.metadata = metadata;
this.retryBackoffMs = retryBackoffMs;
this.defaultApiTimeoutMs = defaultApiTimeoutMs;
@@ -532,7 +516,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
GroupRebalanceConfig.ProtocolType.CONSUMER
);
- this.groupMetadata = initializeGroupMetadata(config,
groupRebalanceConfig);
+ this.groupMetadata.set(initializeGroupMetadata(config,
groupRebalanceConfig));
BlockingQueue<ApplicationEvent> applicationEventQueue = new
LinkedBlockingQueue<>();
BlockingQueue<BackgroundEvent> backgroundEventQueue = new
LinkedBlockingQueue<>();
@@ -568,7 +552,8 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
networkClientDelegateSupplier,
clientTelemetryReporter,
metrics,
- offsetCommitCallbackInvoker
+ offsetCommitCallbackInvoker,
+ this::updateGroupMetadata
);
Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier
= ApplicationEventProcessor.supplier(
logContext,
@@ -651,17 +636,35 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
throw new InvalidGroupIdException("The configured " +
ConsumerConfig.GROUP_ID_CONFIG
+ " should not be an empty string or whitespace.");
} else {
- return Optional.of(new ConsumerGroupMetadata(
- groupId,
- JoinGroupRequest.UNKNOWN_GENERATION_ID,
- JoinGroupRequest.UNKNOWN_MEMBER_ID,
- groupInstanceId
- ));
+ return Optional.of(initializeConsumerGroupMetadata(groupId,
groupInstanceId));
}
}
return Optional.empty();
}
+ private ConsumerGroupMetadata initializeConsumerGroupMetadata(final String
groupId,
+ final
Optional<String> groupInstanceId) {
+ return new ConsumerGroupMetadata(
+ groupId,
+ JoinGroupRequest.UNKNOWN_GENERATION_ID,
+ JoinGroupRequest.UNKNOWN_MEMBER_ID,
+ groupInstanceId
+ );
+ }
+
+ private void updateGroupMetadata(final Optional<Integer> memberEpoch,
final Optional<String> memberId) {
+ groupMetadata.updateAndGet(
+ oldGroupMetadataOptional -> oldGroupMetadataOptional.map(
+ oldGroupMetadata -> new ConsumerGroupMetadata(
+ oldGroupMetadata.groupId(),
+ memberEpoch.orElse(oldGroupMetadata.generationId()),
+ memberId.orElse(oldGroupMetadata.memberId()),
+ oldGroupMetadata.groupInstanceId()
+ )
+ )
+ );
+ }
+
/**
* poll implementation using {@link ApplicationEventHandler}.
* 1. Poll for background events. If there's a fetch response event,
process the record and return it. If it is
@@ -713,18 +716,14 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
wakeupTrigger.maybeTriggerWakeup();
updateAssignmentMetadataIfNeeded(timer);
- if (isGenerationKnownOrPartitionsUserAssigned()) {
- final Fetch<K, V> fetch = pollForFetches(timer);
- if (!fetch.isEmpty()) {
- if (fetch.records().isEmpty()) {
- log.trace("Returning empty records from `poll()` "
- + "since the consumer's position has advanced
for at least one topic partition");
- }
-
- return interceptors.onConsume(new
ConsumerRecords<>(fetch.records()));
+ final Fetch<K, V> fetch = pollForFetches(timer);
+ if (!fetch.isEmpty()) {
+ if (fetch.records().isEmpty()) {
+ log.trace("Returning empty records from `poll()` "
+ + "since the consumer's position has advanced for
at least one topic partition");
}
- } else {
- timer.update();
+
+ return interceptors.onConsume(new
ConsumerRecords<>(fetch.records()));
}
// We will wait for retryBackoffMs
} while (timer.notExpired());
@@ -736,13 +735,6 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
}
}
- private boolean isGenerationKnownOrPartitionsUserAssigned() {
- if (subscriptions.hasAutoAssignedPartitions()) {
- return groupMetadata.filter(g -> g.generationId() !=
JoinGroupRequest.UNKNOWN_GENERATION_ID).isPresent();
- }
- return true;
- }
-
/**
* Commit offsets returned on the last {@link #poll(Duration) poll()} for
all the subscribed list of topics and
* partitions.
@@ -960,7 +952,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
}
private void maybeThrowInvalidGroupIdException() {
- if (!groupMetadata.isPresent()) {
+ if (!groupMetadata.get().isPresent()) {
throw new InvalidGroupIdException("To use the group management or
offset commit APIs, you must " +
"provide a valid " + ConsumerConfig.GROUP_ID_CONFIG + " in the
consumer configuration.");
}
@@ -1186,7 +1178,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
acquireAndEnsureOpen();
try {
maybeThrowInvalidGroupIdException();
- return groupMetadata.get();
+ return groupMetadata.get().get();
} finally {
release();
}
@@ -1266,7 +1258,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
* 3. if partition revocation completes successfully, send leave group
*/
void prepareShutdown(final Timer timer, final AtomicReference<Throwable>
firstException) {
- if (!groupMetadata.isPresent())
+ if (!groupMetadata.get().isPresent())
return;
maybeAutoCommitSync(autoCommitEnabled, timer);
applicationEventHandler.add(new CommitOnCloseEvent());
@@ -1463,7 +1455,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
acquireAndEnsureOpen();
try {
fetchBuffer.retainAll(Collections.emptySet());
- if (groupMetadata.isPresent()) {
+ if (groupMetadata.get().isPresent()) {
UnsubscribeEvent unsubscribeEvent = new UnsubscribeEvent();
applicationEventHandler.add(unsubscribeEvent);
log.info("Unsubscribing all topics or patterns and assigned
partitions");
@@ -1475,7 +1467,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
} catch (TimeoutException e) {
log.error("Failed while waiting for the unsubscribe event
to complete");
}
- groupMetadata =
initializeGroupMetadata(groupMetadata.get().groupId(), Optional.empty());
+ resetGroupMetadata();
}
subscriptions.unsubscribe();
} finally {
@@ -1483,6 +1475,16 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
}
}
+ private void resetGroupMetadata() {
+ groupMetadata.updateAndGet(
+ oldGroupMetadataOptional -> oldGroupMetadataOptional
+ .map(oldGroupMetadata -> initializeConsumerGroupMetadata(
+ oldGroupMetadata.groupId(),
+ oldGroupMetadata.groupInstanceId()
+ ))
+ );
+ }
+
@Override
@Deprecated
public ConsumerRecords<K, V> poll(final long timeoutMs) {
@@ -1604,7 +1606,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
* according to config {@link CommonClientConfigs#GROUP_ID_CONFIG}
*/
private boolean isCommittedOffsetsManagementEnabled() {
- return groupMetadata.isPresent();
+ return groupMetadata.get().isPresent();
}
/**
@@ -1905,12 +1907,12 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
private void maybeThrowFencedInstanceException() {
if (offsetCommitCallbackInvoker.hasFencedException()) {
String groupInstanceId = "unknown";
- if (!groupMetadata.isPresent()) {
+ if (!groupMetadata.get().isPresent()) {
log.error("No group metadata found although a group ID was
provided. This is a bug!");
- } else if (!groupMetadata.get().groupInstanceId().isPresent()) {
+ } else if
(!groupMetadata.get().get().groupInstanceId().isPresent()) {
log.error("No group instance ID found although the consumer is
fenced. This is a bug!");
} else {
- groupInstanceId = groupMetadata.get().groupInstanceId().get();
+ groupInstanceId =
groupMetadata.get().get().groupInstanceId().get();
}
throw new FencedInstanceIdException("Get fenced exception for
group.instance.id " + groupInstanceId);
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
index 826774a6a64..d2a7205d5f3 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
@@ -22,7 +22,6 @@ import
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollRes
import
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
-import
org.apache.kafka.clients.consumer.internals.events.GroupMetadataUpdateEvent;
import
org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.GroupAuthorizationException;
@@ -110,7 +109,6 @@ public class HeartbeatRequestManager implements
RequestManager {
* sending heartbeat until the next poll.
*/
private final Timer pollTimer;
- private GroupMetadataUpdateEvent previousGroupMetadataUpdateEvent = null;
/**
* Holding the heartbeat sensor to measure heartbeat timing and response
latency
@@ -328,27 +326,11 @@ public class HeartbeatRequestManager implements
RequestManager {
heartbeatRequestState.onSuccessfulAttempt(currentTimeMs);
heartbeatRequestState.resetTimer();
membershipManager.onHeartbeatSuccess(response.data());
- maybeSendGroupMetadataUpdateEvent();
return;
}
onErrorResponse(response, currentTimeMs);
}
- private void maybeSendGroupMetadataUpdateEvent() {
- if (previousGroupMetadataUpdateEvent == null ||
-
!previousGroupMetadataUpdateEvent.memberId().equals(membershipManager.memberId())
||
- previousGroupMetadataUpdateEvent.memberEpoch() !=
membershipManager.memberEpoch()) {
-
- final GroupMetadataUpdateEvent currentGroupMetadataUpdateEvent =
new GroupMetadataUpdateEvent(
- membershipManager.memberEpoch(),
- previousGroupMetadataUpdateEvent != null &&
membershipManager.memberId() == null ?
- previousGroupMetadataUpdateEvent.memberId() :
membershipManager.memberId()
- );
- this.backgroundEventHandler.add(currentGroupMetadataUpdateEvent);
- previousGroupMetadataUpdateEvent = currentGroupMetadataUpdateEvent;
- }
- }
-
private void onErrorResponse(final ConsumerGroupHeartbeatResponse response,
final long currentTimeMs) {
Errors error = Errors.forCode(response.data().errorCode());
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
index 35322fb51be..c2bdd3f8609 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
@@ -57,6 +57,7 @@ import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
+import static java.util.Collections.unmodifiableList;
import static
org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED;
import static
org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_LOST;
import static
org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED;
@@ -1491,4 +1492,9 @@ public class MembershipManagerImpl implements
MembershipManager {
}
return PollResult.EMPTY;
}
+
+ // visible for testing
+ List<MemberStateListener> stateListeners() {
+ return unmodifiableList(stateUpdatesListeners);
+ }
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
index 0b4c043d4a4..75d87432db6 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
@@ -123,7 +123,8 @@ public class RequestManagers implements Closeable {
final
Supplier<NetworkClientDelegate> networkClientDelegateSupplier,
final
Optional<ClientTelemetryReporter> clientTelemetryReporter,
final Metrics metrics,
- final
OffsetCommitCallbackInvoker offsetCommitCallbackInvoker
+ final
OffsetCommitCallbackInvoker offsetCommitCallbackInvoker,
+ final MemberStateListener
applicationThreadMemberStateListener
) {
return new CachedSupplier<RequestManagers>() {
@Override
@@ -192,6 +193,7 @@ public class RequestManagers implements Closeable {
time,
metrics);
membershipManager.registerStateListener(commit);
+
membershipManager.registerStateListener(applicationThreadMemberStateListener);
heartbeatRequestManager = new HeartbeatRequestManager(
logContext,
time,
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
index 9bc3fbebc30..4241482bcaa 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
@@ -27,7 +27,7 @@ import java.util.Objects;
public abstract class BackgroundEvent {
public enum Type {
- ERROR, CONSUMER_REBALANCE_LISTENER_CALLBACK_NEEDED,
GROUP_METADATA_UPDATE
+ ERROR, CONSUMER_REBALANCE_LISTENER_CALLBACK_NEEDED
}
private final Type type;
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/GroupMetadataUpdateEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/GroupMetadataUpdateEvent.java
deleted file mode 100644
index 001f5498183..00000000000
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/GroupMetadataUpdateEvent.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.consumer.internals.events;
-
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
-
-/**
- * This event is sent by the {@link ConsumerNetworkThread consumer's network
thread} to the application thread
- * so that when the user calls the {@link Consumer#groupMetadata()} API, the
information is up-to-date. The
- * information for the current state of the group member is managed on the
consumer network thread and thus
- * requires this interplay between threads.
- */
-public class GroupMetadataUpdateEvent extends BackgroundEvent {
-
- private final int memberEpoch;
- private final String memberId;
-
- public GroupMetadataUpdateEvent(final int memberEpoch, final String
memberId) {
- super(Type.GROUP_METADATA_UPDATE);
- this.memberEpoch = memberEpoch;
- this.memberId = memberId;
- }
-
- public int memberEpoch() {
- return memberEpoch;
- }
-
- public String memberId() {
- return memberId;
- }
-
- @Override
- public String toStringBase() {
- return super.toStringBase() +
- ", memberEpoch=" + memberEpoch +
- ", memberId='" + memberId + '\'';
- }
-}
\ No newline at end of file
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 35e742fbd0a..fe83a417897 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -42,7 +42,6 @@ import
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListe
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
import
org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
-import
org.apache.kafka.clients.consumer.internals.events.GroupMetadataUpdateEvent;
import org.apache.kafka.clients.consumer.internals.events.LeaveOnCloseEvent;
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
import
org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
@@ -52,7 +51,6 @@ import
org.apache.kafka.clients.consumer.internals.events.SubscriptionChangeEven
import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
import
org.apache.kafka.clients.consumer.internals.events.ValidatePositionsEvent;
-import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.Node;
@@ -80,6 +78,7 @@ import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
+import org.mockito.MockedStatic;
import org.mockito.Mockito;
import java.time.Duration;
@@ -102,7 +101,6 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -117,10 +115,12 @@ import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.THROW_ON
import static
org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.TOPIC_PARTITION_COMPARATOR;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.test.TestUtils.requiredConsumerConfig;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
@@ -133,6 +133,7 @@ import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -164,19 +165,19 @@ public class AsyncKafkaConsumerTest {
}
private AsyncKafkaConsumer<String, String> newConsumer() {
- final Properties props = requiredConsumerProperties();
+ final Properties props = requiredConsumerConfig();
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
return newConsumer(props);
}
private AsyncKafkaConsumer<String, String> newConsumerWithoutGroupId() {
- final Properties props = requiredConsumerProperties();
+ final Properties props = requiredConsumerConfig();
return newConsumer(props);
}
@SuppressWarnings("UnusedReturnValue")
private AsyncKafkaConsumer<String, String> newConsumerWithEmptyGroupId() {
- final Properties props = requiredConsumerPropertiesAndGroupId("");
+ final Properties props = requiredConsumerConfigAndGroupId("");
return newConsumer(props);
}
@@ -921,7 +922,7 @@ public class AsyncKafkaConsumerTest {
@Test
public void testInterceptorAutoCommitOnClose() {
- Properties props = requiredConsumerPropertiesAndGroupId("test-id");
+ Properties props = requiredConsumerConfigAndGroupId("test-id");
props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
MockConsumerInterceptor.class.getName());
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
@@ -937,7 +938,7 @@ public class AsyncKafkaConsumerTest {
@Test
public void testInterceptorCommitSync() {
- Properties props = requiredConsumerPropertiesAndGroupId("test-id");
+ Properties props = requiredConsumerConfigAndGroupId("test-id");
props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
MockConsumerInterceptor.class.getName());
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
@@ -952,7 +953,7 @@ public class AsyncKafkaConsumerTest {
@Test
public void testNoInterceptorCommitSyncFailed() {
- Properties props = requiredConsumerPropertiesAndGroupId("test-id");
+ Properties props = requiredConsumerConfigAndGroupId("test-id");
props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
MockConsumerInterceptor.class.getName());
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
@@ -968,7 +969,7 @@ public class AsyncKafkaConsumerTest {
@Test
public void testInterceptorCommitAsync() {
- Properties props = requiredConsumerPropertiesAndGroupId("test-id");
+ Properties props = requiredConsumerConfigAndGroupId("test-id");
props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
MockConsumerInterceptor.class.getName());
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
@@ -985,7 +986,7 @@ public class AsyncKafkaConsumerTest {
@Test
public void testNoInterceptorCommitAsyncFailed() {
- Properties props = requiredConsumerPropertiesAndGroupId("test-id");
+ Properties props = requiredConsumerConfigAndGroupId("test-id");
props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
MockConsumerInterceptor.class.getName());
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
@@ -1085,7 +1086,7 @@ public class AsyncKafkaConsumerTest {
@Test
public void testGroupMetadataAfterCreationWithGroupIdIsNull() {
- final Properties props = requiredConsumerProperties();
+ final Properties props = requiredConsumerConfig();
final ConsumerConfig config = new ConsumerConfig(props);
consumer = newConsumer(config);
@@ -1102,7 +1103,7 @@ public class AsyncKafkaConsumerTest {
@Test
public void testGroupMetadataAfterCreationWithGroupIdIsNotNull() {
final String groupId = "consumerGroupA";
- final ConsumerConfig config = new
ConsumerConfig(requiredConsumerPropertiesAndGroupId(groupId));
+ final ConsumerConfig config = new
ConsumerConfig(requiredConsumerConfigAndGroupId(groupId));
consumer = newConsumer(config);
final ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();
@@ -1117,7 +1118,7 @@ public class AsyncKafkaConsumerTest {
public void
testGroupMetadataAfterCreationWithGroupIdIsNotNullAndGroupInstanceIdSet() {
final String groupId = "consumerGroupA";
final String groupInstanceId = "groupInstanceId1";
- final Properties props = requiredConsumerPropertiesAndGroupId(groupId);
+ final Properties props = requiredConsumerConfigAndGroupId(groupId);
props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId);
final ConsumerConfig config = new ConsumerConfig(props);
consumer = newConsumer(config);
@@ -1130,164 +1131,65 @@ public class AsyncKafkaConsumerTest {
assertEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID,
groupMetadata.memberId());
}
- @Test
- public void testGroupMetadataUpdateSingleCall() {
+ private MemberStateListener captureGroupMetadataUpdateListener(final
MockedStatic<RequestManagers> requestManagers) {
+ ArgumentCaptor<MemberStateListener>
applicationThreadMemberStateListener =
ArgumentCaptor.forClass(MemberStateListener.class);
+ requestManagers.verify(() -> RequestManagers.supplier(
+ any(),
+ any(),
+ any(),
+ any(),
+ any(),
+ any(),
+ any(),
+ any(),
+ any(),
+ any(),
+ any(),
+ any(),
+ any(),
+ any(),
+ applicationThreadMemberStateListener.capture()
+ ));
+ return applicationThreadMemberStateListener.getValue();
+ }
+
+ @Test
+ public void testGroupMetadataUpdate() {
final String groupId = "consumerGroupA";
- final ConsumerConfig config = new
ConsumerConfig(requiredConsumerPropertiesAndGroupId(groupId));
- consumer = newConsumer(config);
-
-
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
- completeFetchedCommittedOffsetApplicationEventSuccessfully(mkMap());
-
- final int generation = 1;
- final String memberId = "newMemberId";
- final ConsumerGroupMetadata expectedGroupMetadata = new
ConsumerGroupMetadata(
- groupId,
- generation,
- memberId,
- Optional.empty()
- );
- final GroupMetadataUpdateEvent groupMetadataUpdateEvent = new
GroupMetadataUpdateEvent(
- generation,
- memberId
- );
- backgroundEventQueue.add(groupMetadataUpdateEvent);
- consumer.assign(singletonList(new TopicPartition("topic", 0)));
- consumer.poll(Duration.ZERO);
-
- final ConsumerGroupMetadata actualGroupMetadata =
consumer.groupMetadata();
-
- assertEquals(expectedGroupMetadata, actualGroupMetadata);
-
- final ConsumerGroupMetadata secondActualGroupMetadataWithoutUpdate =
consumer.groupMetadata();
-
- assertEquals(expectedGroupMetadata,
secondActualGroupMetadataWithoutUpdate);
- }
-
- @Test
- public void
testPollNotReturningRecordsIfGenerationUnknownAndGroupManagementIsUsedWithTopics()
{
-
testPollNotReturningRecordsIfGenerationUnknownAndGroupManagementIsUsed(() -> {
- consumer.subscribe(singletonList("topic"));
- });
- }
-
- @Test
- public void
testPollNotReturningRecordsIfGenerationUnknownAndGroupManagementIsUsedWithPattern()
{
-
testPollNotReturningRecordsIfGenerationUnknownAndGroupManagementIsUsed(() -> {
- when(metadata.fetch()).thenReturn(Cluster.empty());
- consumer.subscribe(Pattern.compile("topic"));
- });
- }
-
- private void
testPollNotReturningRecordsIfGenerationUnknownAndGroupManagementIsUsed(final
Runnable subscription) {
- final String groupId = "consumerGroupA";
- final ConsumerConfig config = new
ConsumerConfig(requiredConsumerPropertiesAndGroupId(groupId));
- consumer = newConsumer(config);
- subscription.run();
-
- consumer.poll(Duration.ZERO);
-
- verify(fetchCollector, never()).collectFetch(any(FetchBuffer.class));
- }
-
- @Test
- public void
testPollReturningRecordsIfGroupIdSetAndGroupManagementIsNotUsed() {
- final ConsumerConfig config = new
ConsumerConfig(requiredConsumerPropertiesAndGroupId("consumerGroupA"));
-
testPollReturningRecordsIfGroupMetadataHasUnknownGenerationAndGroupManagementIsNotUsed(config);
- }
-
- @Test
- public void
testPollReturningRecordsIfGroupIdNotSetAndGroupManagementIsNotUsed() {
- final ConsumerConfig config = new
ConsumerConfig(requiredConsumerProperties());
-
testPollReturningRecordsIfGroupMetadataHasUnknownGenerationAndGroupManagementIsNotUsed(config);
- }
-
- private void
testPollReturningRecordsIfGroupMetadataHasUnknownGenerationAndGroupManagementIsNotUsed(final
ConsumerConfig config) {
- final String topic = "topic";
- final TopicPartition topicPartition = new TopicPartition(topic, 0);
- consumer = newConsumer(config);
- consumer.assign(singletonList(topicPartition));
- final List<ConsumerRecord<String, String>> records = singletonList(
- new ConsumerRecord<>(topic, 0, 2, "key1", "value1")
- );
- when(fetchCollector.collectFetch(any(FetchBuffer.class)))
- .thenReturn(Fetch.forPartition(topicPartition, records, true));
- completeFetchedCommittedOffsetApplicationEventSuccessfully(mkMap());
-
- consumer.poll(Duration.ZERO);
-
- verify(fetchCollector).collectFetch(any(FetchBuffer.class));
- }
-
- @Test
- public void
testPollReturningRecordIfGenerationKnownAndGroupManagementIsUsedWithTopics() {
- final String topic = "topic";
- testPollReturningRecordIfGenerationKnownAndGroupManagementIsUsed(
- topic,
- () -> {
- consumer.subscribe(singletonList(topic));
- });
- }
-
- @Test
- public void
testPollReturningRecordIfGenerationKnownAndGroupManagementIsUsedWithPattern() {
- final String topic = "topic";
- testPollReturningRecordIfGenerationKnownAndGroupManagementIsUsed(
- topic,
- () -> {
- when(metadata.fetch()).thenReturn(Cluster.empty());
- consumer.subscribe(Pattern.compile(topic));
- });
- }
-
- private void
testPollReturningRecordIfGenerationKnownAndGroupManagementIsUsed(final String
topic,
-
final Runnable subscription) {
- final ConsumerConfig config = new
ConsumerConfig(requiredConsumerPropertiesAndGroupId("consumerGroupA"));
- final int generation = 1;
- final String memberId = "newMemberId";
- final GroupMetadataUpdateEvent groupMetadataUpdateEvent = new
GroupMetadataUpdateEvent(
- generation,
- memberId
- );
- backgroundEventQueue.add(groupMetadataUpdateEvent);
- final TopicPartition topicPartition = new TopicPartition(topic, 0);
- final List<ConsumerRecord<String, String>> records = singletonList(
- new ConsumerRecord<>(topic, 0, 2, "key1", "value1")
- );
- when(fetchCollector.collectFetch(any(FetchBuffer.class)))
- .thenReturn(Fetch.forPartition(topicPartition, records, true));
- consumer = newConsumer(config);
- subscription.run();
-
- consumer.poll(Duration.ZERO);
-
- verify(fetchCollector).collectFetch(any(FetchBuffer.class));
+ final ConsumerConfig config = new
ConsumerConfig(requiredConsumerConfigAndGroupId(groupId));
+ try (final MockedStatic<RequestManagers> requestManagers =
mockStatic(RequestManagers.class)) {
+ consumer = newConsumer(config);
+ final ConsumerGroupMetadata oldGroupMetadata =
consumer.groupMetadata();
+ final MemberStateListener groupMetadataUpdateListener =
captureGroupMetadataUpdateListener(requestManagers);
+ final int expectedMemberEpoch = 42;
+ final String expectedMemberId = "memberId";
+ groupMetadataUpdateListener.onMemberEpochUpdated(
+ Optional.of(expectedMemberEpoch),
+ Optional.of(expectedMemberId)
+ );
+ final ConsumerGroupMetadata newGroupMetadata =
consumer.groupMetadata();
+ assertEquals(oldGroupMetadata.groupId(),
newGroupMetadata.groupId());
+ assertEquals(expectedMemberId, newGroupMetadata.memberId());
+ assertEquals(expectedMemberEpoch, newGroupMetadata.generationId());
+ assertEquals(oldGroupMetadata.groupInstanceId(),
newGroupMetadata.groupInstanceId());
+ }
}
@Test
public void testGroupMetadataIsResetAfterUnsubscribe() {
final String groupId = "consumerGroupA";
- final ConsumerConfig config = new
ConsumerConfig(requiredConsumerPropertiesAndGroupId(groupId));
- consumer = newConsumer(config);
- consumer.subscribe(singletonList("topic"));
- final int generation = 1;
- final String memberId = "newMemberId";
- final ConsumerGroupMetadata groupMetadataAfterSubscription = new
ConsumerGroupMetadata(
- groupId,
- generation,
- memberId,
- Optional.empty()
- );
- final GroupMetadataUpdateEvent groupMetadataUpdateEvent = new
GroupMetadataUpdateEvent(
- generation,
- memberId
- );
- backgroundEventQueue.add(groupMetadataUpdateEvent);
-
when(fetchCollector.collectFetch(any(FetchBuffer.class))).thenReturn(Fetch.empty());
- consumer.poll(Duration.ZERO);
-
- assertEquals(groupMetadataAfterSubscription, consumer.groupMetadata());
-
+ final ConsumerConfig config = new
ConsumerConfig(requiredConsumerConfigAndGroupId(groupId));
+ try (final MockedStatic<RequestManagers> requestManagers =
mockStatic(RequestManagers.class)) {
+ consumer = newConsumer(config);
+ final MemberStateListener groupMetadataUpdateListener =
captureGroupMetadataUpdateListener(requestManagers);
+ consumer.subscribe(singletonList("topic"));
+ final int memberEpoch = 42;
+ final String memberId = "memberId";
+
groupMetadataUpdateListener.onMemberEpochUpdated(Optional.of(memberEpoch),
Optional.of(memberId));
+ final ConsumerGroupMetadata groupMetadata =
consumer.groupMetadata();
+ assertNotEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID,
groupMetadata.generationId());
+ assertNotEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID,
groupMetadata.memberId());
+ }
completeUnsubscribeApplicationEventSuccessfully();
consumer.unsubscribe();
@@ -1298,7 +1200,6 @@ public class AsyncKafkaConsumerTest {
JoinGroupRequest.UNKNOWN_MEMBER_ID,
Optional.empty()
);
-
assertEquals(groupMetadataAfterUnsubscription,
consumer.groupMetadata());
}
@@ -1379,7 +1280,7 @@ public class AsyncKafkaConsumerTest {
@Test
public void testBackgroundError() {
final String groupId = "consumerGroupA";
- final ConsumerConfig config = new
ConsumerConfig(requiredConsumerPropertiesAndGroupId(groupId));
+ final ConsumerConfig config = new
ConsumerConfig(requiredConsumerConfigAndGroupId(groupId));
consumer = newConsumer(config);
final KafkaException expectedException = new KafkaException("Nobody
expects the Spanish Inquisition");
@@ -1394,7 +1295,7 @@ public class AsyncKafkaConsumerTest {
@Test
public void testMultipleBackgroundErrors() {
final String groupId = "consumerGroupA";
- final ConsumerConfig config = new
ConsumerConfig(requiredConsumerPropertiesAndGroupId(groupId));
+ final ConsumerConfig config = new
ConsumerConfig(requiredConsumerConfigAndGroupId(groupId));
consumer = newConsumer(config);
final KafkaException expectedException1 = new KafkaException("Nobody
expects the Spanish Inquisition");
@@ -1412,7 +1313,7 @@ public class AsyncKafkaConsumerTest {
@Test
public void testGroupRemoteAssignorUnusedIfGroupIdUndefined() {
- final Properties props = requiredConsumerProperties();
+ final Properties props = requiredConsumerConfig();
props.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, "someAssignor");
final ConsumerConfig config = new ConsumerConfig(props);
consumer = newConsumer(config);
@@ -1422,7 +1323,7 @@ public class AsyncKafkaConsumerTest {
@Test
public void testGroupRemoteAssignorUnusedInGenericProtocol() {
- final Properties props = requiredConsumerProperties();
+ final Properties props = requiredConsumerConfig();
props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupA");
props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT));
props.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, "someAssignor");
@@ -1434,7 +1335,7 @@ public class AsyncKafkaConsumerTest {
@Test
public void testGroupRemoteAssignorUsedInConsumerProtocol() {
- final Properties props = requiredConsumerProperties();
+ final Properties props = requiredConsumerConfig();
props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupA");
props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT));
props.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, "someAssignor");
@@ -1446,7 +1347,7 @@ public class AsyncKafkaConsumerTest {
@Test
public void testGroupIdNull() {
- final Properties props = requiredConsumerProperties();
+ final Properties props = requiredConsumerConfig();
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000);
props.put(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, true);
final ConsumerConfig config = new ConsumerConfig(props);
@@ -1458,7 +1359,7 @@ public class AsyncKafkaConsumerTest {
@Test
public void testGroupIdNotNullAndValid() {
- final Properties props =
requiredConsumerPropertiesAndGroupId("consumerGroupA");
+ final Properties props =
requiredConsumerConfigAndGroupId("consumerGroupA");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000);
props.put(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, true);
final ConsumerConfig config = new ConsumerConfig(props);
@@ -1492,7 +1393,6 @@ public class AsyncKafkaConsumerTest {
final TopicPartition tp = new TopicPartition("topic", 0);
final List<ConsumerRecord<String, String>> records = singletonList(
new ConsumerRecord<>("topic", 0, 2, "key1", "value1"));
- backgroundEventQueue.add(new GroupMetadataUpdateEvent(1, "memberId"));
doAnswer(invocation -> Fetch.forPartition(tp, records, true))
.when(fetchCollector)
.collectFetch(Mockito.any(FetchBuffer.class));
@@ -1503,7 +1403,7 @@ public class AsyncKafkaConsumerTest {
}
private void testInvalidGroupId(final String groupId) {
- final Properties props = requiredConsumerPropertiesAndGroupId(groupId);
+ final Properties props = requiredConsumerConfigAndGroupId(groupId);
final ConsumerConfig config = new ConsumerConfig(props);
final Exception exception = assertThrows(
@@ -1514,20 +1414,12 @@ public class AsyncKafkaConsumerTest {
assertEquals("Failed to construct kafka consumer",
exception.getMessage());
}
- private Properties requiredConsumerPropertiesAndGroupId(final String
groupId) {
- final Properties props = requiredConsumerProperties();
+ private Properties requiredConsumerConfigAndGroupId(final String groupId) {
+ final Properties props = requiredConsumerConfig();
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
return props;
}
- private Properties requiredConsumerProperties() {
- final Properties props = new Properties();
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
- return props;
- }
-
private void
testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout(boolean
committedOffsetsEnabled) {
completeFetchedCommittedOffsetApplicationEventExceptionally(new
TimeoutException());
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
@@ -1588,13 +1480,6 @@ public class AsyncKafkaConsumerTest {
new ConsumerRecord<>(topicName, partition, 3, "key2", "value2")
);
- final int generation = 1;
- final String memberId = "newMemberId";
- final GroupMetadataUpdateEvent groupMetadataUpdateEvent = new
GroupMetadataUpdateEvent(
- generation,
- memberId
- );
- backgroundEventQueue.add(groupMetadataUpdateEvent);
// On the first iteration, return no data; on the second, return two
records
doAnswer(invocation -> {
// Mock the subscription being assigned as the first fetch is
collected
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
index 90cfb90bb1b..8e05e505be4 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
@@ -21,7 +21,6 @@ import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
-import
org.apache.kafka.clients.consumer.internals.events.GroupMetadataUpdateEvent;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.Uuid;
@@ -325,91 +324,6 @@ public class HeartbeatRequestManagerTest {
assertEquals(DEFAULT_REMOTE_ASSIGNOR,
heartbeatRequest.data().serverAssignor());
}
- @Test
- public void testConsumerGroupMetadataFirstUpdate() {
- final GroupMetadataUpdateEvent groupMetadataUpdateEvent =
makeFirstGroupMetadataUpdate(memberId, memberEpoch);
- assertEquals(memberEpoch, groupMetadataUpdateEvent.memberEpoch());
- assertEquals(memberId, groupMetadataUpdateEvent.memberId());
- }
-
- @Test
- public void testConsumerGroupMetadataUpdateWithSameUpdate() {
- makeFirstGroupMetadataUpdate(memberId, memberEpoch);
-
- time.sleep(2000);
- NetworkClientDelegate.PollResult result =
heartbeatRequestManager.poll(time.milliseconds());
-
- assertEquals(1, result.unsentRequests.size());
- NetworkClientDelegate.UnsentRequest request =
result.unsentRequests.get(0);
- ClientResponse responseWithSameUpdate =
createHeartbeatResponse(request, Errors.NONE);
- request.handler().onComplete(responseWithSameUpdate);
- assertEquals(0, backgroundEventQueue.size());
- }
-
- @Test
- public void
testConsumerGroupMetadataUpdateWithMemberIdNullButMemberEpochUpdated() {
- makeFirstGroupMetadataUpdate(memberId, memberEpoch);
-
- time.sleep(2000);
- NetworkClientDelegate.PollResult result =
heartbeatRequestManager.poll(time.milliseconds());
-
- assertEquals(1, result.unsentRequests.size());
- NetworkClientDelegate.UnsentRequest request =
result.unsentRequests.get(0);
- final int updatedMemberEpoch = 2;
- ClientResponse responseWithMemberEpochUpdate =
createHeartbeatResponseWithMemberIdNull(
- request,
- Errors.NONE,
- updatedMemberEpoch
- );
- request.handler().onComplete(responseWithMemberEpochUpdate);
- assertEquals(1, backgroundEventQueue.size());
- final BackgroundEvent eventWithUpdatedMemberEpoch =
backgroundEventQueue.poll();
- assertEquals(BackgroundEvent.Type.GROUP_METADATA_UPDATE,
eventWithUpdatedMemberEpoch.type());
- final GroupMetadataUpdateEvent groupMetadataUpdateEvent =
(GroupMetadataUpdateEvent) eventWithUpdatedMemberEpoch;
- assertEquals(updatedMemberEpoch,
groupMetadataUpdateEvent.memberEpoch());
- assertEquals(memberId, groupMetadataUpdateEvent.memberId());
- }
-
- @Test
- public void
testConsumerGroupMetadataUpdateWithMemberIdUpdatedAndMemberEpochSame() {
- makeFirstGroupMetadataUpdate(memberId, memberEpoch);
-
- time.sleep(2000);
- NetworkClientDelegate.PollResult result =
heartbeatRequestManager.poll(time.milliseconds());
-
- assertEquals(1, result.unsentRequests.size());
- NetworkClientDelegate.UnsentRequest request =
result.unsentRequests.get(0);
- final String updatedMemberId = "updatedMemberId";
- ClientResponse responseWithMemberIdUpdate = createHeartbeatResponse(
- request,
- Errors.NONE,
- updatedMemberId,
- memberEpoch
- );
- request.handler().onComplete(responseWithMemberIdUpdate);
- assertEquals(1, backgroundEventQueue.size());
- final BackgroundEvent eventWithUpdatedMemberEpoch =
backgroundEventQueue.poll();
- assertEquals(BackgroundEvent.Type.GROUP_METADATA_UPDATE,
eventWithUpdatedMemberEpoch.type());
- final GroupMetadataUpdateEvent groupMetadataUpdateEvent =
(GroupMetadataUpdateEvent) eventWithUpdatedMemberEpoch;
- assertEquals(memberEpoch, groupMetadataUpdateEvent.memberEpoch());
- assertEquals(updatedMemberId, groupMetadataUpdateEvent.memberId());
- }
-
- private GroupMetadataUpdateEvent makeFirstGroupMetadataUpdate(final String
memberId, final int memberEpoch) {
- resetWithZeroHeartbeatInterval(Optional.empty());
- mockStableMember();
-
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new
Node(1, "localhost", 9999)));
- NetworkClientDelegate.PollResult result =
heartbeatRequestManager.poll(time.milliseconds());
- assertEquals(1, result.unsentRequests.size());
- NetworkClientDelegate.UnsentRequest request =
result.unsentRequests.get(0);
- ClientResponse firstResponse = createHeartbeatResponse(request,
Errors.NONE, memberId, memberEpoch);
- request.handler().onComplete(firstResponse);
- assertEquals(1, backgroundEventQueue.size());
- final BackgroundEvent event = backgroundEventQueue.poll();
- assertEquals(BackgroundEvent.Type.GROUP_METADATA_UPDATE, event.type());
- return (GroupMetadataUpdateEvent) event;
- }
-
@ParameterizedTest
@MethodSource("errorProvider")
public void testHeartbeatResponseOnErrorHandling(final Errors error, final
boolean isFatal) {
@@ -430,7 +344,6 @@ public class HeartbeatRequestManagerTest {
switch (error) {
case NONE:
-
verify(backgroundEventHandler).add(any(GroupMetadataUpdateEvent.class));
verify(membershipManager,
times(2)).onHeartbeatSuccess(mockResponse.data());
assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS,
heartbeatRequestState.nextHeartbeatMs(time.milliseconds()));
break;
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestManagersTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestManagersTest.java
new file mode 100644
index 00000000000..640c7e98e42
--- /dev/null
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestManagersTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.junit.jupiter.api.Test;
+
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.kafka.test.TestUtils.requiredConsumerConfig;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+public class RequestManagersTest {
+
+ @Test
+ public void testMemberStateListenerRegistered() {
+
+ final MemberStateListener listener = (memberEpoch, memberId) -> { };
+
+ final Properties properties = requiredConsumerConfig();
+ properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
"consumerGroup");
+ final ConsumerConfig config = new ConsumerConfig(properties);
+ final GroupRebalanceConfig groupRebalanceConfig = new
GroupRebalanceConfig(
+ config,
+ GroupRebalanceConfig.ProtocolType.CONSUMER
+ );
+ final RequestManagers requestManagers = RequestManagers.supplier(
+ new MockTime(),
+ new LogContext(),
+ mock(BackgroundEventHandler.class),
+ mock(ConsumerMetadata.class),
+ mock(SubscriptionState.class),
+ mock(FetchBuffer.class),
+ config,
+ groupRebalanceConfig,
+ mock(ApiVersions.class),
+ mock(FetchMetricsManager.class),
+ () -> mock(NetworkClientDelegate.class),
+ Optional.empty(),
+ new Metrics(),
+ mock(OffsetCommitCallbackInvoker.class),
+ listener
+ ).get();
+ requestManagers.membershipManager.ifPresent(
+ membershipManager -> assertTrue(((MembershipManagerImpl)
membershipManager).stateListeners().contains(listener))
+ );
+ }
+}
\ No newline at end of file
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index db86558f7d0..3a7dcfc9305 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -37,6 +37,7 @@ import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.ByteBufferChannel;
import org.apache.kafka.common.requests.MetadataResponse.PartitionMetadata;
import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
@@ -279,6 +280,14 @@ public class TestUtils {
return producerConfig(bootstrapServers, keySerializer,
valueSerializer, new Properties());
}
+ public static Properties requiredConsumerConfig() {
+ final Properties consumerConfig = new Properties();
+ consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9091");
+ consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
+ consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
+ return consumerConfig;
+ }
+
public static Properties consumerConfig(final String bootstrapServers,
final String groupId,
final Class<?> keyDeserializer,