junaiddshaukat commented on code in PR #37705:
URL: https://github.com/apache/beam/pull/37705#discussion_r2853776459
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -567,19 +566,14 @@ public ProcessContinuation processElement(
long expectedOffset = tracker.currentRestriction().getFrom();
consumer.resume(Collections.singleton(topicPartition));
consumer.seek(topicPartition, expectedOffset);
- final Stopwatch pollTimer = Stopwatch.createUnstarted();
final KafkaMetrics kafkaMetrics = KafkaSinkMetrics.kafkaMetrics();
try {
while (Duration.ZERO.compareTo(remainingTimeout) < 0) {
- // TODO: Remove this timer and use the existing fetch-latency-avg
metric.
Review Comment:
Thanks for the clarification, and I think there was some misunderstanding on
my side about the intent of the TODO.
I originally read
> “Remove this timer and use the existing fetch‑latency‑avg metric.”
as “the custom Stopwatch‑based timer in the hot consumer.poll() loop is too
expensive and should be removed / replaced,” so I focused this PR on changing
how we measure the latency (from Stopwatch to System.currentTimeMillis) while
keeping the existing Beam metric behavior.
From your comments and the link to the Kafka consumer metrics docs, I now
understand that your intent is more about relying on Kafka’s own
fetch-latency-avg JMX metric instead of having a separate Beam RpcLatency
metric here, rather than just which Java clock is used. In that sense, you’re
right: this PR doesn’t actually implement the TODO, it just refactors the timer
and doesn’t add much value.
Given that, I’m happy to either:
- Close this PR as a misaligned optimization attempt, or
- If you think it’s worthwhile, follow up with a separate change that truly
removes the custom latency timer / RpcLatency metric in favor of Kafka’s
metrics (with tests and a clearer design discussion up front).
Please let me know which direction you’d prefer; if you’d rather keep the
current behavior and leave the TODO for later, I’ll go ahead and close this PR.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]