damccorm commented on code in PR #29860:
URL: https://github.com/apache/beam/pull/29860#discussion_r1437173522
##########
website/www/site/content/en/blog/apache-beam-flink-and-kubernetes-part3.md:
##########
@@ -0,0 +1,185 @@
+---
+title: "Behind the Scenes: Crafting an Autoscaler for Apache Beam in a High-Volume Streaming Environment"
+date: 2023-12-21 09:00:00 -0400
+categories:
+  - blog
+authors:
+  - talat
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+
+### Introduction to the Design of Our Autoscaler for Apache Beam Jobs
+
+Welcome to the third and final part of our blog series on building a scalable, self-managed streaming infrastructure with Beam and Flink. [In our previous post](https://beam.apache.org/blog/apache-beam-flink-and-kubernetes/), we delved into the scale of our streaming platforms, highlighting our capacity to manage over 40,000 streaming jobs and process upwards of 10 million events per second. This impressive scale sets the stage for the challenge we address today: the intricate task of resource allocation in a dynamic streaming environment.
+
+In this blog post, [Talat Uyarer (Architect / Senior Principal Engineer)](https://www.linkedin.com/in/talatuyarer/) and [Rishabh Kedia (Principal Engineer)](https://www.linkedin.com/in/rishabhkedia/) describe more details about our Autoscaler. Imagine a scenario where your streaming system is inundated with fluctuating workloads. Our case presents a unique challenge, as our customers, equipped with firewalls distributed globally, generate logs at various times of the day. This results in workloads that not only vary by time but also escalate over time due to changes in settings or the addition of new cybersecurity solutions from PANW. Furthermore, updates to our codebase necessitate rolling out changes across all streaming jobs, leading to a temporary surge in demand as the system processes unprocessed data.
+
+<img class="center-block"
+src="/images/blog/apache-beam-flink-and-kubernetes-part3/resource-allocation.png"
+alt="Resource Allocation">
+
+Traditionally, managing this ebb and flow of demand involves a manual, often inefficient approach. One might over-provision resources to handle peak loads, inevitably leading to resource wastage during off-peak hours. Conversely, a more cost-conscious strategy might involve accepting delays during peak times, with the expectation of catching up later. However, both methods demand constant monitoring and manual adjustment - a far from ideal situation.
+
+In this modern era, where automated scaling of web front-ends is a given, we aspire to bring the same level of efficiency and automation to streaming infrastructure. Our goal is to develop a system that can dynamically track and adjust to the workload demands of our streaming operations. In this blog post, we will introduce you to our innovative solution - an autoscaler designed specifically for Apache Beam jobs.
+
+<img class="center-block"
+src="/images/blog/apache-beam-flink-and-kubernetes-part3/auto-tuned-worker.png"
+alt="Auto Tuned Resource Allocation">
+
+For clarity, when we refer to "resources" in this context, we mean the number of Flink Task Managers, or Kubernetes Pods, that process your streaming pipeline. These Task Managers aren't just about CPU; they also involve RAM, Network, Disk IO, and other computational resources.
+
+However, our solution is predicated on certain assumptions. Primarily, it's geared towards operations processing substantial data volumes. If your workload only requires a couple of Task Managers, this system might not be the best fit. In our case we have 10K+ workloads, and each of them has a different load. Manual tuning was not an option for us. We also assume that the data is evenly distributed, allowing for increased throughput with the addition of more Task Managers. This assumption is crucial for effective horizontal scaling. While there are real-world complexities that might challenge these assumptions, for the scope of this discussion, we will focus on scenarios where these conditions hold true.
+
+Join us as we delve into the design and functionality of our autoscaler, a solution tailored to bring efficiency, adaptability, and a touch of intelligence to the world of streaming infrastructure.
+
+
+## Identifying the Right Signals for Autoscaling
+
+When we're overseeing a system like Apache Beam jobs on Flink, it's crucial to identify key signals that help us understand the relationship between our workload and resources. These signals are our guiding lights, showing us when we're lagging behind or wasting resources. By accurately identifying these signals, we can formulate effective scaling policies and implement changes in real-time. Imagine needing to expand from 100 to 200 TaskManagers — how do we smoothly make that transition? That's where these signals come into play.
+
+Remember, we're aiming for a universal solution applicable to any workload and pipeline. While specific problems might benefit from unique signals, our focus here is on creating a one-size-fits-all approach.
+
+In Flink, tasks form the basic execution unit and consist of one or more operators, such as map, filter, or reduce. Flink optimizes performance by chaining these operators into single tasks when possible, minimizing overheads like thread context switching and network I/O. Your pipeline, when optimized, turns into a directed acyclic graph of stages, each processing elements based on your code. Don't confuse stages with physical machines — they're separate concepts. In our job we measure backlog information by using Apache Beam's [`backlog_bytes` and `backlog_elements`](https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/SourceMetrics.java#L32) metrics.
+
+<img class="center-block"
+src="/images/blog/apache-beam-flink-and-kubernetes-part3/flink-operator-chaining.png"
+alt="Apache Beam Pipeline Optimization by Apache Flink">
+
+##### **Upscaling Signals**
+
+##### *Backlog Growth*
+Let’s take a practical example. Consider a pipeline reading from Kafka, where different operators handle data parsing, formatting, and accumulation. The key metric here is throughput — how much data each operator processes over time. But throughput alone isn't enough. We need to examine the queue size or backlog at each operator. A growing backlog indicates we're falling behind. We measure this as backlog growth — the first derivative of backlog size over time, highlighting our processing deficit.
+
+```math
+Backlog Growth = \frac{d(Backlog)}{dt}

Review Comment:
   Seems like this is close to what we want, but it is still not quite right - https://apache-beam-website-pull-requests.storage.googleapis.com/29860/blog/apache-beam-flink-and-kubernetes-part3/index.html

##########
website/www/site/content/en/blog/apache-beam-flink-and-kubernetes-part3.md:
##########
@@ -121,25 +124,33 @@ In summary, our scaling policy is for scale up, we first ensure that the time to

 Increasing Backlog aka Backlog Growth > 0 :

-$`Worker_require = Worker_current \frac{Backlog Growth + Throughput}{Throughput}`$
+```math
+Worker_{require} = Worker_{current} \frac{Backlog Growth + Throughput}{Throughput}
+```

 Consistent Backlog aka Backlog Growth = 0:

-$`Worker_extra = Worker_current \frac{Backlog Time}{Time to Reduce Backlog}`$
+```math
+Worker_{extra} = Worker_{current} \frac{Backlog Time}{Time to Reduce Backlog}
+```

 To Sum up:

-$`Worker_scaleup = min(Worker_require + Worker_extra, Worker_max)`$
+```math
+Worker_{scaleup} = min(Worker_{require} + Worker_{extra}, Worker_{max})
+```

 To scale down, we need to ensure the machine utilization is low (< 70%) and there is no backlog growth and current time to drain backlog is less than the limit (10s)

 So the only driving factor to calculate the required resources after a scale down is CPU

-$`CPURate_desired = \frac{Worker_current}{Worker_new} CpuRate_current`$
+```math
+CPURate_{desired} = \frac{Worker_{current}}{Worker_{new}} CpuRate_{current}
+```

 ## Executing Autoscaling Decision

-In Our Setup we use Reactive Mode which uses Adaptive Scheduler and Declarative Resources manager. We wanted to align resources with slots. As Adviced most of Flink documentation We set per vCPU one slot. Most of Our jobs uses 1 vCPU 4GB Memory combination for TaskManager.
+In our setup we use Reactive Mode which uses Adaptive Scheduler and Declarative Resources manager. We wanted to align resources with slots. As Advised in most of the Flink documentation we set one per vCPU slot. Most of our jobs use 1 vCPU 4GB Memory combination for TaskManager.

Review Comment:
   ```suggestion
   In our setup we use Reactive Mode which uses Adaptive Scheduler and Declarative Resources manager. We wanted to align resources with slots. As advised in most of the Flink documentation we set one per vCPU slot. Most of our jobs use 1 vCPU 4GB Memory combination for TaskManager.
   ```

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
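For readers who want to sanity-check the scaling formulas quoted in the hunks above, the short Python sketch below restates the same arithmetic as plain code. It is only an illustration, not part of the PR under review and not any Beam or Flink API: the function names are made up, and the 70% CPU target and 10-second backlog-drain limit are the hypothetical thresholds mentioned in the surrounding prose.

```python
import math

# Illustrative thresholds taken from the prose in the diff above (assumptions, not APIs).
CPU_TARGET = 0.70         # scale down only while utilization stays below ~70%
TARGET_RECOVERY_S = 10.0  # how quickly we want an existing backlog drained


def backlog_growth(backlog_now: float, backlog_prev: float, interval_s: float) -> float:
    """First derivative of backlog size over time: d(Backlog)/dt."""
    return (backlog_now - backlog_prev) / interval_s


def workers_for_scale_up(workers_current: int, throughput: float, growth: float,
                         backlog_time_s: float, workers_max: int) -> int:
    """Worker_scaleup = min(Worker_require + Worker_extra, Worker_max)."""
    workers_require = workers_current
    workers_extra = 0.0
    if growth > 0:
        # Increasing backlog: absorb the growth on top of the current throughput.
        workers_require = workers_current * (growth + throughput) / throughput
    elif growth == 0 and backlog_time_s > 0:
        # Consistent backlog: add capacity in proportion to how long the backlog
        # would take to drain versus how quickly we want it gone.
        workers_extra = workers_current * (backlog_time_s / TARGET_RECOVERY_S)
    return min(math.ceil(workers_require + workers_extra), workers_max)


def workers_for_scale_down(workers_current: int, cpu_rate_current: float,
                           workers_min: int = 1) -> int:
    """Solve CPURate_desired = (Worker_current / Worker_new) * CpuRate_current for Worker_new."""
    workers_new = workers_current * cpu_rate_current / CPU_TARGET
    return max(math.ceil(workers_new), workers_min)


# 100 workers at 1,000 events/s with the backlog growing by 200 events/s:
# 100 * (200 + 1000) / 1000 = 120 workers.
print(workers_for_scale_up(100, throughput=1000.0, growth=200.0,
                           backlog_time_s=0.0, workers_max=500))   # -> 120
# 100 workers running at 35% CPU can shrink to 100 * 0.35 / 0.70 = 50 workers.
print(workers_for_scale_down(100, cpu_rate_current=0.35))          # -> 50
```

A real controller would likely smooth the backlog-growth estimate over a window of samples rather than a single pair, but the core sizing decision reduces to the few lines above.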
