Apologies for the formatting of the previous message; my email client stripped the structure. Please find a readable version below:
Hi Beam community,

Following the recent merge of my PR on dynamic batching parameters (https://github.com/apache/beam/pull/37428), I’ve been profiling Beam’s RunInference on LLM-style workloads to identify opportunities where we can adapt efficiency patterns from dedicated serving engines.

Beam’s unified processing model is a big advantage, but for production GenAI pipelines there are two bottlenecks that show up quickly in cost and user-perceived latency:

1. Padding waste (Throughput / GPU cost): Current RunInference batching relies on arrival order. In mixed-workload scenarios, a single long prompt forces the entire batch to be padded to its length, wasting significant compute. While my previous PR optimized batch timing, it says nothing about batch composition; "smart bucketing" is needed to group similar-length inputs and minimize this waste.

2. No incremental output (TTFT / Latency): The current one-shot generation pattern blocks until completion, which makes interactive GenAI flows feel unresponsive compared to streaming gRPC/HTTP endpoints.

I’d like to propose a phased roadmap to make production-ready GenAI workloads first-class on Beam ML.

Phase 1 — Efficiency Core: Smart Bucketing

Goal: Significantly reduce padding overhead (and improve throughput) for variable-length inputs without changing existing behavior.

Approach (building on the element_size_fn infrastructure; a rough sketch is appended at the end of this mail):

1. Use the element_size_fn mechanism to bucket elements by length/size before they reach the batching transform.
2. Implement a stateful bucketing transform using BagState + Timers: maintain per-bucket buffers (e.g., token-length ranges with configurable boundaries), emit when a bucket reaches its target batch size, and use timer-based flushing to bound waiting time and avoid starving smaller buckets (the "straggler" problem).

Engineering principles:

1. Opt-in transform / feature flag so existing pipelines remain unchanged.
2. Clear semantics around flushing (latency bounds, watermark considerations).
3. Benchmarks + tests as part of the initial PR series (padding ratio, throughput, p95 latency).

Phase 2 — Latency Breakthrough: Iterative / Streaming Inference

Goal: Improve Time-To-First-Token (TTFT) and enable real-time GenAI patterns downstream.

Approach (a second sketch of the emission pattern is appended as well):

1. Extend ModelHandler/RunInference with an experimental iterative output mode (token- or chunk-level), where the DoFn can emit partial results as they’re produced.
2. Start behind a feature flag with a conservative compatibility story.

(Note: this isn’t trying to turn Beam into a serving engine; it’s about making Beam viable for interactive GenAI pipelines where incremental outputs are part of the dataflow.)

Next Steps & Process

I’m currently drafting a design doc for Phase 1 (focusing on the state machine, timer policies, and portability notes). If the direction resonates, I’d love to share it early for feedback and iterate before coding.

A note on process: I may package this roadmap as a GSoC proposal, but I’m happy to drive it as incremental upstream improvements either way — Phase 1 in particular should land as a sequence of reviewable, measurable PRs.

Feedback I’d value:

1. Does “bucketing first, streaming next” match Beam ML’s direction for GenAI workloads?
2. Are there runner/portability constraints you’d want explicitly addressed in the bucketing design (state/timer semantics, watermark behavior, etc.)?
3. Would you prefer Phase 2 to target chunk-level streaming first (lower risk) before token-level?
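Rough sketch for Phase 1 (illustrative only, not proposed API): a minimal stateful bucketing DoFn assuming processing-time flush timers. The names _BucketingDoFn, target_batch_size, max_wait_secs and the whitespace-token bucket_key are placeholders; a real version would plug into the existing element_size_fn plumbing instead of the toy sizing shown here.

    import time

    import apache_beam as beam
    from apache_beam.coders import coders
    from apache_beam.transforms.timeutil import TimeDomain
    from apache_beam.transforms.userstate import BagStateSpec, TimerSpec, on_timer


    def bucket_key(prompt, boundaries=(64, 128, 256, 512)):
      # Toy sizing: whitespace token count as a cheap proxy for tokenized length.
      # A real implementation would delegate to the user-supplied element_size_fn.
      n = len(prompt.split())
      for b in boundaries:
        if n <= b:
          return b
      return -1  # overflow bucket for anything longer than the last boundary


    class _BucketingDoFn(beam.DoFn):
      """Buffers elements per bucket key and emits fixed-size batches.

      A processing-time timer bounds how long the first element of a bucket can
      wait, so small or rare buckets are not starved (the "straggler" problem).
      """
      BUFFER = BagStateSpec('buffer', coders.StrUtf8Coder())
      FLUSH = TimerSpec('flush', TimeDomain.REAL_TIME)

      def __init__(self, target_batch_size=16, max_wait_secs=2.0):
        self._target = target_batch_size
        self._max_wait = max_wait_secs

      def process(
          self,
          element,
          buffer=beam.DoFn.StateParam(BUFFER),
          flush=beam.DoFn.TimerParam(FLUSH)):
        _, prompt = element  # element is (bucket_key, prompt)
        existing = list(buffer.read())
        if not existing:
          # First element of a fresh bucket: bound its waiting time.
          flush.set(time.time() + self._max_wait)
        if len(existing) + 1 >= self._target:
          buffer.clear()
          yield existing + [prompt]
        else:
          buffer.add(prompt)

      @on_timer(FLUSH)
      def flush_bucket(self, buffer=beam.DoFn.StateParam(BUFFER)):
        # Emit whatever has accumulated when the latency bound expires. If a full
        # batch was already emitted, the buffer is empty and this is a no-op.
        pending = list(buffer.read())
        if pending:
          buffer.clear()
          yield pending


    # Usage sketch: key by bucket, then batch within each bucket.
    # batched = (prompts
    #            | 'KeyByBucket' >> beam.Map(lambda p: (bucket_key(p), p))
    #            | 'BucketAndBatch' >> beam.ParDo(_BucketingDoFn()))

The sketch deliberately sidesteps the harder questions (watermark holds, flushing on window expiry, cross-bundle batch accounting); those are exactly what the design doc will cover.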
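Rough sketch for Phase 2 (illustrative only): it shows just the emission pattern, i.e. that a DoFn can yield partial results as first-class elements. stream_generate is a hypothetical stand-in for whatever streaming client or model API a ModelHandler would wrap; none of these names are proposed API.

    import apache_beam as beam


    class StreamingGenerateDoFn(beam.DoFn):
      """Emits (prompt_id, chunk_index, chunk) as partial results arrive.

      Downstream stages see the first chunk as soon as it is produced, so
      time-to-first-token is bounded by the first chunk rather than by the full
      generation. `stream_generate` is a placeholder for any client that yields
      partial completions (token- or chunk-level).
      """

      def __init__(self, stream_generate):
        self._stream_generate = stream_generate

      def process(self, element):
        prompt_id, prompt = element
        for i, chunk in enumerate(self._stream_generate(prompt)):
          yield (prompt_id, i, chunk)


    # Usage sketch:
    # partials = keyed_prompts | beam.ParDo(
    #     StreamingGenerateDoFn(stream_generate=my_streaming_client))

Whether the unit emitted is a token or a larger chunk only changes what stream_generate yields, which is why feedback question 3 matters for scoping.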
Best regards,
Elia
https://github.com/Eliaaazzz
