Copilot commented on code in PR #15208:
URL: https://github.com/apache/iceberg/pull/15208#discussion_r2751225167
##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java:
##########
@@ -92,16 +93,44 @@ boolean hasLeaderPartition(Collection<TopicPartition>
currentAssignedPartitions)
@VisibleForTesting
boolean containsFirstPartition(
Collection<MemberDescription> members, Collection<TopicPartition>
partitions) {
- // there should only be one task assigned partition 0 of the first topic,
- // so elect that one the leader
- TopicPartition firstTopicPartition =
- members.stream()
- .flatMap(member -> member.assignment().topicPartitions().stream())
- .min(new TopicPartitionComparator())
- .orElseThrow(
- () -> new ConnectException("No partitions assigned, cannot
determine leader"));
-
- return partitions.contains(firstTopicPartition);
+ // Determine the first partition across all members to elect the leader
+ TopicPartition firstTopicPartition = findFirstTopicPartition(members);
+
+ if (firstTopicPartition == null) {
+ LOG.warn(
+ "Committer {} found no partitions assigned across all members,
cannot determine leader",
+ identifier);
+ return false;
+ }
+
+ boolean containsFirst = partitions.contains(firstTopicPartition);
+ if (containsFirst) {
+ LOG.info(
+ "Committer {} contains the first partition {}, this task is the
leader",
+ identifier,
+ firstTopicPartition);
+ } else {
+ LOG.debug(
+ "Committer {} does not contain the first partition {}, not the
leader",
+ identifier,
+ firstTopicPartition);
+ }
+
+ return containsFirst;
+ }
+
+ /**
+ * Finds the first (minimum) topic partition across all consumer group
members.
+ *
+ * @param members the collection of consumer group members
+ * @return the first topic partition, or null if no partitions are assigned
Review Comment:
The newly extracted method 'findFirstTopicPartition' lacks a clear
explanation of the sorting criteria used by TopicPartitionComparator. Consider
documenting how the 'first' partition is determined (e.g., lexicographically by
topic name, then by partition number).
```suggestion
* <p>The "first" partition is determined using {@link
TopicPartitionComparator}, which orders
* {@link TopicPartition} instances lexicographically by topic name and,
for equal topics, by
* ascending partition number.
*
* @param members the collection of consumer group members
* @return the first topic partition according to {@link
TopicPartitionComparator}, or null if no
* partitions are assigned
```
##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java:
##########
@@ -48,6 +47,7 @@ public class CommitterImpl implements Committer {
private KafkaClientFactory clientFactory;
private Collection<MemberDescription> membersWhenWorkerIsCoordinator;
private final AtomicBoolean isInitialized = new AtomicBoolean(false);
+ private String identifier;
Review Comment:
The field 'identifier' should be marked as 'final' since it's only assigned
once during initialization and never modified afterward. This makes the
immutability explicit and prevents accidental reassignment.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]