Thanks for the detailed feedback!
Regarding Smart Bucketing:
Your suggestion to look into both the existing stateful BatchElements and 
stateless bundle processing actually inspired me to implement both strategies. 
They cater to two very different user scenarios, and having both makes the 
generic RunInference much more powerful:

  1. Latency-Sensitive / High Throughput (Stateless)
     This involves sorting within a single bundle (using
     StartBundle/FinishBundle) to offer a solution with zero shuffle
     overhead. It is perfect for users who prioritize speed and have enough
     data volume per bundle to get "good enough" batches locally, without
     the cost of serialization or network transfer. (A minimal sketch
     follows this list.)
  2. Cost-Sensitive / Sparse Data (Stateful)
     This involves pre-keying elements by length to reuse the existing
     BatchElements machinery, prioritizing maximum GPU efficiency. By paying
     the shuffle cost, we achieve global optimization: grouping
     similar-sized elements from all workers to minimize padding. This is
     critical for expensive models where GPU memory is the bottleneck. (A
     sketch of the pre-keying idea also follows this list.)
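
For the stateless path, here is a minimal sketch of what I have in mind,
assuming string elements in the global window (SortWithinBundle and
batch_size are names I am making up here, not existing Beam APIs):

    import apache_beam as beam
    from apache_beam.transforms.window import GlobalWindow
    from apache_beam.utils.timestamp import MIN_TIMESTAMP
    from apache_beam.utils.windowed_value import WindowedValue

    class SortWithinBundle(beam.DoFn):
      """Buffers a bundle, sorts by length, emits size-homogeneous batches."""

      def __init__(self, batch_size=16):
        self._batch_size = batch_size

      def start_bundle(self):
        self._buffer = []

      def process(self, element):
        # Buffer only; nothing is emitted until the bundle finishes.
        self._buffer.append(element)

      def finish_bundle(self):
        # Sorting locally puts similar-length elements next to each other;
        # we then cut fixed-size batches. No shuffle, no state, no timers.
        self._buffer.sort(key=len)
        for i in range(0, len(self._buffer), self._batch_size):
          batch = self._buffer[i:i + self._batch_size]
          # finish_bundle must emit WindowedValue objects explicitly.
          yield WindowedValue(batch, MIN_TIMESTAMP, [GlobalWindow()])

The trade-off is that batch quality depends entirely on how many elements the
runner happens to put in each bundle, which is exactly why this path targets
high-volume pipelines.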

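For the stateful path, here is a minimal sketch of the pre-keying idea, using
GroupIntoBatches as a stand-in for the keyed stateful batching machinery
(bucket_key, BUCKET_BOUNDARIES, and bucketed_batches are illustrative names;
whether the final implementation keys into BatchElements or GroupIntoBatches
is exactly what the design doc needs to settle):

    import bisect

    import apache_beam as beam

    # Assumed token-length cut points; the real design would make these
    # configurable and reuse the element_size_fn hook for sizing.
    BUCKET_BOUNDARIES = [64, 128, 256, 512, 1024]

    def bucket_key(prompt):
      # Rough size proxy for illustration; a real pipeline would use the
      # model's tokenizer rather than whitespace splitting.
      return bisect.bisect_left(BUCKET_BOUNDARIES, len(prompt.split()))

    def bucketed_batches(prompts, batch_size=16, max_wait_secs=5):
      return (
          prompts
          | 'KeyByLengthBucket' >> beam.Map(lambda p: (bucket_key(p), p))
          # Keyed, stateful batching: each batch now holds only prompts
          # from the same length bucket, regardless of which worker saw them.
          | 'BatchPerBucket' >> beam.GroupIntoBatches(
              batch_size, max_buffering_duration_secs=max_wait_secs)
          | 'DropKeys' >> beam.Values())
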
Plan to land this:
I will open a tracking issue and break this down into 3 atomic PRs, focusing on 
verification:

  * Stateless: Implement bundle-level sorting, backed by benchmarks
    measuring padding-ratio reduction, throughput, and P95 latency (a sketch
    of the padding-ratio metric follows this list).
  * Stateful: Implement the key-assignment logic to leverage BatchElements,
    with correctness tests that verify grouping behavior.
  * Integration: Expose these strategies in RunInference.
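
To keep the benchmark well-defined, the padding-ratio metric I have in mind
is simply the fraction of padded compute that is wasted (the helper below is
illustrative, not an existing Beam metric):

    def padding_ratio(token_lengths):
      # Fraction of compute spent on padding when a batch is padded to the
      # length of its longest element.
      if not token_lengths:
        return 0.0
      padded = max(token_lengths) * len(token_lengths)
      return 1.0 - sum(token_lengths) / padded

    # A mixed-length batch wastes most of the padded compute:
    #   padding_ratio([12, 15, 14, 480])  -> ~0.73
    # while a bucketed batch wastes almost none:
    #   padding_ratio([12, 15, 14, 13])   -> 0.10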

Regarding vLLM:
Fully agreed. We definitely shouldn't reinvent the wheel given that vLLM 
already has built-in capabilities for streaming.
I'll get the issue opened shortly.
Thanks,
Elia


On 2026/02/02 17:29:16 Danny McCormick via dev wrote:
> Thanks for the writeup! I'm generally +1 to the smart bucketing workstream.
> This should probably leverage the stateful batch elements which already
> exists though (
> https://github.com/apache/beam/blob/112685de58d7b24b70b8d8eb7171f0e28b35a2f1/sdks/python/apache_beam/transforms/util.py#L1305)
> and/or work without state (sorting/batching by size is still doable in the
> context of a single bundle). More design here will help regardless!
>
> The value of the interactive pipeline is a bit less clear to me, and much
> harder to do well; I'd be curious to see more of the mechanics of what
> you're proposing. An example use case would be helpful as well. I will note
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/ml/inference/vllm_inference.py
> which uses vLLM to do some of what you describe.
>
> Thanks,
> Danny
>
> On Sun, Feb 1, 2026 at 4:19 AM Elia LIU <[email protected]> wrote:
>
> > Apologies regarding the formatting in the previous message. My email
> > client stripped the structure. Please find the readable version below:
> >
> > Hi Beam community,
> >
> > Following the recent merge of my PR on dynamic batching parameters (
> > https://github.com/apache/beam/pull/37428), I’ve been profiling Beam’s
> > RunInference on LLM-style workloads to identify opportunities where we can
> > adapt efficiency patterns from dedicated serving engines.
> >
> > Beam’s unified processing model is a big advantage, but for production
> > GenAI pipelines there might be two bottlenecks that show up quickly in cost
> > and user-perceived latency:
> >
> > 1. Padding waste (Throughput / GPU cost): Current RunInference batching
> > relies on arrival order. In mixed-workload scenarios, a single long prompt
> > forces the entire batch to be padded to its length, wasting significant
> > compute. "Smart Bucketing" is needed to group similar-length inputs and
> > minimize this waste.
> >
> > 2. No incremental output (TTFT / Latency): The current one-shot generation
> > pattern blocks until completion, which makes interactive GenAI flows feel
> > unresponsive compared to streaming gRPC/HTTP endpoints.
> >
> > I’d like to propose a phased roadmap to make production-ready GenAI
> > workloads first-class on Beam ML.
> >
> >
> > Phase 1 — Efficiency Core: Smart Bucketing
> >
> > Goal: Reduce padding overhead significantly (and improve throughput) for
> > variable-length inputs without changing existing behavior.
> >
> > Approach (building on the element_size_fn infrastructure):
> >
> > 1. Use the element_size_fn mechanism to bucket elements by length/size
> > before they reach the batching transform.
> >
> > 2. Implement a stateful bucketing transform using BagState + Timers:
> > maintain per-bucket buffers (e.g., token-length ranges), emit when a
> > specific bucket reaches its target batch size, and use timer-based flushing
> > to bound waiting time.
> >
> > Engineering Principles:
> >
> > 1. Opt-in transform / feature flag so existing pipelines remain unchanged.
> >
> > 2. Clear semantics around flushing (latency bounds, watermark
> > considerations).
> >
> > 3. Benchmarks + tests as part of the initial PR series (padding ratio,
> > throughput, p95 latency).
> >
> >
> > Phase 2 — Latency Breakthrough: Iterative / Streaming Inference
> >
> > Goal: Improve Time-To-First-Token and enable real-time GenAI patterns
> > downstream.
> >
> > Approach:
> >
> > 1. Extend ModelHandler/RunInference with an experimental iterative output
> > mode (token- or chunk-level), where the DoFn can emit partial results as
> > they’re produced.
> >
> > 2. Start behind a feature flag with a conservative compatibility story.
> >
> > (Note: This isn’t trying to turn Beam into a serving engine; it’s about
> > making Beam viable for interactive GenAI pipelines where incremental
> > outputs are part of the dataflow.)
> >
> >
> > Next Steps & Process
> >
> > I’m currently drafting a design doc for Phase 1 (focusing on the state
> > machine, timer policies, and portability notes). If the direction
> > resonates, I’d love to share it early for feedback and iterate before
> > coding.
> >
> > A note on process: I may package this roadmap as a GSoC proposal, but I’m
> > happy to drive it as incremental upstream improvements either way — Phase 1
> > in particular should land as a sequence of reviewable and measurable PRs.
> >
> >
> > Feedback I’d value:
> >
> > 1. Does “bucketing first, streaming next” match Beam ML’s direction for
> > GenAI workloads?
> >
> > 2. Any runner/portability constraints you’d want explicitly addressed in
> > the bucketing design (state/timers semantics, watermark behavior, etc.)?
> >
> > 3. Would you prefer Phase 2 to target chunk-level streaming first (lower
> > risk) before token-level?
> >
> >
> > Best regards,
> >
> > Elia
> >
> > https://github.com/Eliaaazzz
>
