allenma commented on a change in pull request #481: KYLIN-3821 Add consume lag
stats
URL: https://github.com/apache/kylin/pull/481#discussion_r260692914
##########
File path:
stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/consumer/KafkaConnector.java
##########
@@ -112,7 +126,38 @@ private void fillBuffer() {
for (TopicPartition topicPartition : records.partitions()) {
newBuffer.addAll(records.records(topicPartition));
}
+ if (updateLag) {
+ calConsumeLag();
+ updateLag = false;
+ }
this.buffer = newBuffer;
}
+ public void calConsumeLag() {
+ List<TopicPartition> topicPartitions = Lists.newArrayList();
+ for (Partition partition : partitions) {
+ topicPartitions.add(new TopicPartition(topic,
partition.getPartitionId()));
+ long currentOffset = kafkaConsumer.position(new
TopicPartition(topic, partition.getPartitionId()));
+ partitionCurrentOffsets.put(partition.getPartitionId(),
currentOffset);
+ }
+ Map<TopicPartition, Long> lastestEventOffset =
kafkaConsumer.endOffsets(topicPartitions);
+ for (Map.Entry<TopicPartition, Long> entry :
lastestEventOffset.entrySet()) {
+ long diff = entry.getValue()
+ -
partitionCurrentOffsets.getOrDefault(entry.getKey().partition(),
Long.MAX_VALUE);
+ lagStats.put(entry.getKey().partition(), diff);
+ }
+ }
+
+ @Override
+ public Map<Integer, Long> consumeLag() {
Review comment:
It is better to return a latest partition offsets in the Streaming
connector, and calculate the consume lag in the ConsumerChannel class.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services