rmetzger commented on a change in pull request #427:
URL: https://github.com/apache/flink-web/pull/427#discussion_r627170837



##########
File path: _posts/2021-04-05-reactive-mode.md
##########
@@ -0,0 +1,150 @@
+---
+layout: post
+title:  "Scaling Flink automatically with Reactive Mode"
+date: 2021-04-05T00:00:00.000Z
+authors:
+- rob:
+  name: "Robert Metzger"
+  twitter: "rmetzger_"
+excerpt: Apache Flink 1.13 introduced Reactive Mode, a big step forward in Flink's ability to dynamically adjust to changing workloads, reducing resource utilization and overall costs. This blog post demonstrates Reactive Mode on Kubernetes and shares some lessons learned.
+
+---
+
+{% toc %}
+
+## Introduction
+
+Streaming jobs which run for several days or longer usually experience changes in their workload during their lifetime. These changes can originate from seasonal spikes (such as day vs. night, weekdays vs. weekends, or holidays vs. non-holidays), sudden events, or simply the growing popularity of your product. Some of these changes are more predictable than others, but what they all have in common is that they change the resource demand of your job if you want to maintain the same quality of service for your customers.
+
+A simple way of quantifying the mismatch between the required and the available resources is to measure the space between the actual load and the number of available workers. In the figure below, with static resource allocation (on the left), there is a lot of space between the actual load and the available workers; in other words, we are wasting resources. With elastic resource allocation, there is only a little space between the red and black lines.
+
+<div class="row front-graphic">
+  <img src="{{ site.baseurl }}/img/blog/2021-04-reactive-mode/intro.svg" 
width="640px" alt="Reactive Mode Intro"/>
+</div>
+
+
+Since Flink 1.2 introduced [rescalable state](https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html), it has been possible to **manually rescale** a Flink job by stopping and restoring it with a different parallelism. If your job is running with a parallelism of p=100 and your load increases, you can restart it with p=200 to cope with the additional data.
+
+The problem with this approach is that you have to orchestrate the rescale operation yourself with custom tools, including error handling.
+
+[Reactive Mode](https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/elastic_scaling/) is a new option introduced in Flink 1.13: you monitor your Flink cluster and add or remove resources depending on some metrics; Flink does the rest. In Reactive Mode, the JobManager will try to use all TaskManager resources available in the cluster.
+
+The big benefit of Reactive Mode is that you don't need any specific knowledge to scale Flink anymore. Flink basically behaves like a fleet of servers (web servers, caches, batch processing) that you can grow and shrink as you wish. Since this is such a common pattern, there is a lot of infrastructure available for handling this case: all major cloud providers offer utilities that monitor a metric and automatically scale a set of machines accordingly. In AWS, for example, this is called an [Auto Scaling group](https://docs.aws.amazon.com/autoscaling/ec2/userguide/AutoScalingGroup.html); in Google Cloud, a [Managed Instance group](https://cloud.google.com/compute/docs/instance-groups).
+Similarly, Kubernetes has [Horizontal Pod Autoscalers](https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/).
+
+As an interesting side note: unlike most autoscalable "fleets of servers", Flink is a stateful system, often processing valuable data that requires strong correctness guarantees, comparable to a database. But unlike many traditional databases, Flink is resilient enough (through checkpointing and state backups) to adjust to changing workloads by just adding or removing resources, with very few requirements (just a simple blob store for state backups).
+
+## Getting Started
+
+If you want to try out Reactive Mode yourself locally, just follow these steps 
using a Flink 1.13.0 distribution:
+
+```bash
+# These instructions assume you are in the root directory of a Flink distribution.
+# Put Job into lib/ directory
+cp ./examples/streaming/TopSpeedWindowing.jar lib/
+# Submit Job in Reactive Mode
+./bin/standalone-job.sh start -Dscheduler-mode=reactive -Dexecution.checkpointing.interval="10s" -j org.apache.flink.streaming.examples.windowing.TopSpeedWindowing
+# Start first TaskManager
+./bin/taskmanager.sh start
+```
+
+You have now started a Flink job in Reactive Mode. The [web 
interface](http://localhost:8081) shows that the job is running on one 
TaskManager. If you want to scale up the job, simply add another TaskManager to 
the cluster:
+
+```bash
+# Start additional TaskManager
+./bin/taskmanager.sh start
+```
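Besides the web interface, you can check that the new TaskManager has registered with the cluster via Flink's REST API (a quick sanity check, assuming the default port of 8081):

```bash
# Query the cluster overview via Flink's REST API; the response includes
# the current number of registered TaskManagers and available task slots
curl -s http://localhost:8081/overview
```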
+
+To scale down, remove a TaskManager instance.
+
+```bash
+# Remove a TaskManager
+./bin/taskmanager.sh stop
+```
+
+Reactive Mode also works when deploying [Flink on 
Docker](https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/docker/)
 or using the [standalone Kubernetes 
deployment](https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/)
 (both only as application clusters).
+
+## Demo on Kubernetes
+
+In this section, we want to demonstrate Reactive Mode in a real-world 
scenario. You can use this demonstration as a starting point for your own 
scalable deployment of Flink on Kubernetes, or as a template for building your 
own deployment on another infrastructure.
+
+### The Setup
+
+The central idea of this demo is to use a Kubernetes [Horizontal Pod Autoscaler](https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/), which monitors the CPU load of all TaskManager pods and adjusts their replica count accordingly. On high CPU load, the autoscaler should add more TaskManagers, distributing the load across more machines. On low load, it should stop TaskManagers to save resources.
+
+The whole setup is presented here:
+
+<div class="row front-graphic">
+  <img src="{{ site.baseurl }}/img/blog/2021-04-reactive-mode/arch.png" 
width="640px" alt="Reactive Mode Demo Architecture"/>
+</div>
+
+Let's discuss the components:
+
+**Flink**
+
+- The **JobManager** is deployed as a [Kubernetes 
job](https://kubernetes.io/docs/concepts/workloads/controllers/job/). We are 
submitting a container that is based on the official Flink Docker image, but 
has the jar file of our job added to it. The Flink job simply reads data from a 
Kafka topic and does some expensive math operations per event received. We use 
these math operations to generate high CPU loads, without requiring a large 
Kafka deployment.
+- The **TaskManager(s)** are deployed as a Kubernetes deployment, which is 
scaled through a [Horizontal Pod 
Autoscaler](https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/).
 In this experiment, the autoscaler is monitoring the CPU load of the pods in 
the deployment. The number of pods is adjusted between 1 and 15 pods by the 
autoscaler.
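As a sketch of this piece of the setup (the actual demo uses a YAML manifest; the deployment name `flink-taskmanager` and the CPU target are assumptions here), such an autoscaler could be created with:

```bash
# Create a Horizontal Pod Autoscaler for a (hypothetical) "flink-taskmanager"
# deployment: target 35% average CPU utilization, between 1 and 15 replicas
kubectl autoscale deployment flink-taskmanager --cpu-percent=35 --min=1 --max=15
```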
+
+**Additional Components**:
+
+- We have a **Zookeeper** and **Kafka** deployment (each with one pod) to 
provide a Kafka topic that serves as the input for the Flink job.
+- A **Data Generator** pod which produces messages to the Kafka topic. The data generator generates simple string messages at an adjustable rate. In this experiment, the rate follows a sine wave.
+- For monitoring, we are deploying **Prometheus** and **Grafana**.
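The sine-wave rate of the data generator can be sketched as follows (the parameters here are made up for illustration; the actual generator lives in the demo repository):

```bash
# rate(t) = base + amplitude * sin(2*pi * t / period): a sketch of the
# sine-wave message rate (hypothetical parameters, one full wave per hour)
rate_at() {
  awk -v t="$1" 'BEGIN {
    base = 3000; amplitude = 2900; period = 3600
    pi = atan2(0, -1)
    printf "%.0f\n", base + amplitude * sin(2 * pi * t / period)
  }'
}
rate_at 900    # a quarter into the period: peak rate (5900 msgs/s)
rate_at 2700   # three quarters in: minimum rate (100 msgs/s)
```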
+
+The entire setup is [available on 
GitHub](https://github.com/rmetzger/flink-reactive-mode-k8s-demo) if you want 
to try this out yourself.
+
+### Results
+
+We've deployed the above components on a hosted Kubernetes cluster and ran the setup for several days. The results are best examined on a simple Grafana dashboard:
+
+<div class="row front-graphic">
+  <img src="{{ site.baseurl }}/img/blog/2021-04-reactive-mode/result.png" 
alt="Reactive Mode Demo Result"/>
+  <p class="align-center">Reactive Mode Experiment Results</p>
+</div>
+
+Let's take a look at the dashboard:
+
+- On the top left, we see the **Kafka consumer lag**, reported by Flink's Kafka consumer (the source). This metric basically reports the queue size of unprocessed messages. A high lag means that Flink is not processing messages as fast as they are produced: we need to scale up.
+
+  In the chart, you can see that the lag usually follows the throughput of the data coming from Kafka. When throughput is at its highest, the reported lag reaches \~75k messages. In low-throughput periods, it is basically zero.
+
+- On the top right, you see the **throughput**, measured in records per second, as reported by Flink. The throughput roughly follows a sine wave, peaking at 6k messages per second and going down to almost zero.
+
+- The bottom left chart shows the **CPU load** per TaskManager. We've added this metric to the dashboard because it is the metric Kubernetes uses to decide on the replica count of the TaskManager deployment. We can see that as soon as a certain CPU load is reached, additional TaskManagers are started.
+
+- The bottom right chart shows the **TaskManager count** over time. We see that when throughput (and CPU load) peaks, we usually run 5 TaskManagers, with occasional peaks of up to 8. At low throughput, we run the minimum of just one TaskManager. This chart nicely shows that Reactive Mode works as expected in this experiment: the number of TaskManagers adjusts to the load on the system.
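As a side note, the replica count the Horizontal Pod Autoscaler converges on follows a simple formula documented by Kubernetes; here is a small sketch with made-up utilization numbers:

```bash
# desiredReplicas = ceil(currentReplicas * currentMetricValue / targetMetricValue)
# (the scaling formula used by the Kubernetes Horizontal Pod Autoscaler)
desired_replicas() {  # args: current replicas, current CPU %, target CPU %
  awk -v cur="$1" -v use="$2" -v tgt="$3" 'BEGIN {
    d = cur * use / tgt
    printf "%d\n", ((d == int(d)) ? d : int(d) + 1)   # ceil
  }'
}
desired_replicas 5 70 35   # load at double the target: scale out to 10 pods
desired_replicas 5 7 35    # load far below the target: scale in to 1 pod
```

In practice the autoscaler also applies stabilization windows and tolerances, so the replica count moves more smoothly than the raw formula suggests.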
+
+
+### Lessons Learned

Review comment:
       It is indeed a pretty long section, but I want to explain to users what's going on, instead of just telling them that there was some problem. I tried to mitigate the problem by making the headline more specific -- this way people understand that they can skip it, if it is too specific for them.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

