This is an automated email from the ASF dual-hosted git repository. bbejeck pushed a commit to branch kip1071_trunk_rebase_11_25_add_Bruno_PR in repository https://gitbox.apache.org/repos/asf/kafka.git
commit db0c2da5df7eecc8b6a99a94cb187bf1c370b03b Author: Bill <[email protected]> AuthorDate: Wed Nov 27 15:43:48 2024 -0500 Changes from rebase --- .../clients/consumer/internals/AbstractMembershipManager.java | 2 +- .../kafka/clients/consumer/internals/AsyncKafkaConsumer.java | 8 ++++---- .../kafka/clients/consumer/internals/MemberStateListener.java | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java index c6aa70d805e..4a7ec8ade0e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java @@ -650,7 +650,7 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl * the group, this will be invoked with empty epoch. */ void notifyEpochChange(Optional<Integer> epoch) { - stateUpdatesListeners.forEach(stateListener -> stateListener.onMemberEpochUpdated(epoch, memberId)); + stateUpdatesListeners.forEach(stateListener -> stateListener.onMemberEpochUpdated(epoch, Optional.of(memberId))); } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 4b013b87004..f96a6319d5f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -266,7 +266,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { private final MemberStateListener memberStateListener = new MemberStateListener() { @Override - public void onMemberEpochUpdated(Optional<Integer> memberEpoch, String memberId) { + public void onMemberEpochUpdated(Optional<Integer> memberEpoch, Optional<String> memberId) { updateGroupMetadata(memberEpoch, memberId); } @@ -390,7 +390,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { streamsAssignmentInterface.ifPresent( sai -> sai.setApplicationEventHandler(applicationEventHandler) ); - ConsumerRebalanceListenerInvoker rebalanceListenerInvoker = new ConsumerRebalanceListenerInvoker( + this.rebalanceListenerInvoker = new ConsumerRebalanceListenerInvoker( logContext, subscriptions, time, @@ -652,13 +652,13 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { ); } - private void updateGroupMetadata(final Optional<Integer> memberEpoch, final String memberId) { + private void updateGroupMetadata(final Optional<Integer> memberEpoch, final Optional<String> memberId) { memberEpoch.ifPresent(epoch -> groupMetadata.updateAndGet( oldGroupMetadataOptional -> oldGroupMetadataOptional.map( oldGroupMetadata -> new ConsumerGroupMetadata( oldGroupMetadata.groupId(), memberEpoch.orElse(oldGroupMetadata.generationId()), - memberId, + memberId.orElse(oldGroupMetadata.memberId()), oldGroupMetadata.groupInstanceId() ) ) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberStateListener.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberStateListener.java index 98b6271fcc0..62d82dcf937 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberStateListener.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberStateListener.java @@ -36,7 +36,7 @@ public interface MemberStateListener { * not part of the group anymore. * @param memberId Current member ID. It won't change until the process is terminated. */ - void onMemberEpochUpdated(Optional<Integer> memberEpoch, String memberId); + void onMemberEpochUpdated(Optional<Integer> memberEpoch, Optional<String> memberId); /** * This callback is invoked when a group member's assigned set of partitions changes. Assignments can change via
