This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new aa0443eb607 KAFKA-16285: Make group metadata available when a new 
assignment is set (#15426)
aa0443eb607 is described below

commit aa0443eb607c2d1d3004312f55f7583102127cb8
Author: Bruno Cadonna <[email protected]>
AuthorDate: Mon Mar 4 12:42:24 2024 +0100

    KAFKA-16285: Make group metadata available when a new assignment is set 
(#15426)
    
    Currently, in the async Kafka consumer updates to the group metadata
    that are received by the heartbeat are propagated to the application thread
    in form of an event. Group metadata is updated when a new assignment is
    received. The new assignment is directly set in the subscription without
    sending an update event from the background thread to the application 
thread.
    That means that there might be a delay between the application thread being
    aware of the update to the assignment and the application thread being
    aware of the update to the group metadata. This delay can cause stale
    group metadata returned by the application thread that then causes
    issues when data of the new assignment is committed. A concrete
    example is
    producer.sendOffsetsToTransaction(offsetsToCommit, groupMetadata)
    The offsets to commit might already stem from the new assignment
    but the group metadata might relate to the previous assignment.
    
    Reviewers: Kirk True <[email protected]>, Andrew Schofield 
<[email protected]>, Lucas Brutschy <[email protected]>
---
 .../consumer/internals/AsyncKafkaConsumer.java     | 118 ++++-----
 .../internals/HeartbeatRequestManager.java         |  18 --
 .../consumer/internals/MembershipManagerImpl.java  |   6 +
 .../consumer/internals/RequestManagers.java        |   4 +-
 .../consumer/internals/events/BackgroundEvent.java |   2 +-
 .../internals/events/GroupMetadataUpdateEvent.java |  53 ----
 .../consumer/internals/AsyncKafkaConsumerTest.java | 271 ++++++---------------
 .../internals/HeartbeatRequestManagerTest.java     |  87 -------
 .../consumer/internals/RequestManagersTest.java    |  70 ++++++
 .../test/java/org/apache/kafka/test/TestUtils.java |   9 +
 10 files changed, 227 insertions(+), 411 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index e706898b70a..dafd3293c74 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -52,7 +52,6 @@ import 
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListe
 import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
 import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
 import 
org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.GroupMetadataUpdateEvent;
 import org.apache.kafka.clients.consumer.internals.events.LeaveOnCloseEvent;
 import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
@@ -213,10 +212,6 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                     process((ErrorEvent) event);
                     break;
 
-                case GROUP_METADATA_UPDATE:
-                    process((GroupMetadataUpdateEvent) event);
-                    break;
-
                 case CONSUMER_REBALANCE_LISTENER_CALLBACK_NEEDED:
                     process((ConsumerRebalanceListenerCallbackNeededEvent) 
event);
                     break;
@@ -231,18 +226,6 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             throw event.error();
         }
 
-        private void process(final GroupMetadataUpdateEvent event) {
-            if (AsyncKafkaConsumer.this.groupMetadata.isPresent()) {
-                final ConsumerGroupMetadata currentGroupMetadata = 
AsyncKafkaConsumer.this.groupMetadata.get();
-                AsyncKafkaConsumer.this.groupMetadata = Optional.of(new 
ConsumerGroupMetadata(
-                    currentGroupMetadata.groupId(),
-                    event.memberEpoch(),
-                    event.memberId(),
-                    currentGroupMetadata.groupInstanceId()
-                ));
-            }
-        }
-
         private void process(final 
ConsumerRebalanceListenerCallbackNeededEvent event) {
             ApplicationEvent invokedEvent = invokeRebalanceCallbacks(
                 rebalanceListenerInvoker,
@@ -256,7 +239,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
 
     private final ApplicationEventHandler applicationEventHandler;
     private final Time time;
-    private Optional<ConsumerGroupMetadata> groupMetadata = Optional.empty();
+    private final AtomicReference<Optional<ConsumerGroupMetadata>> 
groupMetadata = new AtomicReference<>(Optional.empty());
     private final KafkaConsumerMetrics kafkaConsumerMetrics;
     private Logger log;
     private final String clientId;
@@ -370,6 +353,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                     fetchMetricsManager,
                     
clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null));
             this.offsetCommitCallbackInvoker = new 
OffsetCommitCallbackInvoker(interceptors);
+            this.groupMetadata.set(initializeGroupMetadata(config, 
groupRebalanceConfig));
             final Supplier<RequestManagers> requestManagersSupplier = 
RequestManagers.supplier(time,
                     logContext,
                     backgroundEventHandler,
@@ -383,7 +367,9 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                     networkClientDelegateSupplier,
                     clientTelemetryReporter,
                     metrics,
-                    offsetCommitCallbackInvoker);
+                    offsetCommitCallbackInvoker,
+                    this::updateGroupMetadata
+            );
             final Supplier<ApplicationEventProcessor> 
applicationEventProcessorSupplier = 
ApplicationEventProcessor.supplier(logContext,
                     metadata,
                     applicationEventQueue,
@@ -413,8 +399,6 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                     
config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, 
clientId))
             );
 
-            this.groupMetadata = initializeGroupMetadata(config, 
groupRebalanceConfig);
-
             // The FetchCollector is only used on the application thread.
             this.fetchCollector = fetchCollectorFactory.build(logContext,
                     metadata,
@@ -426,7 +410,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
 
             this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, 
CONSUMER_METRIC_GROUP_PREFIX);
 
-            if (groupMetadata.isPresent() &&
+            if (groupMetadata.get().isPresent() &&
                 
GroupProtocol.of(config.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG)) == 
GroupProtocol.CONSUMER) {
                 config.ignore(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG); // 
Used by background thread
             }
@@ -478,7 +462,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                 rebalanceListenerInvoker
         );
         this.metrics = metrics;
-        this.groupMetadata = initializeGroupMetadata(groupId, 
Optional.empty());
+        this.groupMetadata.set(initializeGroupMetadata(groupId, 
Optional.empty()));
         this.metadata = metadata;
         this.retryBackoffMs = retryBackoffMs;
         this.defaultApiTimeoutMs = defaultApiTimeoutMs;
