hudeqi commented on code in PR #14077:
URL: https://github.com/apache/kafka/pull/14077#discussion_r1314583243


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##########
@@ -282,6 +285,38 @@ SourceRecord convertRecord(ConsumerRecord<byte[], byte[]> 
record) {
                 record.timestamp(), headers);
     }
 
+    //visible for testing
+    void reportReplicationOffsetLag(ConsumerRecords<byte[], byte[]> 
lastPolledRecords) {
+        Set<TopicPartition> partitions = lastPolledRecords.partitions();
+        partitions.forEach(p -> {
+            try {
+                long replicationOffsetLag = 
getReplicationOffsetLagForPartition(p, lastPolledRecords.records(p));
+                if (replicationOffsetLag < 0) {
+                    log.warn("Replication offset lag for partition {} is 
negative({}) - " +
+                            "skipping metric reporting for this partition.", 
p, replicationOffsetLag);
+                    return;
+                }
+                metrics.replicationOffsetLag(p, replicationOffsetLag + 1); 
//+1 to account for zero-based offset numbering
+            } catch (UnsupportedOperationException e) {
+                log.error("Failed to calculate replication offset lag for 
partition {}.", p, e);
+            }
+        });
+    }
+
+    private long getReplicationOffsetLagForPartition(TopicPartition partition,
+                                                     
List<ConsumerRecord<byte[], byte[]>> lastPolledRecordsForPartition) {
+        ConsumerRecord<byte[], byte[]> lastPolledRecord =
+                
lastPolledRecordsForPartition.get(lastPolledRecordsForPartition.size() - 1);
+        if (!lastPolledRecord.topic().equals(partition.topic()) || 
lastPolledRecord.partition() != partition.partition()) {
+            String error = String.format(
+                    "Unexpected topic/partition mismatch while calculating 
replication-offset-lag. Expected: %s, got: %s-%s.",
+                    partition, lastPolledRecord.topic(), 
lastPolledRecord.partition());
+            throw new UnsupportedOperationException(error);
+        }
+        long endOffsetForPartition = lastPolledRecord.offset();

Review Comment:
   My doubts are here: I think the LEO of the partition should be the log end 
offset of the partition in the source cluster, but the 
`lastPolledRecord.offset()` here represents only the offset in the source 
cluster of the last record polled by the task on the partition, that is to say, 
maybe the log end offset of the source cluster has reached 100, but due to the 
poor consumer performance of the task, it is actually only polled to the 
position where the offset is equal to 80, so the lag must be greater than 20 
(the reason why it is greater than this is because it has just been polled 
data, not yet written to the target cluster, I think we agree on this). But if 
you follow the logic here in the PR, LEO will be 80, but in fact the offset of 
the source cluster has been written to 100.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to