sjvanrossum commented on PR #32986: URL: https://github.com/apache/beam/pull/32986#issuecomment-2541270102
Feel free to take a peek @scwhittle! 😃 There's also a separate branch with these changes applied on top of 2.61.0 at https://github.com/sjvanrossum/beam/tree/kafkaio-decoupled-consumer-sdf-2.61.0 if you want to build a pipeline.

I had some inspiration on Sunday and revised the approach to thread-safe sharing of a Kafka consumer after hitting that 1 GiB/s bottleneck. The new `ConcurrentConsumer` wrapper uses a [`Phaser`](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Phaser.html) to synchronize access to the `Consumer`. Registration at the `Phaser` happens when `processElement` is about to enter its poll loop, arrival at the `Phaser` happens at every poll, and deregistration happens at every return point or thrown exception. Advancing the `Phaser` issues a poll on the consumer, executed by the thread that last arrived or deregistered. Kafka's info logs are verbose enough to make out that Kafka consumers on various workers are frequently assigned more than one partition.

The image below is from a throughput test using 256-512 cores (n2d-standard-2) to read from 500 partitions that are filled by a producer at ~10 GiB/s; the time mark highlights the moment at which the pipeline had scaled up from 256 to 512 cores. I seem to be hitting a cap at 2.5 GiB/s at the moment when I run this test with a shuffle step as a simple IO sink. This same workload using the unmodified source implementation on Dataflow's Runner V1 reports 2.5-3 GiB/s after scaling up. Without an IO sink, both of them are able to process 10 GiB/s with comparable behavior throughout the pipeline's lifetime.

The `Phaser` does become a bottleneck, as would be expected, when this same workload is packed onto larger machines, and that stacks on top of the reduced per-consumer throughput. I've got a few more ideas to reduce the number of forced sync points and to dynamically size the number of consumers per worker, but I'm also open to suggestions if you have any.
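To make the coordination concrete, here's a minimal sketch of the register/arrive/deregister protocol described above. This is a hypothetical stand-in, not the PR's actual `ConcurrentConsumer`: the Kafka `Consumer` is replaced by a poll counter, and the `CountDownLatch` exists only to make the demo deterministic (real readers join and leave the poll loop freely). The `onAdvance` override is where a real wrapper would issue the single `consumer.poll(...)` on the advancing thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: readers share one "consumer" through a Phaser.
// Each reader registers before its poll loop, arrives once per iteration,
// and deregisters on every exit path. The thread whose arrival (or
// deregistration) advances the phase runs the single poll inside
// onAdvance, so only one thread ever touches the shared consumer.
public class PhaserPollSketch {
  final AtomicInteger polls = new AtomicInteger(); // stands in for consumer.poll(...)

  final Phaser phaser = new Phaser() {
    @Override
    protected boolean onAdvance(int phase, int registeredParties) {
      // Runs on the last thread to arrive or deregister in this phase.
      polls.incrementAndGet(); // a real wrapper would poll and stash records here
      return registeredParties == 0; // terminate once no reader is registered
    }
  };

  // Demo only: ensures all readers register before any of them arrives.
  final CountDownLatch registered;

  PhaserPollSketch(int readers) {
    registered = new CountDownLatch(readers);
  }

  /** One reader's poll loop: take {@code n} turns at the shared consumer. */
  void readLoop(int n) {
    phaser.register(); // enter the poll loop
    registered.countDown();
    try {
      registered.await(); // demo only: wait until all readers are registered
      for (int i = 0; i < n; i++) {
        phaser.arriveAndAwaitAdvance(); // one poll happens per phase advance
        // a real reader would now drain its share of the fetched records
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      phaser.arriveAndDeregister(); // every return path deregisters
    }
  }

  /** Runs {@code readers} threads for {@code iters} iterations each. */
  static int runDemo(int readers, int iters) {
    PhaserPollSketch s = new PhaserPollSketch(readers);
    Thread[] ts = new Thread[readers];
    for (int i = 0; i < readers; i++) {
      ts[i] = new Thread(() -> s.readLoop(iters));
      ts[i].start();
    }
    for (Thread t : ts) {
      try {
        t.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
    return s.polls.get();
  }

  public static void main(String[] args) {
    // iters lock-step advances, plus one final advance when the last
    // reader deregisters: runDemo(2, 3) yields 4 polls.
    System.out.println("polls=" + runDemo(2, 3)); // prints "polls=4"
  }
}
```

The sketch also shows why contention grows with machine size: every reader must arrive before a phase can advance, so each poll is a full synchronization point across all registered readers, which matches the bottleneck described above.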