@@ -532,7 +516,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             GroupRebalanceConfig.ProtocolType.CONSUMER
         );
 
-        this.groupMetadata = initializeGroupMetadata(config, 
groupRebalanceConfig);
+        this.groupMetadata.set(initializeGroupMetadata(config, 
groupRebalanceConfig));
 
         BlockingQueue<ApplicationEvent> applicationEventQueue = new 
LinkedBlockingQueue<>();
         BlockingQueue<BackgroundEvent> backgroundEventQueue = new 
LinkedBlockingQueue<>();
@@ -568,7 +552,8 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             networkClientDelegateSupplier,
             clientTelemetryReporter,
             metrics,
-            offsetCommitCallbackInvoker
+            offsetCommitCallbackInvoker,
+            this::updateGroupMetadata
         );
         Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier 
= ApplicationEventProcessor.supplier(
                 logContext,
@@ -651,17 +636,35 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                 throw new InvalidGroupIdException("The configured " + 
ConsumerConfig.GROUP_ID_CONFIG
                     + " should not be an empty string or whitespace.");
             } else {
-                return Optional.of(new ConsumerGroupMetadata(
-                    groupId,
-                    JoinGroupRequest.UNKNOWN_GENERATION_ID,
-                    JoinGroupRequest.UNKNOWN_MEMBER_ID,
-                    groupInstanceId
-                ));
+                return Optional.of(initializeConsumerGroupMetadata(groupId, 
groupInstanceId));
             }
         }
         return Optional.empty();
     }
 
