This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch kip1071_trunk_rebase_11_25_add_Bruno_PR
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit db0c2da5df7eecc8b6a99a94cb187bf1c370b03b
Author: Bill <[email protected]>
AuthorDate: Wed Nov 27 15:43:48 2024 -0500

    Changes from rebase
---
 .../clients/consumer/internals/AbstractMembershipManager.java     | 2 +-
 .../kafka/clients/consumer/internals/AsyncKafkaConsumer.java      | 8 ++++----
 .../kafka/clients/consumer/internals/MemberStateListener.java     | 2 +-
 3 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
index c6aa70d805e..4a7ec8ade0e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
@@ -650,7 +650,7 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
      * the group, this will be invoked with empty epoch.
      */
     void notifyEpochChange(Optional<Integer> epoch) {
-        stateUpdatesListeners.forEach(stateListener -> stateListener.onMemberEpochUpdated(epoch, memberId));
+        stateUpdatesListeners.forEach(stateListener -> stateListener.onMemberEpochUpdated(epoch, Optional.of(memberId)));
     }
 
     /**
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 4b013b87004..f96a6319d5f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -266,7 +266,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
 
     private final MemberStateListener memberStateListener = new MemberStateListener() {
         @Override
-        public void onMemberEpochUpdated(Optional<Integer> memberEpoch, String memberId) {
+        public void onMemberEpochUpdated(Optional<Integer> memberEpoch, Optional<String> memberId) {
             updateGroupMetadata(memberEpoch, memberId);
         }
 
@@ -390,7 +390,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
             streamsAssignmentInterface.ifPresent(
                 sai -> sai.setApplicationEventHandler(applicationEventHandler)
             );
-            ConsumerRebalanceListenerInvoker rebalanceListenerInvoker = new ConsumerRebalanceListenerInvoker(
+            this.rebalanceListenerInvoker = new ConsumerRebalanceListenerInvoker(
                     logContext,
                     subscriptions,
                     time,
@@ -652,13 +652,13 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         );
     }
 
-    private void updateGroupMetadata(final Optional<Integer> memberEpoch, final String memberId) {
+    private void updateGroupMetadata(final Optional<Integer> memberEpoch, final Optional<String> memberId) {
         memberEpoch.ifPresent(epoch -> groupMetadata.updateAndGet(
                 oldGroupMetadataOptional -> oldGroupMetadataOptional.map(
                     oldGroupMetadata -> new ConsumerGroupMetadata(
                         oldGroupMetadata.groupId(),
                         memberEpoch.orElse(oldGroupMetadata.generationId()),
-                        memberId,
+                            memberId.orElse(oldGroupMetadata.memberId()),
                         oldGroupMetadata.groupInstanceId()
                     )
                 )
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberStateListener.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberStateListener.java
index 98b6271fcc0..62d82dcf937 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberStateListener.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberStateListener.java
@@ -36,7 +36,7 @@ public interface MemberStateListener {
      *                    not part of the group anymore.
      * @param memberId    Current member ID. It won't change until the process is terminated.
      */
-    void onMemberEpochUpdated(Optional<Integer> memberEpoch, String memberId);
+    void onMemberEpochUpdated(Optional<Integer> memberEpoch, Optional<String> memberId);
 
     /**
      * This callback is invoked when a group member's assigned set of partitions changes. Assignments can change via

Reply via email to