sjvanrossum commented on PR #32986:
URL: https://github.com/apache/beam/pull/32986#issuecomment-2541270102

   Feel free to take a peek @scwhittle! 😃 
   There's also a separate branch with these changes applied on top of 2.61.0 
at 
https://github.com/sjvanrossum/beam/tree/kafkaio-decoupled-consumer-sdf-2.61.0 
if you want to build a pipeline.
   
   I had some inspiration on Sunday and revised the approach to thread-safe 
sharing of a Kafka consumer after hitting that 1 GiB/s bottleneck. The new 
`ConcurrentConsumer` wrapper uses a 
[`Phaser`](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Phaser.html)
 to synchronize access to the `Consumer`. Registration at the `Phaser` happens 
when `processElement` is about to enter its poll loop, arrival happens at every 
poll, and deregistration happens at every return point or thrown exception. 
Each advancement of the `Phaser` issues a poll on the consumer, executed by the 
thread that last arrived or deregistered. A rough sketch of that coordination 
pattern follows below.
   
   Kafka's info logs are verbose enough to make out that Kafka consumers on 
various workers are frequently assigned more than one partition. The image 
below is from a throughput test using 256-512 cores (n2d-standard-2) to read 
from 500 partitions that are filled by a producer at ~10 GiB/s, and the time 
mark highlights the moment at which the pipeline had scaled up from 256 to 512 
cores. At the moment I seem to be hitting a cap at 2.5 GiB/s when I run this 
test with a shuffle step as a simple IO sink. This same workload using the 
unmodified source implementation on Dataflow's Runner V1 reports 2.5-3 GiB/s 
after scaling up. Without an IO sink, both are able to process 10 GiB/s with 
comparable behavior throughout the pipeline's lifetime.
   
   
![image](https://github.com/user-attachments/assets/8bca8ad3-4e5c-4e8b-a1be-a0337deb6ac1)
   
   The `Phaser` does become a bottleneck, as would be expected, when this same 
workload is packed onto larger machines, where it stacks on top of reduced 
consumer throughput. I've got a few more ideas to reduce the number of forced 
sync points and to dynamically size the number of consumers per worker, but I'm 
also open to suggestions if you have any. 👍 
   

