This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6df192b6cb1 KAFKA-15281: Implement the groupMetadata Consumer API 
(#14879)
6df192b6cb1 is described below

commit 6df192b6cb1397a6e6173835bbbd8a3acb7e3988
Author: Bruno Cadonna <[email protected]>
AuthorDate: Wed Dec 6 08:46:57 2023 +0100

    KAFKA-15281: Implement the groupMetadata Consumer API (#14879)
    
    Implements the groupMetadata() API on the async consumer.
    
    Reviewers: Kirk True <[email protected]>, Andrew Schofield 
<[email protected]>, Lucas Brutschy <[email protected]>
---
 .../consumer/internals/AsyncKafkaConsumer.java     |  94 ++++++++++++-
 .../internals/HeartbeatRequestManager.java         |  19 +++
 .../consumer/internals/events/BackgroundEvent.java |   3 +-
 .../internals/events/BackgroundEventProcessor.java |  80 -----------
 .../internals/events/GroupMetadataUpdateEvent.java |  79 +++++++++++
 .../consumer/internals/AsyncKafkaConsumerTest.java | 148 ++++++++++++++++++++-
 .../consumer/internals/ConsumerTestBuilder.java    |   4 -
 .../internals/HeartbeatRequestManagerTest.java     | 122 ++++++++++++++++-
 .../events/BackgroundEventHandlerTest.java         | 141 --------------------
 9 files changed, 452 insertions(+), 238 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 7733c45bc4a..6c67f0bffdc 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -41,8 +41,10 @@ import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandle
 import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
 import 
org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventProcessor;
 import 
org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
+import 
org.apache.kafka.clients.consumer.internals.events.GroupMetadataUpdateEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ListOffsetsApplicationEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.OffsetFetchApplicationEvent;
@@ -134,6 +136,77 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
 
     private static final long NO_CURRENT_THREAD = -1L;
 
+    /**
+     * An {@link 
org.apache.kafka.clients.consumer.internals.events.EventProcessor} that is 
created and executes in the
+     * application thread for the purpose of processing {@link BackgroundEvent 
background events} generated by the
+     * {@link ConsumerNetworkThread network thread}.
+     * Those events are generally of two types:
+     *
+     * <ul>
+     *     <li>Errors that occur in the network thread that need to be 
propagated to the application thread</li>
+     *     <li>{@link ConsumerRebalanceListener} callbacks that are to be 
executed on the application thread</li>
+     * </ul>
+     */
+    public class BackgroundEventProcessor extends 
EventProcessor<BackgroundEvent> {
+
+        public BackgroundEventProcessor(final LogContext logContext,
+                                        final BlockingQueue<BackgroundEvent> 
backgroundEventQueue) {
+            super(logContext, backgroundEventQueue);
+        }
+
+        /**
+         * Process the events—if any—that were produced by the {@link 
ConsumerNetworkThread network thread}.
+         * It is possible that {@link 
org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent an 
error}
+         * could occur when processing the events. In such cases, the 
processor will take a reference to the first
+         * error, continue to process the remaining events, and then throw the 
first error that occurred.
+         */
+        @Override
+        public void process() {
+            AtomicReference<RuntimeException> firstError = new 
AtomicReference<>();
+            process((event, error) -> firstError.compareAndSet(null, error));
+
+            if (firstError.get() != null) {
+                throw firstError.get();
+            }
+        }
+
+        @Override
+        public void process(final BackgroundEvent event) {
+            switch (event.type()) {
+                case ERROR:
+                    process((ErrorBackgroundEvent) event);
+                    break;
+                case GROUP_METADATA_UPDATE:
+                    process((GroupMetadataUpdateEvent) event);
+                    break;
+                default:
+                    throw new IllegalArgumentException("Background event type 
" + event.type() + " was not expected");
+
+            }
+        }
+
+        @Override
+        protected Class<BackgroundEvent> getEventClass() {
+            return BackgroundEvent.class;
+        }
+
+        private void process(final ErrorBackgroundEvent event) {
+            throw event.error();
+        }
+
+        private void process(final GroupMetadataUpdateEvent event) {
+            if (AsyncKafkaConsumer.this.groupMetadata.isPresent()) {
+                final ConsumerGroupMetadata currentGroupMetadata = 
AsyncKafkaConsumer.this.groupMetadata.get();
+                AsyncKafkaConsumer.this.groupMetadata = Optional.of(new 
ConsumerGroupMetadata(
+                    currentGroupMetadata.groupId(),
+                    event.memberEpoch(),
+                    event.memberId(),
+                    currentGroupMetadata.groupInstanceId()
+                ));
+            }
+        }
+    }
+
     private final ApplicationEventHandler applicationEventHandler;
     private final Time time;
     private Optional<ConsumerGroupMetadata> groupMetadata;
@@ -177,6 +250,13 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
     AsyncKafkaConsumer(final ConsumerConfig config,
                        final Deserializer<K> keyDeserializer,
                        final Deserializer<V> valueDeserializer) {
+        this(config, keyDeserializer, valueDeserializer, new 
LinkedBlockingQueue<>());
+    }
+
+    AsyncKafkaConsumer(final ConsumerConfig config,
+                       final Deserializer<K> keyDeserializer,
+                       final Deserializer<V> valueDeserializer,
+                       final LinkedBlockingQueue<BackgroundEvent> 
backgroundEventQueue) {
         try {
             GroupRebalanceConfig groupRebalanceConfig = new 
GroupRebalanceConfig(
                 config,
@@ -212,7 +292,6 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
 
             ApiVersions apiVersions = new ApiVersions();
             final BlockingQueue<ApplicationEvent> applicationEventQueue = new 
LinkedBlockingQueue<>();
-            final BlockingQueue<BackgroundEvent> backgroundEventQueue = new 
LinkedBlockingQueue<>();
 
             // This FetchBuffer is shared between the application and network 
threads.
             this.fetchBuffer = new FetchBuffer(logContext);
@@ -413,7 +492,8 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                                                                     final 
Optional<String> groupInstanceId) {
         if (groupId != null) {
             if (groupId.isEmpty()) {
-                throw new InvalidGroupIdException("The configured group.id 
should not be an empty string or whitespace.");
+                throw new InvalidGroupIdException("The configured " + 
ConsumerConfig.GROUP_ID_CONFIG
+                    + " should not be an empty string or whitespace.");
             } else {
                 return Optional.of(new ConsumerGroupMetadata(
                     groupId,
@@ -889,7 +969,13 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
 
     @Override
     public ConsumerGroupMetadata groupMetadata() {
-        throw new KafkaException("method not implemented");
+        acquireAndEnsureOpen();
+        try {
+            maybeThrowInvalidGroupIdException();
+            return groupMetadata.get();
+        } finally {
+            release();
+        }
     }
 
     @Override
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
index e724735f535..3632098a09b 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
 import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
 import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.GroupMetadataUpdateEvent;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
@@ -103,6 +104,8 @@ public class HeartbeatRequestManager implements 
RequestManager {
      */
     private final BackgroundEventHandler backgroundEventHandler;
 
+    private GroupMetadataUpdateEvent previousGroupMetadataUpdateEvent = null;
+
     public HeartbeatRequestManager(
         final LogContext logContext,
         final Time time,
@@ -232,11 +235,27 @@ public class HeartbeatRequestManager implements 
RequestManager {
             this.heartbeatRequestState.onSuccessfulAttempt(currentTimeMs);
             this.heartbeatRequestState.resetTimer();
             
this.membershipManager.onHeartbeatResponseReceived(response.data());
+            maybeSendGroupMetadataUpdateEvent();
             return;
         }
         onErrorResponse(response, currentTimeMs);
     }
 
+    private void maybeSendGroupMetadataUpdateEvent() {
+        if (previousGroupMetadataUpdateEvent == null ||
+            
!previousGroupMetadataUpdateEvent.memberId().equals(membershipManager.memberId())
 ||
+            previousGroupMetadataUpdateEvent.memberEpoch() != 
membershipManager.memberEpoch()) {
+
+            final GroupMetadataUpdateEvent currentGroupMetadataUpdateEvent = 
new GroupMetadataUpdateEvent(
+                membershipManager.memberEpoch(),
+                previousGroupMetadataUpdateEvent != null && 
membershipManager.memberId() == null ?
+                    previousGroupMetadataUpdateEvent.memberId() : 
membershipManager.memberId()
+            );
+            this.backgroundEventHandler.add(currentGroupMetadataUpdateEvent);
+            previousGroupMetadataUpdateEvent = currentGroupMetadataUpdateEvent;
+        }
+    }
+
     private void onErrorResponse(final ConsumerGroupHeartbeatResponse response,
                                  final long currentTimeMs) {
         Errors error = Errors.forCode(response.data().errorCode());
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
index 9ce43be7802..0e44fe032fe 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
@@ -26,7 +26,8 @@ import java.util.Objects;
 public abstract class BackgroundEvent {
 
     public enum Type {
-        ERROR
+        ERROR,
+        GROUP_METADATA_UPDATE
     }
 
     protected final Type type;
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventProcessor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventProcessor.java
deleted file mode 100644
index fbc28d51bfd..00000000000
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventProcessor.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.consumer.internals.events;
-
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.utils.LogContext;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * An {@link EventProcessor} that is created and executes in the application 
thread for the purpose of processing
- * {@link BackgroundEvent background events} generated by the {@link 
ConsumerNetworkThread network thread}.
- * Those events are generally of two types:
- *
- * <ul>
- *     <li>Errors that occur in the network thread that need to be propagated 
to the application thread</li>
- *     <li>{@link ConsumerRebalanceListener} callbacks that are to be executed 
on the application thread</li>
- * </ul>
- */
-public class BackgroundEventProcessor extends EventProcessor<BackgroundEvent> {
-
-    public BackgroundEventProcessor(final LogContext logContext,
-                                    final BlockingQueue<BackgroundEvent> 
backgroundEventQueue) {
-        super(logContext, backgroundEventQueue);
-    }
-
-    /**
-     * Process the events—if any—that were produced by the {@link 
ConsumerNetworkThread network thread}.
-     * It is possible that {@link ErrorBackgroundEvent an error} could occur 
when processing the events.
-     * In such cases, the processor will take a reference to the first error, 
continue to process the
-     * remaining events, and then throw the first error that occurred.
-     */
-    @Override
-    public void process() {
-        AtomicReference<KafkaException> firstError = new AtomicReference<>();
-        process((event, error) -> firstError.compareAndSet(null, error));
-
-        if (firstError.get() != null)
-            throw firstError.get();
-    }
-
-    @Override
-    public void process(final BackgroundEvent event) {
-        switch (event.type()) {
-            case ERROR:
-                process((ErrorBackgroundEvent) event);
-                break;
-            default:
-                throw new IllegalArgumentException("Background event type " + 
event.type() + " was not expected");
-
-        }
-    }
-
-    @Override
-    protected Class<BackgroundEvent> getEventClass() {
-        return BackgroundEvent.class;
-    }
-
-    private void process(final ErrorBackgroundEvent event) {
-        throw event.error();
-    }
-
-}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/GroupMetadataUpdateEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/GroupMetadataUpdateEvent.java
new file mode 100644
index 00000000000..120e6717242
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/GroupMetadataUpdateEvent.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
+
+import java.util.Objects;
+
+/**
+ * This event is sent by the {@link ConsumerNetworkThread consumer's network 
thread} to the application thread
+ * so that when the user calls the {@link Consumer#groupMetadata()} API, the 
information is up-to-date. The
+ * information for the current state of the group member is managed on the 
consumer network thread and thus
+ * requires this interplay between threads.
+ */
+public class GroupMetadataUpdateEvent extends BackgroundEvent {
+
+    final private int memberEpoch;
+    final private String memberId;
+
+    public GroupMetadataUpdateEvent(final int memberEpoch,
+                                    final String memberId) {
+        super(Type.GROUP_METADATA_UPDATE);
+        this.memberEpoch = memberEpoch;
+        this.memberId = memberId;
+    }
+
+    public int memberEpoch() {
+        return memberEpoch;
+    }
+
+    public String memberId() {
+        return memberId;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        if (!super.equals(o)) return false;
+        GroupMetadataUpdateEvent that = (GroupMetadataUpdateEvent) o;
+        return memberEpoch == that.memberEpoch &&
+            Objects.equals(memberId, that.memberId);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), memberEpoch, memberId);
+    }
+
+    @Override
+    public String toStringBase() {
+        return super.toStringBase() +
+            ", memberEpoch=" + memberEpoch +
+            ", memberId='" + memberId + '\'';
+    }
+
+    @Override
+    public String toString() {
+        return "GroupMetadataUpdateEvent{" +
+            toStringBase() +
+            '}';
+    }
+
+}
\ No newline at end of file
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 6015994bfc8..12b833109ec 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -27,7 +28,10 @@ import 
org.apache.kafka.clients.consumer.RetriableCommitFailedException;
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
 import 
org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.GroupMetadataUpdateEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ListOffsetsApplicationEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.OffsetFetchApplicationEvent;
@@ -48,6 +52,7 @@ import 
org.apache.kafka.common.message.OffsetCommitResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.requests.JoinGroupRequest;
 import org.apache.kafka.common.requests.ListOffsetsRequest;
 import org.apache.kafka.common.requests.OffsetCommitResponse;
 import org.apache.kafka.common.requests.RequestHeader;
@@ -83,6 +88,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -760,6 +766,132 @@ public class AsyncKafkaConsumerTest {
         assertThrows(IllegalArgumentException.class, () -> 
consumer.subscribe(singletonList(emptyTopic)));
     }
 
+    @Test
+    public void testGroupMetadataAfterCreationWithGroupIdIsNull() {
+        final Properties props = requiredConsumerProperties();
+        final ConsumerConfig config = new ConsumerConfig(props);
+        try (final AsyncKafkaConsumer<String, String> consumer =
+            new AsyncKafkaConsumer<>(config, new StringDeserializer(), new 
StringDeserializer())) {
+
+            
assertFalse(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
+            
assertFalse(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
+            final Throwable exception = 
assertThrows(InvalidGroupIdException.class, consumer::groupMetadata);
+            assertEquals(
+                "To use the group management or offset commit APIs, you must " 
+
+                    "provide a valid " + ConsumerConfig.GROUP_ID_CONFIG + " in 
the consumer configuration.",
+                exception.getMessage()
+            );
+        }
+    }
+
+    @Test
+    public void testGroupMetadataAfterCreationWithGroupIdIsNotNull() {
+        final String groupId = "consumerGroupA";
+        final ConsumerConfig config = new 
ConsumerConfig(requiredConsumerPropertiesAndGroupId(groupId));
+        try (final AsyncKafkaConsumer<String, String> consumer =
+            new AsyncKafkaConsumer<>(config, new StringDeserializer(), new 
StringDeserializer())) {
+
+            final ConsumerGroupMetadata groupMetadata = 
consumer.groupMetadata();
+
+            assertEquals(groupId, groupMetadata.groupId());
+            assertEquals(Optional.empty(), groupMetadata.groupInstanceId());
+            assertEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, 
groupMetadata.generationId());
+            assertEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, 
groupMetadata.memberId());
+        }
+    }
+
+    @Test
+    public void 
testGroupMetadataAfterCreationWithGroupIdIsNotNullAndGroupInstanceIdSet() {
+        final String groupId = "consumerGroupA";
+        final String groupInstanceId = "groupInstanceId1";
+        final Properties props = requiredConsumerPropertiesAndGroupId(groupId);
+        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId);
+        final ConsumerConfig config = new ConsumerConfig(props);
+        try (final AsyncKafkaConsumer<String, String> consumer =
+            new AsyncKafkaConsumer<>(config, new StringDeserializer(), new 
StringDeserializer())) {
+
+            final ConsumerGroupMetadata groupMetadata = 
consumer.groupMetadata();
+
+            assertEquals(groupId, groupMetadata.groupId());
+            assertEquals(Optional.of(groupInstanceId), 
groupMetadata.groupInstanceId());
+            assertEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, 
groupMetadata.generationId());
+            assertEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, 
groupMetadata.memberId());
+        }
+    }
+
+    @Test
+    public void testGroupMetadataUpdateSingleCall() {
+        final String groupId = "consumerGroupA";
+        final ConsumerConfig config = new 
ConsumerConfig(requiredConsumerPropertiesAndGroupId(groupId));
+        final LinkedBlockingQueue<BackgroundEvent> backgroundEventQueue = new 
LinkedBlockingQueue<>();
+        try (final AsyncKafkaConsumer<String, String> consumer =
+            new AsyncKafkaConsumer<>(config, new StringDeserializer(), new 
StringDeserializer(), backgroundEventQueue)) {
+            final int generation = 1;
+            final String memberId = "newMemberId";
+            final ConsumerGroupMetadata expectedGroupMetadata = new 
ConsumerGroupMetadata(
+                groupId,
+                generation,
+                memberId,
+                Optional.empty()
+            );
+            final GroupMetadataUpdateEvent groupMetadataUpdateEvent = new 
GroupMetadataUpdateEvent(
+                generation,
+                memberId
+            );
+            backgroundEventQueue.add(groupMetadataUpdateEvent);
+            consumer.assign(singletonList(new TopicPartition("topic", 0)));
+            consumer.poll(Duration.ZERO);
+
+            final ConsumerGroupMetadata actualGroupMetadata = 
consumer.groupMetadata();
+
+            assertEquals(expectedGroupMetadata, actualGroupMetadata);
+
+            final ConsumerGroupMetadata secondActualGroupMetadataWithoutUpdate 
= consumer.groupMetadata();
+
+            assertEquals(expectedGroupMetadata, 
secondActualGroupMetadataWithoutUpdate);
+        }
+    }
+
+    @Test
+    public void testBackgroundError() {
+        final String groupId = "consumerGroupA";
+        final ConsumerConfig config = new 
ConsumerConfig(requiredConsumerPropertiesAndGroupId(groupId));
+        final LinkedBlockingQueue<BackgroundEvent> backgroundEventQueue = new 
LinkedBlockingQueue<>();
+        try (final AsyncKafkaConsumer<String, String> consumer =
+            new AsyncKafkaConsumer<>(config, new StringDeserializer(), new 
StringDeserializer(), backgroundEventQueue)) {
+            final KafkaException expectedException = new 
KafkaException("Nobody expects the Spanish Inquisition");
+            final ErrorBackgroundEvent errorBackgroundEvent = new 
ErrorBackgroundEvent(expectedException);
+            backgroundEventQueue.add(errorBackgroundEvent);
+            consumer.assign(singletonList(new TopicPartition("topic", 0)));
+
+            final KafkaException exception = 
assertThrows(KafkaException.class, () -> consumer.poll(Duration.ZERO));
+
+            assertEquals(expectedException.getMessage(), 
exception.getMessage());
+        }
+    }
+
+    @Test
+    public void testMultipleBackgroundErrors() {
+        final String groupId = "consumerGroupA";
+        final ConsumerConfig config = new 
ConsumerConfig(requiredConsumerPropertiesAndGroupId(groupId));
+        final LinkedBlockingQueue<BackgroundEvent> backgroundEventQueue = new 
LinkedBlockingQueue<>();
+        try (final AsyncKafkaConsumer<String, String> consumer =
+            new AsyncKafkaConsumer<>(config, new StringDeserializer(), new 
StringDeserializer(), backgroundEventQueue)) {
+            final KafkaException expectedException1 = new 
KafkaException("Nobody expects the Spanish Inquisition");
+            final ErrorBackgroundEvent errorBackgroundEvent1 = new 
ErrorBackgroundEvent(expectedException1);
+            backgroundEventQueue.add(errorBackgroundEvent1);
+            final KafkaException expectedException2 = new 
KafkaException("Spam, Spam, Spam");
+            final ErrorBackgroundEvent errorBackgroundEvent2 = new 
ErrorBackgroundEvent(expectedException2);
+            backgroundEventQueue.add(errorBackgroundEvent2);
+            consumer.assign(singletonList(new TopicPartition("topic", 0)));
+
+            final KafkaException exception = 
assertThrows(KafkaException.class, () -> consumer.poll(Duration.ZERO));
+
+            assertEquals(expectedException1.getMessage(), 
exception.getMessage());
+            assertTrue(backgroundEventQueue.isEmpty());
+        }
+    }
+
     @Test
     public void testGroupIdNull() {
         final Properties props = requiredConsumerProperties();
@@ -767,7 +899,7 @@ public class AsyncKafkaConsumerTest {
         props.put(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, true);
         final ConsumerConfig config = new ConsumerConfig(props);
 
-        try (AsyncKafkaConsumer<String, String> consumer =
+        try (final AsyncKafkaConsumer<String, String> consumer =
                  new AsyncKafkaConsumer<>(config, new StringDeserializer(), 
new StringDeserializer())) {
             
assertFalse(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
             
assertFalse(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
@@ -778,13 +910,12 @@ public class AsyncKafkaConsumerTest {
 
     @Test
     public void testGroupIdNotNullAndValid() {
-        final Properties props = requiredConsumerProperties();
-        props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupA");
+        final Properties props = 
requiredConsumerPropertiesAndGroupId("consumerGroupA");
         props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000);
         props.put(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, true);
         final ConsumerConfig config = new ConsumerConfig(props);
 
-        try (AsyncKafkaConsumer<String, String> consumer =
+        try (final AsyncKafkaConsumer<String, String> consumer =
                  new AsyncKafkaConsumer<>(config, new StringDeserializer(), 
new StringDeserializer())) {
             
assertTrue(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
             
assertTrue(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
@@ -804,8 +935,7 @@ public class AsyncKafkaConsumerTest {
     }
 
     private void testInvalidGroupId(final String groupId) {
-        final Properties props = requiredConsumerProperties();
-        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        final Properties props = requiredConsumerPropertiesAndGroupId(groupId);
         final ConsumerConfig config = new ConsumerConfig(props);
 
         final Exception exception = assertThrows(
@@ -816,6 +946,12 @@ public class AsyncKafkaConsumerTest {
         assertEquals("Failed to construct kafka consumer", 
exception.getMessage());
     }
 
+    private Properties requiredConsumerPropertiesAndGroupId(final String 
groupId) {
+        final Properties props = requiredConsumerProperties();
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        return props;
+    }
+
     private Properties requiredConsumerProperties() {
         final Properties props = new Properties();
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
index 254a39f3944..53917f6ff17 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
@@ -27,7 +27,6 @@ import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandle
 import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
-import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventProcessor;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.requests.MetadataResponse;
@@ -93,7 +92,6 @@ public class ConsumerTestBuilder implements Closeable {
     final FetchRequestManager fetchRequestManager;
     final RequestManagers requestManagers;
     public final ApplicationEventProcessor applicationEventProcessor;
-    public final BackgroundEventProcessor backgroundEventProcessor;
     public final BackgroundEventHandler backgroundEventHandler;
     final MockClient client;
     final Optional<GroupInformation> groupInfo;
@@ -265,14 +263,12 @@ public class ConsumerTestBuilder implements Closeable {
                 requestManagers,
                 metadata)
         );
-        this.backgroundEventProcessor = spy(new 
BackgroundEventProcessor(logContext, backgroundEventQueue));
     }
 
     @Override
     public void close() {
         closeQuietly(requestManagers, RequestManagers.class.getSimpleName());
         closeQuietly(applicationEventProcessor, 
ApplicationEventProcessor.class.getSimpleName());
-        closeQuietly(backgroundEventProcessor, 
BackgroundEventProcessor.class.getSimpleName());
     }
 
     public static class ConsumerNetworkThreadTestBuilder extends 
ConsumerTestBuilder {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
index 7bdee9650a8..b584a5fde5c 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
@@ -17,7 +17,9 @@
 package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
+import 
org.apache.kafka.clients.consumer.internals.events.GroupMetadataUpdateEvent;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.Uuid;
@@ -45,6 +47,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
 
 import static 
org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_GROUP_ID;
 import static 
org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_GROUP_INSTANCE_ID;
@@ -74,6 +77,7 @@ public class HeartbeatRequestManagerTest {
     private final String memberId = "member-id";
     private final int memberEpoch = 1;
     private BackgroundEventHandler backgroundEventHandler;
+    private BlockingQueue<BackgroundEvent> backgroundEventQueue;
 
     @BeforeEach
     public void setUp() {
@@ -88,6 +92,7 @@ public class HeartbeatRequestManagerTest {
         heartbeatRequestState = 
testBuilder.heartbeatRequestState.orElseThrow(IllegalStateException::new);
         heartbeatState = 
testBuilder.heartbeatState.orElseThrow(IllegalStateException::new);
         backgroundEventHandler = testBuilder.backgroundEventHandler;
+        backgroundEventQueue = testBuilder.backgroundEventQueue;
         subscriptions = testBuilder.subscriptions;
         membershipManager = 
testBuilder.membershipManager.orElseThrow(IllegalStateException::new);
 
@@ -292,6 +297,101 @@ public class HeartbeatRequestManagerTest {
         assertNull(heartbeatRequest.data().subscribedTopicRegex());
     }
 
+    @Test
+    public void testConsumerGroupMetadataFirstUpdate() {
+        final GroupMetadataUpdateEvent groupMetadataUpdateEvent = 
makeFirstGroupMetadataUpdate(memberId, memberEpoch);
+
+        final GroupMetadataUpdateEvent expectedGroupMetadataUpdateEvent = new 
GroupMetadataUpdateEvent(
+            memberEpoch,
+            memberId
+        );
+        assertEquals(expectedGroupMetadataUpdateEvent, 
groupMetadataUpdateEvent);
+    }
+
+    @Test
+    public void testConsumerGroupMetadataUpdateWithSameUpdate() {
+        makeFirstGroupMetadataUpdate(memberId, memberEpoch);
+
+        time.sleep(2000);
+        NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+
+        assertEquals(1, result.unsentRequests.size());
+        NetworkClientDelegate.UnsentRequest request = 
result.unsentRequests.get(0);
+        ClientResponse responseWithSameUpdate = 
createHeartbeatResponse(request, Errors.NONE);
+        request.handler().onComplete(responseWithSameUpdate);
+        assertEquals(0, backgroundEventQueue.size());
+    }
+
+    @Test
+    public void 
testConsumerGroupMetadataUpdateWithMemberIdNullButMemberEpochUpdated() {
+        makeFirstGroupMetadataUpdate(memberId, memberEpoch);
+
+        time.sleep(2000);
+        NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+
+        assertEquals(1, result.unsentRequests.size());
+        NetworkClientDelegate.UnsentRequest request = 
result.unsentRequests.get(0);
+        final int updatedMemberEpoch = 2;
+        ClientResponse responseWithMemberEpochUpdate = 
createHeartbeatResponseWithMemberIdNull(
+            request,
+            Errors.NONE,
+            updatedMemberEpoch
+        );
+        request.handler().onComplete(responseWithMemberEpochUpdate);
+        assertEquals(1, backgroundEventQueue.size());
+        final BackgroundEvent eventWithUpdatedMemberEpoch = 
backgroundEventQueue.poll();
+        assertEquals(BackgroundEvent.Type.GROUP_METADATA_UPDATE, 
eventWithUpdatedMemberEpoch.type());
+        final GroupMetadataUpdateEvent groupMetadataUpdateEvent = 
(GroupMetadataUpdateEvent) eventWithUpdatedMemberEpoch;
+        final GroupMetadataUpdateEvent expectedGroupMetadataUpdateEvent = new 
GroupMetadataUpdateEvent(
+            updatedMemberEpoch,
+            memberId
+        );
+        assertEquals(expectedGroupMetadataUpdateEvent, 
groupMetadataUpdateEvent);
+    }
+
+    @Test
+    public void 
testConsumerGroupMetadataUpdateWithMemberIdUpdatedAndMemberEpochSame() {
+        makeFirstGroupMetadataUpdate(memberId, memberEpoch);
+
+        time.sleep(2000);
+        NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+
+        assertEquals(1, result.unsentRequests.size());
+        NetworkClientDelegate.UnsentRequest request = 
result.unsentRequests.get(0);
+        final String updatedMemberId = "updatedMemberId";
+        ClientResponse responseWithMemberIdUpdate = createHeartbeatResponse(
+            request,
+            Errors.NONE,
+            updatedMemberId,
+            memberEpoch
+        );
+        request.handler().onComplete(responseWithMemberIdUpdate);
+        assertEquals(1, backgroundEventQueue.size());
+        final BackgroundEvent eventWithUpdatedMemberEpoch = 
backgroundEventQueue.poll();
+        assertEquals(BackgroundEvent.Type.GROUP_METADATA_UPDATE, 
eventWithUpdatedMemberEpoch.type());
+        final GroupMetadataUpdateEvent groupMetadataUpdateEvent = 
(GroupMetadataUpdateEvent) eventWithUpdatedMemberEpoch;
+        final GroupMetadataUpdateEvent expectedGroupMetadataUpdateEvent = new 
GroupMetadataUpdateEvent(
+            memberEpoch,
+            updatedMemberId
+        );
+        assertEquals(expectedGroupMetadataUpdateEvent, 
groupMetadataUpdateEvent);
+    }
+
+    private GroupMetadataUpdateEvent makeFirstGroupMetadataUpdate(final String 
memberId, final int memberEpoch) {
+        resetWithZeroHeartbeatInterval(Optional.empty());
+        mockStableMember();
+        
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new 
Node(1, "localhost", 9999)));
+        NetworkClientDelegate.PollResult result = 
heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(1, result.unsentRequests.size());
+        NetworkClientDelegate.UnsentRequest request = 
result.unsentRequests.get(0);
+        ClientResponse firstResponse = createHeartbeatResponse(request, 
Errors.NONE, memberId, memberEpoch);
+        request.handler().onComplete(firstResponse);
+        assertEquals(1, backgroundEventQueue.size());
+        final BackgroundEvent event = backgroundEventQueue.poll();
+        assertEquals(BackgroundEvent.Type.GROUP_METADATA_UPDATE, event.type());
+        return (GroupMetadataUpdateEvent) event;
+    }
+
     @ParameterizedTest
     @MethodSource("errorProvider")
     public void testHeartbeatResponseOnErrorHandling(final Errors error, final 
boolean isFatal) {
@@ -312,7 +412,7 @@ public class HeartbeatRequestManagerTest {
 
         switch (error) {
             case NONE:
-                verify(backgroundEventHandler, never()).add(any());
+                
verify(backgroundEventHandler).add(any(GroupMetadataUpdateEvent.class));
                 verify(membershipManager, 
times(2)).onHeartbeatResponseReceived(mockResponse.data());
                 assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, 
heartbeatRequestState.nextHeartbeatMs(time.milliseconds()));
                 break;
@@ -450,7 +550,25 @@ public class HeartbeatRequestManagerTest {
 
     private ClientResponse createHeartbeatResponse(
         final NetworkClientDelegate.UnsentRequest request,
-        final Errors error) {
+        final Errors error
+    ) {
+        return createHeartbeatResponse(request, error, memberId, memberEpoch);
+    }
+
+    private ClientResponse createHeartbeatResponseWithMemberIdNull(
+        final NetworkClientDelegate.UnsentRequest request,
+        final Errors error,
+        final int memberEpoch
+    ) {
+        return createHeartbeatResponse(request, error, null, memberEpoch);
+    }
+
+    private ClientResponse createHeartbeatResponse(
+        final NetworkClientDelegate.UnsentRequest request,
+        final Errors error,
+        final String memberId,
+        final int memberEpoch
+    ) {
         ConsumerGroupHeartbeatResponseData data = new 
ConsumerGroupHeartbeatResponseData()
             .setErrorCode(error.code())
             .setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS)
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandlerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandlerTest.java
deleted file mode 100644
index 0670b8bdb7c..00000000000
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandlerTest.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.consumer.internals.events;
-
-import org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder;
-import org.apache.kafka.common.KafkaException;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import java.util.concurrent.BlockingQueue;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
-
-public class BackgroundEventHandlerTest {
-
-    private ConsumerTestBuilder testBuilder;
-    private BlockingQueue<BackgroundEvent> backgroundEventQueue;
-    private BackgroundEventHandler backgroundEventHandler;
-    private BackgroundEventProcessor backgroundEventProcessor;
-
-    @BeforeEach
-    public void setup() {
-        testBuilder = new ConsumerTestBuilder();
-        backgroundEventQueue = testBuilder.backgroundEventQueue;
-        backgroundEventHandler = testBuilder.backgroundEventHandler;
-        backgroundEventProcessor = testBuilder.backgroundEventProcessor;
-    }
-
-    @AfterEach
-    public void tearDown() {
-        if (testBuilder != null)
-            testBuilder.close();
-    }
-
-    @Test
-    public void testNoEvents() {
-        assertTrue(backgroundEventQueue.isEmpty());
-        backgroundEventProcessor.process((event, error) -> { });
-        assertTrue(backgroundEventQueue.isEmpty());
-    }
-
-    @Test
-    public void testSingleEvent() {
-        BackgroundEvent event = new ErrorBackgroundEvent(new 
RuntimeException("A"));
-        backgroundEventQueue.add(event);
-        assertPeeked(event);
-        backgroundEventProcessor.process((e, error) -> { });
-        assertTrue(backgroundEventQueue.isEmpty());
-    }
-
-    @Test
-    public void testSingleErrorEvent() {
-        KafkaException error = new KafkaException("error");
-        BackgroundEvent event = new ErrorBackgroundEvent(error);
-        backgroundEventHandler.add(new ErrorBackgroundEvent(error));
-        assertPeeked(event);
-        assertProcessThrows(error);
-    }
-
-    @Test
-    public void testMultipleEvents() {
-        BackgroundEvent event1 = new ErrorBackgroundEvent(new 
RuntimeException("A"));
-        backgroundEventQueue.add(event1);
-        backgroundEventQueue.add(new ErrorBackgroundEvent(new 
RuntimeException("B")));
-        backgroundEventQueue.add(new ErrorBackgroundEvent(new 
RuntimeException("C")));
-
-        assertPeeked(event1);
-        backgroundEventProcessor.process((event, error) -> { });
-        assertTrue(backgroundEventQueue.isEmpty());
-    }
-
-    @Test
-    public void testMultipleErrorEvents() {
-        Throwable error1 = new Throwable("error1");
-        KafkaException error2 = new KafkaException("error2");
-        KafkaException error3 = new KafkaException("error3");
-
-        backgroundEventHandler.add(new ErrorBackgroundEvent(error1));
-        backgroundEventHandler.add(new ErrorBackgroundEvent(error2));
-        backgroundEventHandler.add(new ErrorBackgroundEvent(error3));
-
-        assertProcessThrows(new KafkaException(error1));
-    }
-
-    @Test
-    public void testMixedEventsWithErrorEvents() {
-        Throwable error1 = new Throwable("error1");
-        KafkaException error2 = new KafkaException("error2");
-        KafkaException error3 = new KafkaException("error3");
-
-        RuntimeException errorToCheck = new RuntimeException("A");
-        backgroundEventQueue.add(new ErrorBackgroundEvent(errorToCheck));
-        backgroundEventHandler.add(new ErrorBackgroundEvent(error1));
-        backgroundEventQueue.add(new ErrorBackgroundEvent(new 
RuntimeException("B")));
-        backgroundEventHandler.add(new ErrorBackgroundEvent(error2));
-        backgroundEventQueue.add(new ErrorBackgroundEvent(new 
RuntimeException("C")));
-        backgroundEventHandler.add(new ErrorBackgroundEvent(error3));
-        backgroundEventQueue.add(new ErrorBackgroundEvent(new 
RuntimeException("D")));
-
-        assertProcessThrows(new KafkaException(errorToCheck));
-    }
-
-    private void assertPeeked(BackgroundEvent event) {
-        BackgroundEvent peekEvent = backgroundEventQueue.peek();
-        assertNotNull(peekEvent);
-        assertEquals(event, peekEvent);
-    }
-
-    private void assertProcessThrows(Throwable error) {
-        assertFalse(backgroundEventQueue.isEmpty());
-
-        try {
-            backgroundEventProcessor.process();
-            fail("Should have thrown error: " + error);
-        } catch (Throwable t) {
-            assertEquals(error.getClass(), t.getClass());
-            assertEquals(error.getMessage(), t.getMessage());
-        }
-
-        assertTrue(backgroundEventQueue.isEmpty());
-    }
-}


Reply via email to