Eliaaazzz opened a new issue, #37531: URL: https://github.com/apache/beam/issues/37531
### What would you like to happen?

### Issue Title

`[Feature]: Implement Smart Bucketing Strategies for RunInference`

### Motivation

Currently, `RunInference` creates batches based solely on element count or arrival time. For generic models (e.g., BERT, ResNet, standard Hugging Face pipelines) that rely on static padding or dynamic padding (via a DataCollator), this order-agnostic batching leads to **significant padding overhead** when sequence lengths vary widely. The result is **wasted GPU memory and compute resources**, as short sequences are forced to carry heavy padding to match the longest sequence in the batch.

### Proposal

I propose implementing **"Smart Bucketing"** strategies upstream of the model handler to group similar-sized elements together. Based on community feedback, I will implement two distinct strategies to cater to different workload constraints:

**1. Stateless Strategy (Latency-Sensitive)**

Sorts and batches elements within a single bundle using `StartBundle`/`FinishBundle`.

* **Pros:** **Zero shuffle cost**, minimal latency overhead.
* **Use Case:** High-throughput pipelines with sufficient bundle density (enough data per bundle to sort effectively).

**2. Stateful Strategy (Cost-Sensitive)**

Pre-keys elements by length to leverage the existing `BatchElements` state machinery.

* **Pros:** **Global optimization** (minimum padding across all workers), prevention of OOMs on expensive models.
* **Use Case:** Large/expensive models where **GPU efficiency > network shuffle cost**.

> **Note:** This feature targets generic upstream batching for standard models. It is *not* intended to replace native continuous-batching solutions such as vLLM for supported models.

### Implementation Plan

I will deliver this work via **3 atomic PRs** to keep the review scope manageable:

* [ ] **PR 1: Stateless Core.** Implement bundle-level sorting logic (`SortAndBatchElements`) with benchmarks demonstrating padding reduction and throughput.
* [ ] **PR 2: Stateful Core.** Implement length-aware keying to reuse `BatchElements`, ensuring correctness across distributed workers.
* [ ] **PR 3: Integration.** Expose these strategies in the `RunInference` API with end-to-end success tests.

### Success Metrics

* **Padding Efficiency:** Significant reduction in padding tokens per batch (verified by benchmarks using Pareto/log-normal length distributions).
* **Throughput:** Comparison (elements/sec) vs. standard `BatchElements` to ensure sorting overhead is negligible.
* **Latency:** Improvement in P95 latency by reducing the "straggler effect" in batches.

### Issue Priority

Priority: 2 (default / most feature requests should be filed as P2)

### Issue Components

- [x] Component: Python SDK
- [ ] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Infrastructure
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner
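To make the stateless strategy above concrete, here is a minimal pure-Python sketch of the bundle-level logic: buffer a bundle's elements, sort them by a length function, and slice the sorted run into fixed-size batches. In the actual transform this would live in `FinishBundle`; the helper name `sort_and_batch` and its parameters are illustrative assumptions, not an existing Beam API.

```python
def sort_and_batch(elements, max_batch_size=32, length_fn=len):
    """Sort a bundle's buffered elements by length, then slice them into
    batches so that co-batched neighbours need similar amounts of padding.

    Hypothetical sketch of the proposed stateless strategy; names and
    parameters are not part of any existing Beam API.
    """
    ordered = sorted(elements, key=length_fn)
    return [
        ordered[i:i + max_batch_size]
        for i in range(0, len(ordered), max_batch_size)
    ]


# Example: four variable-length "sequences" buffered from one bundle.
bundle = ["aaaa", "a", "aaa", "aa"]
print(sort_and_batch(bundle, max_batch_size=2))
# → [['a', 'aa'], ['aaa', 'aaaa']]
```

Because the sort happens entirely within one bundle, there is no shuffle: the trade-off is that padding savings depend on bundle density, as noted in the use case above.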
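The stateful strategy's pre-keying step could look like the following sketch: each element is mapped to a coarse length bucket, and that bucket becomes the key fed into keyed stateful batching (e.g. the `BatchElements` state machinery the proposal mentions, or `beam.GroupIntoBatches`), so only similarly sized elements are ever co-batched. The bucket boundaries and the helper name `length_bucket_key` are assumptions for illustration.

```python
def length_bucket_key(element, boundaries=(16, 32, 64, 128, 256)):
    """Return the smallest bucket boundary that fits the element, so all
    elements sharing a key pad to at most that boundary.

    Hypothetical sketch of the proposed stateful pre-keying; the boundary
    values here are illustrative, not tuned defaults.
    """
    n = len(element)
    for b in boundaries:
        if n <= b:
            return b
    return float("inf")  # overflow bucket for very long sequences


print(length_bucket_key("x" * 10))    # → 16
print(length_bucket_key("x" * 100))   # → 128
print(length_bucket_key("x" * 1000))  # → inf
```

Keying by bucket (rather than exact length) keeps the key space small, which matters for the per-key state the batching transform maintains across distributed workers.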
