Thanks for the writeup! I'm generally +1 to the smart bucketing workstream. This should probably leverage the stateful BatchElements implementation which already exists, though (https://github.com/apache/beam/blob/112685de58d7b24b70b8d8eb7171f0e28b35a2f1/sdks/python/apache_beam/transforms/util.py#L1305), and/or work without state (sorting/batching by size is still doable in the context of a single bundle). More design here will help regardless!
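
To make the bundle-local option concrete, here is a rough, untested sketch; the DoFn name, batch size, and length function are just placeholders, and it drops per-element timestamps/windows, so it is only reasonable for batch / global-window pipelines:

import apache_beam as beam
from apache_beam.transforms.window import GlobalWindow
from apache_beam.utils.timestamp import MIN_TIMESTAMP
from apache_beam.utils.windowed_value import WindowedValue

class SortAndBatchWithinBundle(beam.DoFn):
  """Buffers each bundle, sorts by element length, and emits fixed-size
  batches so similar-length prompts land in the same batch. No state or
  timers needed, but batch quality depends on how large bundles are."""

  def __init__(self, batch_size=8, length_fn=len):
    # length_fn=len counts characters; a token-count function would be a
    # better proxy for padding cost.
    self._batch_size = batch_size
    self._length_fn = length_fn

  def start_bundle(self):
    self._buffer = []

  def process(self, element):
    # Buffer only; batches are emitted in finish_bundle.
    self._buffer.append(element)

  def finish_bundle(self):
    self._buffer.sort(key=self._length_fn)
    for i in range(0, len(self._buffer), self._batch_size):
      batch = self._buffer[i:i + self._batch_size]
      # finish_bundle must emit WindowedValues; this sketch just uses the
      # global window and ignores the original element timestamps.
      yield WindowedValue(batch, MIN_TIMESTAMP, [GlobalWindow()])

Usage would be something like prompts | beam.ParDo(SortAndBatchWithinBundle(batch_size=8)) ahead of RunInference.
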
The value of the interactive pipeline is a bit less clear to me, and much harder to do well; I'd be curious to see more of the mechanics of what you're proposing. An example use case would be helpful as well. I will note https://github.com/apache/beam/blob/master/sdks/python/apache_beam/ml/inference/vllm_inference.py, which uses vLLM to do some of what you describe.

Thanks,
Danny

On Sun, Feb 1, 2026 at 4:19 AM Elia LIU <[email protected]> wrote:

> Apologies for the formatting in the previous message; my email client stripped the structure. Please find the readable version below:
>
> Hi Beam community,
>
> Following the recent merge of my PR on dynamic batching parameters (https://github.com/apache/beam/pull/37428), I've been profiling Beam's RunInference on LLM-style workloads to identify opportunities where we can adapt efficiency patterns from dedicated serving engines.
>
> Beam's unified processing model is a big advantage, but for production GenAI pipelines there are two bottlenecks that show up quickly in cost and user-perceived latency:
>
> 1. Padding waste (throughput / GPU cost): Current RunInference batching relies on arrival order. In mixed-workload scenarios, a single long prompt forces the entire batch to be padded to its length, wasting significant compute. My previous PR optimized batch timing but says nothing about batch composition; "smart bucketing" is needed to group similar-length inputs and minimize this waste.
>
> 2. No incremental output (TTFT / latency): The current one-shot generation pattern blocks until completion, which makes interactive GenAI flows feel unresponsive compared to streaming gRPC/HTTP endpoints.
>
> I'd like to propose a phased roadmap to make production-ready GenAI workloads first-class on Beam ML.
>
> Phase 1 — Efficiency Core: Smart Bucketing
>
> Goal: Reduce padding overhead significantly (and improve throughput) for variable-length inputs without changing existing behavior.
>
> Approach (building on the element_size_fn infrastructure):
>
> 1. Use the element_size_fn mechanism to bucket elements by length/size before they reach the batching transform.
>
> 2. Implement a stateful bucketing transform using BagState + Timers: maintain per-bucket buffers (e.g., token-length ranges), emit when a bucket reaches its target batch size, and use timer-based flushing to bound waiting time and avoid starving smaller buckets (a rough sketch follows below).
>
> Engineering Principles:
>
> 1. Opt-in transform / feature flag so existing pipelines remain unchanged.
>
> 2. Clear semantics around flushing (latency bounds, watermark considerations).
>
> 3. Benchmarks + tests as part of the initial PR series (padding ratio, throughput, p95 latency).
>
> Phase 2 — Latency Breakthrough: Iterative / Streaming Inference
>
> Goal: Improve Time-To-First-Token and enable real-time GenAI patterns downstream.
>
> Approach:
>
> 1. Extend ModelHandler/RunInference with an experimental iterative output mode (token- or chunk-level), where the DoFn can emit partial results as they're produced.
>
> 2. Start behind a feature flag with a conservative compatibility story.
>
> (Note: this isn't trying to turn Beam into a serving engine; it's about making Beam viable for interactive GenAI pipelines where incremental outputs are part of the dataflow.)
>
> Next Steps & Process
>
> I'm currently drafting a design doc for Phase 1 (focusing on the state machine, timer policies, and portability notes). If the direction resonates, I'd love to share it early for feedback and iterate before coding.
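>
> In the meantime, here is a rough, untested sketch of the core stateful DoFn I have in mind; the bucket keying, batch size, flush timeout, and coder choice are placeholders rather than a final design:
>
> import apache_beam as beam
> from apache_beam.coders import StrUtf8Coder
> from apache_beam.transforms.timeutil import TimeDomain
> from apache_beam.transforms.userstate import BagStateSpec, TimerSpec, on_timer
> from apache_beam.utils.timestamp import Duration, Timestamp
>
> class BucketAndFlushDoFn(beam.DoFn):
>   """Buffers prompts per bucket key in BagState and emits a batch when the
>   bucket fills up or a processing-time timer fires, so small buckets are
>   not starved. Input must be keyed, e.g. (bucket_id, prompt)."""
>
>   TARGET_BATCH_SIZE = 16
>   MAX_WAIT_SECS = 5
>
>   BUFFER = BagStateSpec('buffer', StrUtf8Coder())
>   FLUSH_TIMER = TimerSpec('flush', TimeDomain.REAL_TIME)
>
>   def process(
>       self,
>       keyed_element,
>       buffer=beam.DoFn.StateParam(BUFFER),
>       flush_timer=beam.DoFn.TimerParam(FLUSH_TIMER)):
>     _, prompt = keyed_element
>     buffer.add(prompt)
>     buffered = list(buffer.read())
>     if len(buffered) == 1:
>       # First element of an empty bucket: start the flush deadline so this
>       # bucket waits at most MAX_WAIT_SECS even if it never fills up.
>       flush_timer.set(Timestamp.now() + Duration(seconds=self.MAX_WAIT_SECS))
>     if len(buffered) >= self.TARGET_BATCH_SIZE:
>       buffer.clear()
>       yield buffered
>
>   @on_timer(FLUSH_TIMER)
>   def flush(self, buffer=beam.DoFn.StateParam(BUFFER)):
>     # A stale timer firing after a size-based flush just sees an empty
>     # buffer and emits nothing.
>     leftover = list(buffer.read())
>     buffer.clear()
>     if leftover:
>       yield leftover
>
> Upstream, elements would be keyed by a bucket id before the ParDo, e.g. beam.Map(lambda p: (min(len(p) // 128, 4), p)), and each emitted batch would feed RunInference. Re-reading the whole bag to count on every element is obviously not what I'd ship; a CombiningValueStateSpec counter is the likely refinement.
>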
> A note on process: I may package this roadmap as a GSoC proposal, but I'm happy to drive it as incremental upstream improvements either way — Phase 1 in particular should land as a sequence of reviewable, measurable PRs.
>
> Feedback I'd value:
>
> 1. Does "bucketing first, streaming next" match Beam ML's direction for GenAI workloads?
>
> 2. Are there runner/portability constraints you'd want explicitly addressed in the bucketing design (state/timer semantics, watermark behavior, etc.)?
>
> 3. Would you prefer Phase 2 to target chunk-level streaming first (lower risk) before token-level?
>
> Best regards,
>
> Elia
> https://github.com/Eliaaazzz
