Copilot commented on code in PR #15208:
URL: https://github.com/apache/iceberg/pull/15208#discussion_r2751225167


##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java:
##########
@@ -92,16 +93,44 @@ boolean hasLeaderPartition(Collection<TopicPartition> currentAssignedPartitions)
   @VisibleForTesting
   boolean containsFirstPartition(
       Collection<MemberDescription> members, Collection<TopicPartition> partitions) {
-    // there should only be one task assigned partition 0 of the first topic,
-    // so elect that one the leader
-    TopicPartition firstTopicPartition =
-        members.stream()
-            .flatMap(member -> member.assignment().topicPartitions().stream())
-            .min(new TopicPartitionComparator())
-            .orElseThrow(
-                () -> new ConnectException("No partitions assigned, cannot determine leader"));
-
-    return partitions.contains(firstTopicPartition);
+    // Determine the first partition across all members to elect the leader
+    TopicPartition firstTopicPartition = findFirstTopicPartition(members);
+
+    if (firstTopicPartition == null) {
+      LOG.warn(
+          "Committer {} found no partitions assigned across all members, cannot determine leader",
+          identifier);
+      return false;
+    }
+
+    boolean containsFirst = partitions.contains(firstTopicPartition);
+    if (containsFirst) {
+      LOG.info(
+          "Committer {} contains the first partition {}, this task is the leader",
+          identifier,
+          firstTopicPartition);
+    } else {
+      LOG.debug(
+          "Committer {} does not contain the first partition {}, not the leader",
+          identifier,
+          firstTopicPartition);
+    }
+
+    return containsFirst;
+  }
+
+  /**
+   * Finds the first (minimum) topic partition across all consumer group members.
+   *
+   * @param members the collection of consumer group members
+   * @return the first topic partition, or null if no partitions are assigned

Review Comment:
   The newly extracted method 'findFirstTopicPartition' lacks a clear 
explanation of the sorting criteria used by TopicPartitionComparator. Consider 
documenting how the 'first' partition is determined (e.g., lexicographically by 
topic name, then by partition number).
   ```suggestion
      * <p>The "first" partition is determined using {@link TopicPartitionComparator}, which orders
      * {@link TopicPartition} instances lexicographically by topic name and, for equal topics, by
      * ascending partition number.
      *
      * @param members the collection of consumer group members
      * @return the first topic partition according to {@link TopicPartitionComparator}, or null if no
      *     partitions are assigned
   ```
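
To make the ordering concrete, here is a minimal, self-contained sketch of the selection the suggested Javadoc describes. It assumes the comparator orders by topic name and then by partition number, as stated in the suggestion; it is not the project's implementation. The `TopicPartition` record is a hypothetical stand-in for the Kafka class, each inner list stands in for one member's assignment, and `FirstPartitionSketch`, `BY_TOPIC_THEN_PARTITION`, and `findFirst` are illustrative names. It returns an `Optional` rather than null purely for the sketch.

```java
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

public class FirstPartitionSketch {
  // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
  record TopicPartition(String topic, int partition) {}

  // Assumed ordering: topic name lexicographically, then ascending partition number.
  static final Comparator<TopicPartition> BY_TOPIC_THEN_PARTITION =
      Comparator.comparing(TopicPartition::topic).thenComparingInt(TopicPartition::partition);

  // Flattens every member's assignment and picks the minimum under the ordering above.
  // Returns an empty Optional when no member has any partition assigned.
  static Optional<TopicPartition> findFirst(Collection<List<TopicPartition>> assignments) {
    return assignments.stream().flatMap(Collection::stream).min(BY_TOPIC_THEN_PARTITION);
  }

  public static void main(String[] args) {
    List<List<TopicPartition>> assignments =
        List.of(
            List.of(new TopicPartition("b-topic", 0), new TopicPartition("a-topic", 2)),
            List.of(new TopicPartition("a-topic", 1)));

    // The topic name is compared first, so "a-topic" wins over "b-topic" even though
    // "b-topic" has the lower partition number; partition 1 then wins over partition 2.
    System.out.println(findFirst(assignments));
  }
}
```

Under this ordering, the task that owns the minimum partition is the one elected leader, so the documented sort criteria directly determine which task leads.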



##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java:
##########
@@ -48,6 +47,7 @@ public class CommitterImpl implements Committer {
   private KafkaClientFactory clientFactory;
   private Collection<MemberDescription> membersWhenWorkerIsCoordinator;
   private final AtomicBoolean isInitialized = new AtomicBoolean(false);
+  private String identifier;

Review Comment:
   The field 'identifier' should be marked as 'final' since it's only assigned 
once during initialization and never modified afterward. This makes the 
immutability explicit and prevents accidental reassignment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

