hudeqi commented on code in PR #14077: URL: https://github.com/apache/kafka/pull/14077#discussion_r1314583243
##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##########
@@ -282,6 +285,38 @@ SourceRecord convertRecord(ConsumerRecord<byte[], byte[]> record) {
                 record.timestamp(), headers);
     }
 
+    //visible for testing
+    void reportReplicationOffsetLag(ConsumerRecords<byte[], byte[]> lastPolledRecords) {
+        Set<TopicPartition> partitions = lastPolledRecords.partitions();
+        partitions.forEach(p -> {
+            try {
+                long replicationOffsetLag = getReplicationOffsetLagForPartition(p, lastPolledRecords.records(p));
+                if (replicationOffsetLag < 0) {
+                    log.warn("Replication offset lag for partition {} is negative({}) - " +
+                            "skipping metric reporting for this partition.", p, replicationOffsetLag);
+                    return;
+                }
+                metrics.replicationOffsetLag(p, replicationOffsetLag + 1); //+1 to account for zero-based offset numbering
+            } catch (UnsupportedOperationException e) {
+                log.error("Failed to calculate replication offset lag for partition {}.", p, e);
+            }
+        });
+    }
+
+    private long getReplicationOffsetLagForPartition(TopicPartition partition,
+                                                     List<ConsumerRecord<byte[], byte[]>> lastPolledRecordsForPartition) {
+        ConsumerRecord<byte[], byte[]> lastPolledRecord =
+                lastPolledRecordsForPartition.get(lastPolledRecordsForPartition.size() - 1);
+        if (!lastPolledRecord.topic().equals(partition.topic()) || lastPolledRecord.partition() != partition.partition()) {
+            String error = String.format(
+                    "Unexpected topic/partition mismatch while calculating replication-offset-lag. Expected: %s, got: %s-%s.",
+                    partition, lastPolledRecord.topic(), lastPolledRecord.partition());
+            throw new UnsupportedOperationException(error);
+        }
+        long endOffsetForPartition = lastPolledRecord.offset();

Review Comment:
My concern is here: I think the LEO should be the log end offset of the partition in the source cluster, but `lastPolledRecord.offset()` is only the source-cluster offset of the last record that this task has polled from the partition. For example, the log end offset in the source cluster may already have reached 100 while the task, because of poor consumer performance, has only polled up to offset 80. The real lag must then be greater than 20 (greater, because the data has only just been polled and not yet written to the target cluster; I think we agree on this). With the logic in this PR, however, the LEO would be taken as 80, even though the source cluster has already been written up to offset 100.
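To make the numbers above concrete, here is a minimal, self-contained sketch of the arithmetic. It does not reproduce the PR's code: the class `ReplicationLagSketch`, the helper `lag`, and the value 75 for the last replicated offset are hypothetical and chosen only for illustration. In a real task the source log end offset would have to be obtained separately, for instance via the consumer's `endOffsets(...)` call, which is an assumption about a possible approach rather than something this PR does.

```java
public class ReplicationLagSketch {

    // Hypothetical helper: lag of the replicated position relative to
    // whichever offset is treated as the "end" of the source partition.
    static long lag(long endOffsetReference, long lastReplicatedOffset) {
        return endOffsetReference - lastReplicatedOffset;
    }

    public static void main(String[] args) {
        long sourceLogEndOffset = 100L;  // true end of the source partition (from the example)
        long lastPolledOffset = 80L;     // offset of the last record the task polled (from the example)
        long lastReplicatedOffset = 75L; // ASSUMED value: last offset already written to the target

        // Using the last polled record as the end offset hides everything
        // the task has not polled yet.
        long lagAgainstPolled = lag(lastPolledOffset, lastReplicatedOffset);

        // Using the source log end offset also counts the unpolled records.
        long lagAgainstSourceEnd = lag(sourceLogEndOffset, lastReplicatedOffset);

        System.out.println("lag vs. last polled offset:   " + lagAgainstPolled);
        System.out.println("lag vs. source log end offset: " + lagAgainstSourceEnd);
        System.out.println("records hidden by using the polled offset: "
                + (lagAgainstSourceEnd - lagAgainstPolled));
    }
}
```

Under these assumed values, the difference between the two results is exactly the gap between the source log end offset and the last polled offset, which is the part of the lag I think the current logic would miss.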