This is an automated email from the ASF dual-hosted git repository.

bryanck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new c235305c98 Kafka Connect: Don't check that consumer group is stable for coordinator leader election (#14395)
c235305c98 is described below

commit c235305c985e2f4427fa735a1a7460252da8005d
Author: Fenil Doshi <[email protected]>
AuthorDate: Tue Nov 4 11:27:52 2025 -0800

    Kafka Connect: Don't check that consumer group is stable for coordinator leader election (#14395)
---
 .../iceberg/connect/channel/CommitterImpl.java     | 16 +++---
 .../iceberg/connect/channel/TestCommitterImpl.java | 67 ++++++++++++++++++++--
 2 files changed, 70 insertions(+), 13 deletions(-)

diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java
index fcba5fb629..04602a66a5 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java
@@ -29,7 +29,6 @@ import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTest
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.ConsumerGroupDescription;
 import org.apache.kafka.clients.admin.MemberDescription;
-import org.apache.kafka.common.ConsumerGroupState;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.sink.SinkRecord;
@@ -74,18 +73,19 @@ public class CommitterImpl implements Committer {
     }
   }
 
-  private boolean hasLeaderPartition(Collection<TopicPartition> 
currentAssignedPartitions) {
+  @VisibleForTesting
+  boolean hasLeaderPartition(Collection<TopicPartition> 
currentAssignedPartitions) {
     ConsumerGroupDescription groupDesc;
     try (Admin admin = clientFactory.createAdmin()) {
       groupDesc = KafkaUtils.consumerGroupDescription(config.connectGroupId(), 
admin);
     }
-    if (groupDesc.state() == ConsumerGroupState.STABLE) {
-      Collection<MemberDescription> members = groupDesc.members();
-      if (containsFirstPartition(members, currentAssignedPartitions)) {
-        membersWhenWorkerIsCoordinator = members;
-        return true;
-      }
+
+    Collection<MemberDescription> members = groupDesc.members();
+    if (containsFirstPartition(members, currentAssignedPartitions)) {
+      membersWhenWorkerIsCoordinator = members;
+      return true;
     }
+
     return false;
   }
 
diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCommitterImpl.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCommitterImpl.java
index c8fcbab255..c6b7c86e4c 100644
--- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCommitterImpl.java
+++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCommitterImpl.java
@@ -19,22 +19,55 @@
 package org.apache.iceberg.connect.channel;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
 
+import java.lang.reflect.Field;
 import java.util.List;
 import java.util.Optional;
+import org.apache.iceberg.connect.IcebergSinkConfig;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.ConsumerGroupDescription;
 import org.apache.kafka.clients.admin.MemberAssignment;
 import org.apache.kafka.clients.admin.MemberDescription;
 import org.apache.kafka.common.TopicPartition;
 import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
 
 public class TestCommitterImpl {
 
   @Test
   public void testIsLeader() {
+    MemberAssignment assignment1 =
+        new MemberAssignment(
+            ImmutableSet.of(new TopicPartition("topic1", 0), new 
TopicPartition("topic2", 1)));
+    MemberDescription member1 =
+        new MemberDescription(null, Optional.empty(), null, null, assignment1);
+
+    MemberAssignment assignment2 =
+        new MemberAssignment(
+            ImmutableSet.of(new TopicPartition("topic2", 0), new 
TopicPartition("topic1", 1)));
+    MemberDescription member2 =
+        new MemberDescription(null, Optional.empty(), null, null, assignment2);
+
+    List<MemberDescription> members = ImmutableList.of(member1, member2);
+
+    List<TopicPartition> leaderAssignments =
+        ImmutableList.of(new TopicPartition("topic2", 1), new 
TopicPartition("topic1", 0));
+    List<TopicPartition> nonLeaderAssignments =
+        ImmutableList.of(new TopicPartition("topic2", 0), new 
TopicPartition("topic1", 1));
+
     CommitterImpl committer = new CommitterImpl();
+    assertThat(committer.containsFirstPartition(members, 
leaderAssignments)).isTrue();
+    assertThat(committer.containsFirstPartition(members, 
nonLeaderAssignments)).isFalse();
+  }
 
+  @Test
+  public void testHasLeaderPartition() throws NoSuchFieldException, 
IllegalAccessException {
     MemberAssignment assignment1 =
         new MemberAssignment(
             ImmutableSet.of(new TopicPartition("topic1", 0), new 
TopicPartition("topic2", 1)));
@@ -49,12 +82,36 @@ public class TestCommitterImpl {
 
     List<MemberDescription> members = ImmutableList.of(member1, member2);
 
-    List<TopicPartition> assignments =
+    List<TopicPartition> leaderAssignments =
         ImmutableList.of(new TopicPartition("topic2", 1), new 
TopicPartition("topic1", 0));
-    assertThat(committer.containsFirstPartition(members, 
assignments)).isTrue();
-
-    assignments =
+    List<TopicPartition> nonLeaderAssignments =
         ImmutableList.of(new TopicPartition("topic2", 0), new 
TopicPartition("topic1", 1));
-    assertThat(committer.containsFirstPartition(members, 
assignments)).isFalse();
+
+    CommitterImpl committer = new CommitterImpl();
+    Field configField = CommitterImpl.class.getDeclaredField("config");
+    Field clientFactoryField = 
CommitterImpl.class.getDeclaredField("clientFactory");
+    configField.setAccessible(true);
+    clientFactoryField.setAccessible(true);
+
+    IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+    when(config.connectGroupId()).thenReturn("test-group");
+    configField.set(committer, config);
+
+    KafkaClientFactory clientFactory = mock(KafkaClientFactory.class);
+    Admin admin = mock(Admin.class);
+    when(clientFactory.createAdmin()).thenReturn(admin);
+    clientFactoryField.set(committer, clientFactory);
+
+    try (MockedStatic<KafkaUtils> mockKafkaUtils = 
mockStatic(KafkaUtils.class)) {
+      ConsumerGroupDescription consumerGroupDescription = 
mock(ConsumerGroupDescription.class);
+      mockKafkaUtils
+          .when(() -> KafkaUtils.consumerGroupDescription(any(), any()))
+          .thenReturn(consumerGroupDescription);
+
+      when(consumerGroupDescription.members()).thenReturn(members);
+
+      assertThat(committer.hasLeaderPartition(leaderAssignments)).isTrue();
+      assertThat(committer.hasLeaderPartition(nonLeaderAssignments)).isFalse();
+    }
   }
 }

Reply via email to