+    private ConsumerGroupMetadata initializeConsumerGroupMetadata(final String 
groupId,
+                                                                  final 
Optional<String> groupInstanceId) {
+        return new ConsumerGroupMetadata(
+            groupId,
+            JoinGroupRequest.UNKNOWN_GENERATION_ID,
+            JoinGroupRequest.UNKNOWN_MEMBER_ID,
+            groupInstanceId
+        );
+    }
+
+    private void updateGroupMetadata(final Optional<Integer> memberEpoch, 
final Optional<String> memberId) {
+        groupMetadata.updateAndGet(
+            oldGroupMetadataOptional -> oldGroupMetadataOptional.map(
+                oldGroupMetadata -> new ConsumerGroupMetadata(
+                    oldGroupMetadata.groupId(),
+                    memberEpoch.orElse(oldGroupMetadata.generationId()),
+                    memberId.orElse(oldGroupMetadata.memberId()),
+                    oldGroupMetadata.groupInstanceId()
+                )
+            )
+        );
+    }
+
     /**
      * poll implementation using {@link ApplicationEventHandler}.
      *  1. Poll for background events. If there's a fetch response event, 
process the record and return it. If it is
@@ -713,18 +716,14 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                 wakeupTrigger.maybeTriggerWakeup();
 
                 updateAssignmentMetadataIfNeeded(timer);
-                if (isGenerationKnownOrPartitionsUserAssigned()) {
-                    final Fetch<K, V> fetch = pollForFetches(timer);
-                    if (!fetch.isEmpty()) {
-                        if (fetch.records().isEmpty()) {
-                            log.trace("Returning empty records from `poll()` "
-                                + "since the consumer's position has advanced 
for at least one topic partition");
-                        }
-
-                        return interceptors.onConsume(new 
ConsumerRecords<>(fetch.records()));
+                final Fetch<K, V> fetch = pollForFetches(timer);
+                if (!fetch.isEmpty()) {
+                    if (fetch.records().isEmpty()) {
+                        log.trace("Returning empty records from `poll()` "
+                            + "since the consumer's position has advanced for 
at least one topic partition");
                     }
-                } else {
-                    timer.update();
+
+                    return interceptors.onConsume(new 
ConsumerRecords<>(fetch.records()));
                 }
                 // We will wait for retryBackoffMs
             } while (timer.notExpired());
@@ -736,13 +735,6 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         }
     }
 
-    private boolean isGenerationKnownOrPartitionsUserAssigned() {
-        if (subscriptions.hasAutoAssignedPartitions()) {
-            return groupMetadata.filter(g -> g.generationId() != 
JoinGroupRequest.UNKNOWN_GENERATION_ID).isPresent();
-        }
-        return true;
-    }
-
     /**
      * Commit offsets returned on the last {@link #poll(Duration) poll()} for 
all the subscribed list of topics and
      * partitions.
@@ -960,7 +952,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
     }
 
     private void maybeThrowInvalidGroupIdException() {
-        if (!groupMetadata.isPresent()) {
+        if (!groupMetadata.get().isPresent()) {
             throw new InvalidGroupIdException("To use the group management or 
offset commit APIs, you must " +
                 "provide a valid " + ConsumerConfig.GROUP_ID_CONFIG + " in the 
consumer configuration.");
         }
@@ -1186,7 +1178,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         acquireAndEnsureOpen();
         try {
             maybeThrowInvalidGroupIdException();
-            return groupMetadata.get();
+            return groupMetadata.get().get();
         } finally {
             release();
         }
@@ -1266,7 +1258,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
      * 3. if partition revocation completes successfully, send leave group
      */
     void prepareShutdown(final Timer timer, final AtomicReference<Throwable> 
firstException) {
-        if (!groupMetadata.isPresent())
+        if (!groupMetadata.get().isPresent())
             return;
         maybeAutoCommitSync(autoCommitEnabled, timer);
         applicationEventHandler.add(new CommitOnCloseEvent());
@@ -1463,7 +1455,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         acquireAndEnsureOpen();
         try {
             fetchBuffer.retainAll(Collections.emptySet());
-            if (groupMetadata.isPresent()) {
+            if (groupMetadata.get().isPresent()) {
                 UnsubscribeEvent unsubscribeEvent = new UnsubscribeEvent();
                 applicationEventHandler.add(unsubscribeEvent);
                 log.info("Unsubscribing all topics or patterns and assigned 
partitions");
@@ -1475,7 +1467,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                 } catch (TimeoutException e) {
                     log.error("Failed while waiting for the unsubscribe event 
to complete");
                 }
-                groupMetadata = 
initializeGroupMetadata(groupMetadata.get().groupId(), Optional.empty());
+                resetGroupMetadata();
             }
             subscriptions.unsubscribe();
         } finally {
@@ -1483,6 +1475,16 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         }
     }
 
+    private void resetGroupMetadata() {
+        groupMetadata.updateAndGet(
+            oldGroupMetadataOptional -> oldGroupMetadataOptional
+                .map(oldGroupMetadata -> initializeConsumerGroupMetadata(
+                    oldGroupMetadata.groupId(),
+                    oldGroupMetadata.groupInstanceId()
+                ))
+        );
+    }
+
     @Override
     @Deprecated
     public ConsumerRecords<K, V> poll(final long timeoutMs) {
@@ -1604,7 +1606,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
      * according to config {@link CommonClientConfigs#GROUP_ID_CONFIG}
      */
     private boolean isCommittedOffsetsManagementEnabled() {
-        return groupMetadata.isPresent();
+        return groupMetadata.get().isPresent();
     }
 
     /**
@@ -1905,12 +1907,12 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
     private void maybeThrowFencedInstanceException() {
         if (offsetCommitCallbackInvoker.hasFencedException()) {
             String groupInstanceId = "unknown";
-            if (!groupMetadata.isPresent()) {
+            if (!groupMetadata.get().isPresent()) {
                 log.error("No group metadata found although a group ID was 
provided. This is a bug!");
-            } else if (!groupMetadata.get().groupInstanceId().isPresent()) {
+            } else if 
(!groupMetadata.get().get().groupInstanceId().isPresent()) {
                 log.error("No group instance ID found although the consumer is 
fenced. This is a bug!");
             } else {
-                groupInstanceId = groupMetadata.get().groupInstanceId().get();
+                groupInstanceId = 
groupMetadata.get().get().groupInstanceId().get();
             }
             throw new FencedInstanceIdException("Get fenced exception for 
group.instance.id " + groupInstanceId);
         }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
index 826774a6a64..d2a7205d5f3 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
@@ -22,7 +22,6 @@ import 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollRes
 import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
 import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
 import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.GroupMetadataUpdateEvent;
 import 
org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
@@ -110,7 +109,6 @@ public class HeartbeatRequestManager implements 
RequestManager {
      * sending heartbeat until the next poll.
      */
     private final Timer pollTimer;
-    private GroupMetadataUpdateEvent previousGroupMetadataUpdateEvent = null;
 
     /**
      * Holding the heartbeat sensor to measure heartbeat timing and response 
latency
@@ -328,27 +326,11 @@ public class HeartbeatRequestManager implements 
RequestManager {
             heartbeatRequestState.onSuccessfulAttempt(currentTimeMs);
             heartbeatRequestState.resetTimer();
             membershipManager.onHeartbeatSuccess(response.data());
-            maybeSendGroupMetadataUpdateEvent();
             return;
         }
         onErrorResponse(response, currentTimeMs);
     }
 
-    private void maybeSendGroupMetadataUpdateEvent() {
-        if (previousGroupMetadataUpdateEvent == null ||
-            
!previousGroupMetadataUpdateEvent.memberId().equals(membershipManager.memberId())
 ||
-            previousGroupMetadataUpdateEvent.memberEpoch() != 
membershipManager.memberEpoch()) {
-
-            final GroupMetadataUpdateEvent currentGroupMetadataUpdateEvent = 
new GroupMetadataUpdateEvent(
-                membershipManager.memberEpoch(),
-                previousGroupMetadataUpdateEvent != null && 
membershipManager.memberId() == null ?
-                    previousGroupMetadataUpdateEvent.memberId() : 
membershipManager.memberId()
-            );
-            this.backgroundEventHandler.add(currentGroupMetadataUpdateEvent);
-            previousGroupMetadataUpdateEvent = currentGroupMetadataUpdateEvent;
-        }
-    }
-
     private void onErrorResponse(final ConsumerGroupHeartbeatResponse response,
                                  final long currentTimeMs) {
         Errors error = Errors.forCode(response.data().errorCode());
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
index 35322fb51be..c2bdd3f8609 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
@@ -57,6 +57,7 @@ import java.util.TreeSet;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 
+import static java.util.Collections.unmodifiableList;
 import static 
org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED;
 import static 
org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_LOST;
 import static 
org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED;
@@ -1491,4 +1492,9 @@ public class MembershipManagerImpl implements 
MembershipManager {
         }
         return PollResult.EMPTY;
     }
+
+    // visible for testing
+    List<MemberStateListener> stateListeners() {
+        return unmodifiableList(stateUpdatesListeners);
+    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
index 0b4c043d4a4..75d87432db6 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
@@ -123,7 +123,8 @@ public class RequestManagers implements Closeable {
                                                      final 
Supplier<NetworkClientDelegate> networkClientDelegateSupplier,
                                                      final 
Optional<ClientTelemetryReporter> clientTelemetryReporter,
                                                      final Metrics metrics,
-                                                     final 
OffsetCommitCallbackInvoker offsetCommitCallbackInvoker
+                                                     final 
OffsetCommitCallbackInvoker offsetCommitCallbackInvoker,
+                                                     final MemberStateListener 
applicationThreadMemberStateListener
                                                      ) {
         return new CachedSupplier<RequestManagers>() {
             @Override
@@ -192,6 +193,7 @@ public class RequestManagers implements Closeable {
                             time,
                             metrics);
                     membershipManager.registerStateListener(commit);
+                    
membershipManager.registerStateListener(applicationThreadMemberStateListener);
                     heartbeatRequestManager = new HeartbeatRequestManager(
                             logContext,
                             time,
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
index 9bc3fbebc30..4241482bcaa 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
@@ -27,7 +27,7 @@ import java.util.Objects;
 public abstract class BackgroundEvent {
 
     public enum Type {
-        ERROR, CONSUMER_REBALANCE_LISTENER_CALLBACK_NEEDED, 
GROUP_METADATA_UPDATE
+        ERROR, CONSUMER_REBALANCE_LISTENER_CALLBACK_NEEDED
     }
 
     private final Type type;
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/GroupMetadataUpdateEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/GroupMetadataUpdateEvent.java
deleted file mode 100644
index 001f5498183..00000000000
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/GroupMetadataUpdateEvent.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.consumer.internals.events;
-
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
-
-/**
- * This event is sent by the {@link ConsumerNetworkThread consumer's network 
thread} to the application thread
- * so that when the user calls the {@link Consumer#groupMetadata()} API, the 
information is up-to-date. The
- * information for the current state of the group member is managed on the 
consumer network thread and thus
- * requires this interplay between threads.
- */
-public class GroupMetadataUpdateEvent extends BackgroundEvent {
-
-    private final int memberEpoch;
-    private final String memberId;
-
-    public GroupMetadataUpdateEvent(final int memberEpoch, final String 
memberId) {
-        super(Type.GROUP_METADATA_UPDATE);
-        this.memberEpoch = memberEpoch;
-        this.memberId = memberId;
-    }
-
-    public int memberEpoch() {
-        return memberEpoch;
-    }
-
-    public String memberId() {
-        return memberId;
-    }
-
-    @Override
-    public String toStringBase() {
-        return super.toStringBase() +
-            ", memberEpoch=" + memberEpoch +
-            ", memberId='" + memberId + '\'';
-    }
-}
\ No newline at end of file
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 35e742fbd0a..fe83a417897 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -42,7 +42,6 @@ import 
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListe
 import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
 import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
 import 
org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.GroupMetadataUpdateEvent;
 import org.apache.kafka.clients.consumer.internals.events.LeaveOnCloseEvent;
 import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
@@ -52,7 +51,6 @@ import 
org.apache.kafka.clients.consumer.internals.events.SubscriptionChangeEven
 import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
 import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ValidatePositionsEvent;
-import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.Node;
@@ -80,6 +78,7 @@ import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.mockito.ArgumentCaptor;
 import org.mockito.ArgumentMatchers;
+import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 
 import java.time.Duration;
@@ -102,7 +101,6 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -117,10 +115,12 @@ import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.THROW_ON
 import static 
org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.TOPIC_PARTITION_COMPARATOR;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.test.TestUtils.requiredConsumerConfig;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertSame;
@@ -133,6 +133,7 @@ import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -164,19 +165,19 @@ public class AsyncKafkaConsumerTest {
     }
 
     private AsyncKafkaConsumer<String, String> newConsumer() {
-        final Properties props = requiredConsumerProperties();
+        final Properties props = requiredConsumerConfig();
         props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
         return newConsumer(props);
     }
 
     private AsyncKafkaConsumer<String, String> newConsumerWithoutGroupId() {
-        final Properties props = requiredConsumerProperties();
+        final Properties props = requiredConsumerConfig();
         return newConsumer(props);
     }
 
     @SuppressWarnings("UnusedReturnValue")
     private AsyncKafkaConsumer<String, String> newConsumerWithEmptyGroupId() {
-        final Properties props = requiredConsumerPropertiesAndGroupId("");
+        final Properties props = requiredConsumerConfigAndGroupId("");
         return newConsumer(props);
     }
 
@@ -921,7 +922,7 @@ public class AsyncKafkaConsumerTest {
 
     @Test
     public void testInterceptorAutoCommitOnClose() {
-        Properties props = requiredConsumerPropertiesAndGroupId("test-id");
+        Properties props = requiredConsumerConfigAndGroupId("test-id");
         props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, 
MockConsumerInterceptor.class.getName());
         props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
 
@@ -937,7 +938,7 @@ public class AsyncKafkaConsumerTest {
 
     @Test
     public void testInterceptorCommitSync() {
-        Properties props = requiredConsumerPropertiesAndGroupId("test-id");
+        Properties props = requiredConsumerConfigAndGroupId("test-id");
         props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, 
MockConsumerInterceptor.class.getName());
         props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
 
@@ -952,7 +953,7 @@ public class AsyncKafkaConsumerTest {
 
     @Test
     public void testNoInterceptorCommitSyncFailed() {
-        Properties props = requiredConsumerPropertiesAndGroupId("test-id");
+        Properties props = requiredConsumerConfigAndGroupId("test-id");
         props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, 
MockConsumerInterceptor.class.getName());
         props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
 
@@ -968,7 +969,7 @@ public class AsyncKafkaConsumerTest {
 
     @Test
     public void testInterceptorCommitAsync() {
-        Properties props = requiredConsumerPropertiesAndGroupId("test-id");
+        Properties props = requiredConsumerConfigAndGroupId("test-id");
         props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, 
MockConsumerInterceptor.class.getName());
         props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
 
@@ -985,7 +986,7 @@ public class AsyncKafkaConsumerTest {
 
     @Test
     public void testNoInterceptorCommitAsyncFailed() {
-        Properties props = requiredConsumerPropertiesAndGroupId("test-id");
+        Properties props = requiredConsumerConfigAndGroupId("test-id");
         props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, 
MockConsumerInterceptor.class.getName());
         props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
 
@@ -1085,7 +1086,7 @@ public class AsyncKafkaConsumerTest {
 
     @Test
     public void testGroupMetadataAfterCreationWithGroupIdIsNull() {
-        final Properties props = requiredConsumerProperties();
+        final Properties props = requiredConsumerConfig();
         final ConsumerConfig config = new ConsumerConfig(props);
         consumer = newConsumer(config);
 
@@ -1102,7 +1103,7 @@ public class AsyncKafkaConsumerTest {
     @Test
     public void testGroupMetadataAfterCreationWithGroupIdIsNotNull() {
         final String groupId = "consumerGroupA";
-        final ConsumerConfig config = new 
ConsumerConfig(requiredConsumerPropertiesAndGroupId(groupId));
+        final ConsumerConfig config = new 
ConsumerConfig(requiredConsumerConfigAndGroupId(groupId));
         consumer = newConsumer(config);
 
         final ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();
@@ -1117,7 +1118,7 @@ public class AsyncKafkaConsumerTest {
     public void 
testGroupMetadataAfterCreationWithGroupIdIsNotNullAndGroupInstanceIdSet() {
         final String groupId = "consumerGroupA";
         final String groupInstanceId = "groupInstanceId1";
-        final Properties props = requiredConsumerPropertiesAndGroupId(groupId);
+        final Properties props = requiredConsumerConfigAndGroupId(groupId);
         props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId);
         final ConsumerConfig config = new ConsumerConfig(props);
         consumer = newConsumer(config);
@@ -1130,164 +1131,65 @@ public class AsyncKafkaConsumerTest {
         assertEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, 
groupMetadata.memberId());
     }
 
-    @Test
-    public void testGroupMetadataUpdateSingleCall() {
+    private MemberStateListener captureGroupMetadataUpdateListener(final 
MockedStatic<RequestManagers> requestManagers) {
+        ArgumentCaptor<MemberStateListener> 
applicationThreadMemberStateListener = 
ArgumentCaptor.forClass(MemberStateListener.class);
+        requestManagers.verify(() -> RequestManagers.supplier(
+            any(),
+            any(),
+            any(),
+            any(),
+            any(),
+            any(),
+            any(),
+            any(),
+            any(),
+            any(),
+            any(),
+            any(),
+            any(),
+            any(),
+            applicationThreadMemberStateListener.capture()
+        ));
+        return applicationThreadMemberStateListener.getValue();
+    }
+
+    @Test
+    public void testGroupMetadataUpdate() {
         final String groupId = "consumerGroupA";
-        final ConsumerConfig config = new 
ConsumerConfig(requiredConsumerPropertiesAndGroupId(groupId));
-        consumer = newConsumer(config);
-
-        
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
-        completeFetchedCommittedOffsetApplicationEventSuccessfully(mkMap());
-
-        final int generation = 1;
-        final String memberId = "newMemberId";
-        final ConsumerGroupMetadata expectedGroupMetadata = new 
ConsumerGroupMetadata(
-            groupId,
-            generation,
-            memberId,
-            Optional.empty()
-        );
-        final GroupMetadataUpdateEvent groupMetadataUpdateEvent = new 
GroupMetadataUpdateEvent(
-            generation,
-            memberId
-        );
-        backgroundEventQueue.add(groupMetadataUpdateEvent);
-        consumer.assign(singletonList(new TopicPartition("topic", 0)));
-        consumer.poll(Duration.ZERO);
-
-        final ConsumerGroupMetadata actualGroupMetadata = 
consumer.groupMetadata();
-
-        assertEquals(expectedGroupMetadata, actualGroupMetadata);
-
-        final ConsumerGroupMetadata secondActualGroupMetadataWithoutUpdate = 
consumer.groupMetadata();
-
-        assertEquals(expectedGroupMetadata, 
secondActualGroupMetadataWithoutUpdate);
-    }
-
-    @Test
-    public void 
testPollNotReturningRecordsIfGenerationUnknownAndGroupManagementIsUsedWithTopics()
 {
-        
testPollNotReturningRecordsIfGenerationUnknownAndGroupManagementIsUsed(() -> {
-            consumer.subscribe(singletonList("topic"));
-        });
-    }
-
-    @Test
-    public void 
testPollNotReturningRecordsIfGenerationUnknownAndGroupManagementIsUsedWithPattern()
 {
-        
testPollNotReturningRecordsIfGenerationUnknownAndGroupManagementIsUsed(() -> {
-            when(metadata.fetch()).thenReturn(Cluster.empty());
-            consumer.subscribe(Pattern.compile("topic"));
-        });
-    }
-
-    private void 
testPollNotReturningRecordsIfGenerationUnknownAndGroupManagementIsUsed(final 
Runnable subscription) {
-        final String groupId = "consumerGroupA";
-        final ConsumerConfig config = new 
ConsumerConfig(requiredConsumerPropertiesAndGroupId(groupId));
-        consumer = newConsumer(config);
-        subscription.run();
-
-        consumer.poll(Duration.ZERO);
-
-        verify(fetchCollector, never()).collectFetch(any(FetchBuffer.class));
-    }
-
-    @Test
-    public void 
testPollReturningRecordsIfGroupIdSetAndGroupManagementIsNotUsed() {
-        final ConsumerConfig config = new 
ConsumerConfig(requiredConsumerPropertiesAndGroupId("consumerGroupA"));
-        
testPollReturningRecordsIfGroupMetadataHasUnknownGenerationAndGroupManagementIsNotUsed(config);
-    }
-
-    @Test
-    public void 
testPollReturningRecordsIfGroupIdNotSetAndGroupManagementIsNotUsed() {
-        final ConsumerConfig config = new 
ConsumerConfig(requiredConsumerProperties());
-        
testPollReturningRecordsIfGroupMetadataHasUnknownGenerationAndGroupManagementIsNotUsed(config);
-    }
-
-    private void 
testPollReturningRecordsIfGroupMetadataHasUnknownGenerationAndGroupManagementIsNotUsed(final
 ConsumerConfig config) {
-        final String topic = "topic";
-        final TopicPartition topicPartition = new TopicPartition(topic, 0);
-        consumer = newConsumer(config);
-        consumer.assign(singletonList(topicPartition));
-        final List<ConsumerRecord<String, String>> records = singletonList(
-            new ConsumerRecord<>(topic, 0, 2, "key1", "value1")
-        );
-        when(fetchCollector.collectFetch(any(FetchBuffer.class)))
-            .thenReturn(Fetch.forPartition(topicPartition, records, true));
-        completeFetchedCommittedOffsetApplicationEventSuccessfully(mkMap());
-
-        consumer.poll(Duration.ZERO);
-
-        verify(fetchCollector).collectFetch(any(FetchBuffer.class));
-    }
-
-    @Test
-    public void 
testPollReturningRecordIfGenerationKnownAndGroupManagementIsUsedWithTopics() {
-        final String topic = "topic";
-        testPollReturningRecordIfGenerationKnownAndGroupManagementIsUsed(
-            topic,
-            () -> {
-                consumer.subscribe(singletonList(topic));
-            });
-    }
-
-    @Test
-    public void 
testPollReturningRecordIfGenerationKnownAndGroupManagementIsUsedWithPattern() {
-        final String topic = "topic";
-        testPollReturningRecordIfGenerationKnownAndGroupManagementIsUsed(
-            topic,
-            () -> {
-                when(metadata.fetch()).thenReturn(Cluster.empty());
-                consumer.subscribe(Pattern.compile(topic));
-            });
-    }
-
-    private void 
testPollReturningRecordIfGenerationKnownAndGroupManagementIsUsed(final String 
topic,
-                                                                               
   final Runnable subscription) {
-        final ConsumerConfig config = new 
ConsumerConfig(requiredConsumerPropertiesAndGroupId("consumerGroupA"));
-        final int generation = 1;
-        final String memberId = "newMemberId";
-        final GroupMetadataUpdateEvent groupMetadataUpdateEvent = new 
GroupMetadataUpdateEvent(
-            generation,
-            memberId
-        );
-        backgroundEventQueue.add(groupMetadataUpdateEvent);
-        final TopicPartition topicPartition = new TopicPartition(topic, 0);
-        final List<ConsumerRecord<String, String>> records = singletonList(
-            new ConsumerRecord<>(topic, 0, 2, "key1", "value1")
-        );
-        when(fetchCollector.collectFetch(any(FetchBuffer.class)))
-            .thenReturn(Fetch.forPartition(topicPartition, records, true));
-        consumer = newConsumer(config);
-        subscription.run();
-
-        consumer.poll(Duration.ZERO);
-
-        verify(fetchCollector).collectFetch(any(FetchBuffer.class));
+        final ConsumerConfig config = new 
ConsumerConfig(requiredConsumerConfigAndGroupId(groupId));
+        try (final MockedStatic<RequestManagers> requestManagers = 
mockStatic(RequestManagers.class)) {
+            consumer = newConsumer(config);
+            final ConsumerGroupMetadata oldGroupMetadata = 
consumer.groupMetadata();
+            final MemberStateListener groupMetadataUpdateListener = 
captureGroupMetadataUpdateListener(requestManagers);
+            final int expectedMemberEpoch = 42;
+            final String expectedMemberId = "memberId";
+            groupMetadataUpdateListener.onMemberEpochUpdated(
+                Optional.of(expectedMemberEpoch),
+                Optional.of(expectedMemberId)
+            );
+            final ConsumerGroupMetadata newGroupMetadata = 
consumer.groupMetadata();
+            assertEquals(oldGroupMetadata.groupId(), 
newGroupMetadata.groupId());
+            assertEquals(expectedMemberId, newGroupMetadata.memberId());
+            assertEquals(expectedMemberEpoch, newGroupMetadata.generationId());
+            assertEquals(oldGroupMetadata.groupInstanceId(), 
newGroupMetadata.groupInstanceId());
+        }
     }
 
     @Test
     public void testGroupMetadataIsResetAfterUnsubscribe() {
         final String groupId = "consumerGroupA";
-        final ConsumerConfig config = new 
ConsumerConfig(requiredConsumerPropertiesAndGroupId(groupId));
-        consumer = newConsumer(config);
-        consumer.subscribe(singletonList("topic"));
-        final int generation = 1;
-        final String memberId = "newMemberId";
-        final ConsumerGroupMetadata groupMetadataAfterSubscription = new 
ConsumerGroupMetadata(
-            groupId,
-            generation,
-            memberId,
-            Optional.empty()
-        );
-        final GroupMetadataUpdateEvent groupMetadataUpdateEvent = new 
GroupMetadataUpdateEvent(
-            generation,
-            memberId
-        );
-        backgroundEventQueue.add(groupMetadataUpdateEvent);
-        
when(fetchCollector.collectFetch(any(FetchBuffer.class))).thenReturn(Fetch.empty());
-        consumer.poll(Duration.ZERO);
-
-        assertEquals(groupMetadataAfterSubscription, consumer.groupMetadata());
-
+        final ConsumerConfig config = new 
ConsumerConfig(requiredConsumerConfigAndGroupId(groupId));
+        try (final MockedStatic<RequestManagers> requestManagers = 
mockStatic(RequestManagers.class)) {
+            consumer = newConsumer(config);
+            final MemberStateListener groupMetadataUpdateListener = 
captureGroupMetadataUpdateListener(requestManagers);
+            consumer.subscribe(singletonList("topic"));
+            final int memberEpoch = 42;
+            final String memberId = "memberId";
+            
groupMetadataUpdateListener.onMemberEpochUpdated(Optional.of(memberEpoch), 
Optional.of(memberId));
+            final ConsumerGroupMetadata groupMetadata = 
consumer.groupMetadata();
+            assertNotEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, 
groupMetadata.generationId());
+            assertNotEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, 
groupMetadata.memberId());
+        }
         completeUnsubscribeApplicationEventSuccessfully();
 
         consumer.unsubscribe();
@@ -1298,7 +1200,6 @@ public class AsyncKafkaConsumerTest {
             JoinGroupRequest.UNKNOWN_MEMBER_ID,
             Optional.empty()
         );
-
         assertEquals(groupMetadataAfterUnsubscription, 
consumer.groupMetadata());
     }
 
@@ -1379,7 +1280,7 @@ public class AsyncKafkaConsumerTest {
     @Test
     public void testBackgroundError() {
         final String groupId = "consumerGroupA";
-        final ConsumerConfig config = new 
ConsumerConfig(requiredConsumerPropertiesAndGroupId(groupId));
+        final ConsumerConfig config = new 
ConsumerConfig(requiredConsumerConfigAndGroupId(groupId));
         consumer = newConsumer(config);
 
         final KafkaException expectedException = new KafkaException("Nobody 
expects the Spanish Inquisition");
@@ -1394,7 +1295,7 @@ public class AsyncKafkaConsumerTest {
     @Test
     public void testMultipleBackgroundErrors() {
         final String groupId = "consumerGroupA";
-        final ConsumerConfig config = new 
ConsumerConfig(requiredConsumerPropertiesAndGroupId(groupId));
+        final ConsumerConfig config = new 
ConsumerConfig(requiredConsumerConfigAndGroupId(groupId));
         consumer = newConsumer(config);
 
         final KafkaException expectedException1 = new KafkaException("Nobody 
expects the Spanish Inquisition");
@@ -1412,7 +1313,7 @@ public class AsyncKafkaConsumerTest {
 
     @Test
     public void testGroupRemoteAssignorUnusedIfGroupIdUndefined() {
-        final Properties props = requiredConsumerProperties();
+        final Properties props = requiredConsumerConfig();
         props.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, "someAssignor");
         final ConsumerConfig config = new ConsumerConfig(props);
         consumer = newConsumer(config);
@@ -1422,7 +1323,7 @@ public class AsyncKafkaConsumerTest {
 
     @Test
     public void testGroupRemoteAssignorUnusedInGenericProtocol() {
-        final Properties props = requiredConsumerProperties();
+        final Properties props = requiredConsumerConfig();
         props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupA");
         props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT));
         props.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, "someAssignor");
@@ -1434,7 +1335,7 @@ public class AsyncKafkaConsumerTest {
 
     @Test
     public void testGroupRemoteAssignorUsedInConsumerProtocol() {
-        final Properties props = requiredConsumerProperties();
+        final Properties props = requiredConsumerConfig();
         props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupA");
         props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT));
         props.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, "someAssignor");
@@ -1446,7 +1347,7 @@ public class AsyncKafkaConsumerTest {
 
     @Test
     public void testGroupIdNull() {
-        final Properties props = requiredConsumerProperties();
+        final Properties props = requiredConsumerConfig();
         props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000);
         props.put(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, true);
         final ConsumerConfig config = new ConsumerConfig(props);
@@ -1458,7 +1359,7 @@ public class AsyncKafkaConsumerTest {
 
     @Test
     public void testGroupIdNotNullAndValid() {
-        final Properties props = 
requiredConsumerPropertiesAndGroupId("consumerGroupA");
+        final Properties props = 
requiredConsumerConfigAndGroupId("consumerGroupA");
         props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000);
         props.put(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, true);
         final ConsumerConfig config = new ConsumerConfig(props);
@@ -1492,7 +1393,6 @@ public class AsyncKafkaConsumerTest {
         final TopicPartition tp = new TopicPartition("topic", 0);
         final List<ConsumerRecord<String, String>> records = singletonList(
                 new ConsumerRecord<>("topic", 0, 2, "key1", "value1"));
-        backgroundEventQueue.add(new GroupMetadataUpdateEvent(1, "memberId"));
         doAnswer(invocation -> Fetch.forPartition(tp, records, true))
                 .when(fetchCollector)
                 .collectFetch(Mockito.any(FetchBuffer.class));
@@ -1503,7 +1403,7 @@ public class AsyncKafkaConsumerTest {
     }
 
     private void testInvalidGroupId(final String groupId) {
-        final Properties props = requiredConsumerPropertiesAndGroupId(groupId);
+        final Properties props = requiredConsumerConfigAndGroupId(groupId);
         final ConsumerConfig config = new ConsumerConfig(props);
 
         final Exception exception = assertThrows(
@@ -1514,20 +1414,12 @@ public class AsyncKafkaConsumerTest {
         assertEquals("Failed to construct kafka consumer", 
exception.getMessage());
     }
 
-    private Properties requiredConsumerPropertiesAndGroupId(final String 
groupId) {
-        final Properties props = requiredConsumerProperties();
+    private Properties requiredConsumerConfigAndGroupId(final String groupId) {
+        final Properties props = requiredConsumerConfig();
         props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
         return props;
     }
 
-    private Properties requiredConsumerProperties() {
-        final Properties props = new Properties();
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
-        return props;
-    }
-
     private void 
testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout(boolean 
committedOffsetsEnabled) {
         completeFetchedCommittedOffsetApplicationEventExceptionally(new 
TimeoutException());
         
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
@@ -1588,13 +1480,6 @@ public class AsyncKafkaConsumerTest {
             new ConsumerRecord<>(topicName, partition, 3, "key2", "value2")
         );
 
-        final int generation = 1;
-        final String memberId = "newMemberId";
-        final GroupMetadataUpdateEvent groupMetadataUpdateEvent = new 
GroupMetadataUpdateEvent(
-            generation,
-            memberId
-        );
-        backgroundEventQueue.add(groupMetadataUpdateEvent);
         // On the first iteration, return no data; on the second, return two 
records
         doAnswer(invocation -> {
             // Mock the subscription being assigned as the first fetch is 
collected
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
index 90cfb90bb1b..8e05e505be4 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
@@ -21,7 +21,6 @@ import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
-import 
org.apache.kafka.clients.consumer.internals.events.GroupMetadataUpdateEvent;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.Uuid;
@@ -325,91 +324,6 @@ public class HeartbeatRequestManagerTest {
         assertEquals(DEFAULT_REMOTE_ASSIGNOR, 
heartbeatRequest.data().serverAssignor());
     }
 
-    @Test
-    public void testConsumerGroupMetadataFirstUpdate() {
-        final GroupMetadataUpdateEvent groupMetadataUpdateEvent = 
makeFirstGroupMetadataUpdate(memberId, memberEpoch);
-        assertEquals(memberEpoch, groupMetadataUpdateEvent.memberEpoch());
-        assertEquals(memberId, groupMetadataUpdateEvent.memberId());
-    }
-
-    @Test
-    public void testConsumerGroupMetadataUpdateWithSameUpdate() {
-        makeFirstGroupMetadataUpdate(memberId, memberEpoch);
-
-        time.sleep(2000);
-        NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
-
-        assertEquals(1, result.unsentRequests.size());
-        NetworkClientDelegate.UnsentRequest request = 
result.unsentRequests.get(0);
-        ClientResponse responseWithSameUpdate = 
createHeartbeatResponse(request, Errors.NONE);
-        request.handler().onComplete(responseWithSameUpdate);
-        assertEquals(0, backgroundEventQueue.size());
-    }
-
-    @Test
-    public void 
testConsumerGroupMetadataUpdateWithMemberIdNullButMemberEpochUpdated() {
-        makeFirstGroupMetadataUpdate(memberId, memberEpoch);
-
-        time.sleep(2000);
-        NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
-
-        assertEquals(1, result.unsentRequests.size());
-        NetworkClientDelegate.UnsentRequest request = 
result.unsentRequests.get(0);
-        final int updatedMemberEpoch = 2;
-        ClientResponse responseWithMemberEpochUpdate = 
createHeartbeatResponseWithMemberIdNull(
-            request,
-            Errors.NONE,
-            updatedMemberEpoch
-        );
-        request.handler().onComplete(responseWithMemberEpochUpdate);
-        assertEquals(1, backgroundEventQueue.size());
-        final BackgroundEvent eventWithUpdatedMemberEpoch = 
backgroundEventQueue.poll();
-        assertEquals(BackgroundEvent.Type.GROUP_METADATA_UPDATE, 
eventWithUpdatedMemberEpoch.type());
-        final GroupMetadataUpdateEvent groupMetadataUpdateEvent = 
(GroupMetadataUpdateEvent) eventWithUpdatedMemberEpoch;
-        assertEquals(updatedMemberEpoch, 
groupMetadataUpdateEvent.memberEpoch());
-        assertEquals(memberId, groupMetadataUpdateEvent.memberId());
-    }
-
-    @Test
-    public void 
testConsumerGroupMetadataUpdateWithMemberIdUpdatedAndMemberEpochSame() {
-        makeFirstGroupMetadataUpdate(memberId, memberEpoch);
-
-        time.sleep(2000);
-        NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
-
-        assertEquals(1, result.unsentRequests.size());
-        NetworkClientDelegate.UnsentRequest request = 
result.unsentRequests.get(0);
-        final String updatedMemberId = "updatedMemberId";
-        ClientResponse responseWithMemberIdUpdate = createHeartbeatResponse(
-            request,
-            Errors.NONE,
-            updatedMemberId,
-            memberEpoch
-        );
-        request.handler().onComplete(responseWithMemberIdUpdate);
-        assertEquals(1, backgroundEventQueue.size());
-        final BackgroundEvent eventWithUpdatedMemberEpoch = 
backgroundEventQueue.poll();
-        assertEquals(BackgroundEvent.Type.GROUP_METADATA_UPDATE, 
eventWithUpdatedMemberEpoch.type());
-        final GroupMetadataUpdateEvent groupMetadataUpdateEvent = 
(GroupMetadataUpdateEvent) eventWithUpdatedMemberEpoch;
-        assertEquals(memberEpoch, groupMetadataUpdateEvent.memberEpoch());
-        assertEquals(updatedMemberId, groupMetadataUpdateEvent.memberId());
-    }
-
-    private GroupMetadataUpdateEvent makeFirstGroupMetadataUpdate(final String 
memberId, final int memberEpoch) {
-        resetWithZeroHeartbeatInterval(Optional.empty());
-        mockStableMember();
-        
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new 
Node(1, "localhost", 9999)));
-        NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
-        assertEquals(1, result.unsentRequests.size());
-        NetworkClientDelegate.UnsentRequest request = 
result.unsentRequests.get(0);
-        ClientResponse firstResponse = createHeartbeatResponse(request, 
Errors.NONE, memberId, memberEpoch);
-        request.handler().onComplete(firstResponse);
-        assertEquals(1, backgroundEventQueue.size());
-        final BackgroundEvent event = backgroundEventQueue.poll();
-        assertEquals(BackgroundEvent.Type.GROUP_METADATA_UPDATE, event.type());
-        return (GroupMetadataUpdateEvent) event;
-    }
-
     @ParameterizedTest
     @MethodSource("errorProvider")
     public void testHeartbeatResponseOnErrorHandling(final Errors error, final 
boolean isFatal) {
@@ -430,7 +344,6 @@ public class HeartbeatRequestManagerTest {
 
         switch (error) {
             case NONE:
-                
verify(backgroundEventHandler).add(any(GroupMetadataUpdateEvent.class));
                 verify(membershipManager, 
times(2)).onHeartbeatSuccess(mockResponse.data());
                 assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, 
heartbeatRequestState.nextHeartbeatMs(time.milliseconds()));
                 break;
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestManagersTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestManagersTest.java
new file mode 100644
index 00000000000..640c7e98e42
--- /dev/null
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestManagersTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.junit.jupiter.api.Test;
+
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.kafka.test.TestUtils.requiredConsumerConfig;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+public class RequestManagersTest {
+
+    @Test
+    public void testMemberStateListenerRegistered() {
+
+        final MemberStateListener listener = (memberEpoch, memberId) -> { };
+
+        final Properties properties = requiredConsumerConfig();
+        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
"consumerGroup");
+        final ConsumerConfig config = new ConsumerConfig(properties);
+        final GroupRebalanceConfig groupRebalanceConfig = new 
GroupRebalanceConfig(
+            config,
+            GroupRebalanceConfig.ProtocolType.CONSUMER
+        );
+        final RequestManagers requestManagers = RequestManagers.supplier(
+            new MockTime(),
+            new LogContext(),
+            mock(BackgroundEventHandler.class),
+            mock(ConsumerMetadata.class),
+            mock(SubscriptionState.class),
+            mock(FetchBuffer.class),
+            config,
+            groupRebalanceConfig,
+            mock(ApiVersions.class),
+            mock(FetchMetricsManager.class),
+            () -> mock(NetworkClientDelegate.class),
+            Optional.empty(),
+            new Metrics(),
+            mock(OffsetCommitCallbackInvoker.class),
+            listener
+        ).get();
+        requestManagers.membershipManager.ifPresent(
+            membershipManager -> assertTrue(((MembershipManagerImpl) 
membershipManager).stateListeners().contains(listener))
+        );
+    }
+}
\ No newline at end of file
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java 
b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index db86558f7d0..3a7dcfc9305 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -37,6 +37,7 @@ import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.requests.ByteBufferChannel;
 import org.apache.kafka.common.requests.MetadataResponse.PartitionMetadata;
 import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
@@ -279,6 +280,14 @@ public class TestUtils {
         return producerConfig(bootstrapServers, keySerializer, 
valueSerializer, new Properties());
     }
 
+    public static Properties requiredConsumerConfig() {
+        final Properties consumerConfig = new Properties();
+        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9091");
+        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
+        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
+        return consumerConfig;
+    }
+
     public static Properties consumerConfig(final String bootstrapServers,
                                             final String groupId,
                                             final Class<?> keyDeserializer,

Reply via email to