junaiddshaukat opened a new pull request, #37705:
URL: https://github.com/apache/beam/pull/37705
Fixes #37704
### What happened?
This PR optimizes the Kafka IO readers (`ReadFromKafkaDoFn` and
`KafkaUnboundedReader`) by removing the Guava `Stopwatch` and the custom RPC
latency metric tracking from the hot `consumer.poll()` loop.
**The Problem:**
Guava's `Stopwatch` reads `System.nanoTime()` each time it is started or
stopped, and that call can be relatively expensive on some platforms. In the
Kafka IO module, the timer was started and stopped on every iteration of the
`consumer.poll()` loop. When Kafka has already prefetched records, `poll()`
returns almost immediately, so the loop iterates very rapidly and the timing
calls add significant, unnecessary CPU overhead.
**The Solution:**
1. **Removed `Stopwatch`:** Replaced the `Stopwatch` timeout tracking with a
lightweight `System.currentTimeMillis()` calculation to track the
`remainingTimeout`.
2. **Removed Custom RPC Metrics:** Removed the `updateSuccessfulRpcMetrics`
calls and associated histograms (`KafkaMetrics`, `KafkaSinkMetrics`).
3. **Why this is safe:** The custom Beam RPC latency metric is redundant.
Kafka already natively tracks and exposes this data via JMX metrics
(specifically `fetch-latency-avg`), so we can safely drop the custom
implementation to regain performance.
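
For reviewers who want to confirm that the native metric is available, the following is a minimal sketch of reading `fetch-latency-avg` over JMX from inside the JVM that hosts the consumer. The class and method names are hypothetical and not part of this PR. The MBean pattern is the consumer fetch-manager group described in the Kafka monitoring documentation, and it is assumed here that JMX reporting is enabled for the consumer.

```java
import java.lang.management.ManagementFactory;
import java.util.Map;
import java.util.TreeMap;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

/** Hypothetical helper (not part of this PR) that reads Kafka's own fetch latency metric. */
public class FetchLatencyJmx {
  // Client-level consumer fetch metrics; one MBean per consumer client-id.
  static final String FETCH_MBEANS =
      "kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*";

  /** Returns fetch-latency-avg keyed by client-id; empty if no consumer is registered. */
  public static Map<String, Double> fetchLatencyAvgByClient(MBeanServer server) {
    Map<String, Double> result = new TreeMap<>();
    try {
      for (ObjectName name : server.queryNames(new ObjectName(FETCH_MBEANS), null)) {
        Object value = server.getAttribute(name, "fetch-latency-avg");
        if (value instanceof Number) {
          result.put(name.getKeyProperty("client-id"), ((Number) value).doubleValue());
        }
      }
    } catch (JMException e) {
      // Malformed pattern or an MBean that vanished mid-query.
      throw new IllegalStateException("Could not read Kafka fetch metrics", e);
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, Double> latencies =
        fetchLatencyAvgByClient(ManagementFactory.getPlatformMBeanServer());
    if (latencies.isEmpty()) {
      System.out.println("No Kafka consumer MBeans registered in this JVM.");
    }
    latencies.forEach((client, ms) -> System.out.println(client + ": " + ms + " ms"));
  }
}
```

Run in a JVM without an active Kafka consumer, the helper simply returns an empty map.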
### Changes made:
* Updated `ReadFromKafkaDoFn.java` and `KafkaUnboundedReader.java` to use
`System.currentTimeMillis()` for timeout tracking instead of `Stopwatch`.
* Cleaned up dead code in `KafkaMetrics.java` and `KafkaSinkMetrics.java` by
removing `RPC_LATENCY` histograms and `updateSuccessfulRpcMetrics`.
* Updated `KafkaMetricsTest.java` and `KafkaSinkMetricsTest.java` to reflect
the removed metrics.
### Testing Done
* [x] Compiled successfully.
* [x] Ran and passed the full Kafka IO test suite (`./gradlew
:sdks:java:io:kafka:test`).
* [x] Verified that timeout logic in the poll loop still functions correctly
using `java.time.Duration` and `System.currentTimeMillis()`.
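
The timeout tracking checked in the last item can be sketched roughly as follows. This is an illustration under stated assumptions, not the code from this PR: `Poller` is a hypothetical stand-in for `consumer.poll(Duration)`, and `pollUntilRecordsOrTimeout` is an invented name. The loop computes a deadline once and derives the remaining timeout from `System.currentTimeMillis()` only after an empty poll.

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.List;

/** Illustrative sketch only; names are hypothetical and not taken from the Beam sources. */
public class PollTimeoutSketch {
  /** Hypothetical stand-in for consumer.poll(Duration): returns a possibly empty batch. */
  public interface Poller<T> {
    List<T> poll(Duration timeout);
  }

  /** Polls until a non-empty batch arrives or the overall timeout has elapsed. */
  public static <T> List<T> pollUntilRecordsOrTimeout(Poller<T> poller, Duration timeout) {
    // Compute the deadline once instead of starting and stopping a timer per iteration.
    long deadlineMillis = System.currentTimeMillis() + timeout.toMillis();
    Duration remaining = timeout;
    while (true) {
      List<T> batch = poller.poll(remaining);
      if (!batch.isEmpty()) {
        return batch; // fast path: no clock read when records are already available
      }
      long remainingMillis = deadlineMillis - System.currentTimeMillis();
      if (remainingMillis <= 0) {
        return batch; // timed out; the batch is empty
      }
      // Clamp in case the wall clock moved backwards during the loop.
      remaining = Duration.ofMillis(Math.min(remainingMillis, timeout.toMillis()));
    }
  }

  public static void main(String[] args) {
    int[] calls = {0};
    // Fake poller: two empty polls, then one record.
    List<String> records =
        pollUntilRecordsOrTimeout(
            t -> ++calls[0] < 3 ? Arrays.<String>asList() : Arrays.asList("record"),
            Duration.ofSeconds(5));
    System.out.println(records + " after " + calls[0] + " polls");
  }
}
```

Because `System.currentTimeMillis()` follows the wall clock rather than a monotonic source, the sketch treats any non-positive remainder as expiry and caps the remainder at the original timeout.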
------------------------
Thank you for your contribution! Follow this checklist to help us
incorporate your contribution quickly and easily:
- [X] Mention the appropriate issue in your description (for example:
`addresses #123`), if applicable. This will automatically add a link to the
pull request in the issue. If you would like the issue to automatically close
on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
See the [Contributor Guide](https://beam.apache.org/contribute) for more
tips on [how to make review process
smoother](https://github.com/apache/beam/blob/master/CONTRIBUTING.md#make-the-reviewers-job-easier).
To check the build health, please visit
[https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more
information about GitHub Actions CI or the [workflows
README](https://github.com/apache/beam/blob/master/.github/workflows/README.md)
to see a list of phrases to trigger workflows.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.