XComp commented on a change in pull request #410:
URL: https://github.com/apache/flink-web/pull/410#discussion_r555008132



##########
File path: _posts/2021-01-11-batch-fine-grained-fault-tolerance.md
##########
@@ -0,0 +1,117 @@
+---
+layout: post
+title:  "Exploring fine-grained recovery of bounded data sets on Flink"
+date: 2021-01-11T00:00:00.000Z
+categories: news
+authors:
+- rmetzger:
+  name: "Robert Metzger"
+  twitter: "rmetzger_"
+---
+
+{% toc %}
+
+Apache Flink is a very versatile tool for all kinds of data processing 
workloads. It can process incoming data within a few milliseconds or crunch 
through petabytes of bounded datasets (also known as batch processing).
+
+Processing efficiency is not the only parameter users of data processing systems care about. In the real world, system outages due to hardware or software failure are expected to happen all the time. For unbounded (or streaming) workloads, Flink uses periodic checkpoints to allow for reliable and correct recovery. For bounded data sets, having a reliable recovery mechanism is mission critical, as users do not want to potentially lose many hours of intermediate processing results.
+
+Apache Flink 1.9 introduced [fine-grained 
recovery](https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures)
 into it's internal workload scheduler. The Flink APIs that are made for 
bounded workloads benefit from this change by individually recovering failed 
operators, re-using results from the previous processing step.
+
+In this blog post, we will give an overview of these changes and experimentally validate their effectiveness.
+
+
+## **How does fine-grained recovery work?** 
{#how-does-fine-grained-recovery-work}
+
+For streaming jobs (and for batch jobs in [pipelined mode](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/ExecutionMode.html)), Flink uses a coarse-grained restart strategy: upon failure, the entire job is restarted. (Streaming jobs, however, have an entirely different fault-tolerance model based on [checkpointing](https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/stateful-stream-processing.html#checkpointing).)
+
+For batch jobs, we can use a more sophisticated recovery strategy that relies on cached intermediate results, so only parts of the pipeline need to be restarted.
+
+Let’s look at the topology below: Some connections are pipelined (between A1 
and B1, as well as A2 and B2) -- data is immediately streamed from operator A1 
to B1. 
+
+However, the output of B1 and B2 is cached on disk (indicated by the grey box). We call such connections blocking. If there’s a failure in the steps succeeding B1 and B2, and the results of B1 and B2 have already been produced, we don’t need to reprocess this part of the pipeline -- we can reuse the cached result.
+
+<div class="row front-graphic">
+  <img src="{{ site.baseurl 
}}/img/blog/batch-fine-grained-fault-tolerance/example.png" width="320px"/>
+</div>
+
+Looking at the case of a failure (here of D2), we see that we do not need to 
restart the entire job. Restarting C2 and all dependent tasks is sufficient. 
This is possible because we can read the cached results of B1 and B2. We call 
this recovery mechanism “fine-grained”, as we only restart parts of the 
topology to recover from a failure -- reducing the recovery time, resource 
consumption and overall job runtime.
+
+<div class="row front-graphic">
+  <img src="{{ site.baseurl 
}}/img/blog/batch-fine-grained-fault-tolerance/recov.png" width="640px"/>
+</div>
+
+
+## **Experimenting with fine-grained recovery** 
{#experimenting-with-fine-grained-recovery}
+
+To validate the implementation, we’ve conducted a small experiment. The 
following sections will describe the setup, the experiment and the results.
+
+
+### **Setup** {#setup}
+
+**Hardware**: The experiment was performed on an idle MacBook Pro 2016 (16 GB 
of memory, SSD storage).
+
+**Test Job**: We used a [modified version](https://github.com/rmetzger/flip1-bench/blob/master/flip1-bench-jobs/src/main/java/com/ververica/TPCHQuery3.java) (modified for instrumentation only) of the [TPC-H Query 3](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java) example that is part of the Flink batch (DataSet API) examples, running on Flink 1.12.
+
+This is the topology of the query:
+
+<div class="row front-graphic">
+  <img src="{{ site.baseurl 
}}/img/blog/batch-fine-grained-fault-tolerance/job.png" width="640px"/>
+</div>
+
+When executed in batch mode, it has many blocking data exchanges where intermediate results are cached.
+
+**Test Data**: We generated a [TPC-H dataset](http://www.tpc.org/tpch/) of 150 
GB as the input.
+
+**Cluster**: We were running 4 TaskManagers with 2 slots each and 1 JobManager 
in standalone mode.
+
+Running this test job takes roughly 15 minutes with the given hardware and 
data.
+
+To **induce failures** into the job, we decided to randomly throw exceptions in the operators. This has a number of benefits compared to randomly killing entire TaskManagers:
+
+*   Killing a TaskManager would require starting and registering a new TaskManager, which introduces an uncontrollable factor into our benchmark: we don't want to test how quickly Flink reconciles a cluster.
+*   Killing an entire TaskManager would bring down on average 1/4th of all 
running operators. In larger production setups, a failure usually affects only 
a smaller fraction of all running operators. The differences between the 
execution modes would be less obvious if we killed entire TaskManagers.
+*   Keeping TaskManagers across failures helps to better simulate using an 
external shuffle service, as intermediate results are preserved despite a 
failure.
+
+The failures are controlled by a central "[failure 
coordinator](https://github.com/rmetzger/flip1-bench/blob/master/flip1-bench-jobs/src/main/java/com/ververica/utilz/KillerServer.java)"
 which decides when to kill which operator.
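+
+To make this concrete, the sketch below shows what such an instrumented operator could look like. It is only an illustration: the class name and the per-record failure probability are ours, and the actual benchmark jobs ask the central failure coordinator instead of drawing a random number locally.
+
+```java
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+
+// Illustrative stand-in only: the real experiment lets a central failure
+// coordinator decide when to fail; here a local random draw takes that role.
+public class InducedFailureMapper<T> extends RichMapFunction<T, T> {
+
+    private final double failureProbabilityPerRecord;
+
+    public InducedFailureMapper(double failureProbabilityPerRecord) {
+        this.failureProbabilityPerRecord = failureProbabilityPerRecord;
+    }
+
+    @Override
+    public T map(T value) {
+        if (ThreadLocalRandom.current().nextDouble() < failureProbabilityPerRecord) {
+            throw new RuntimeException("Induced failure for the experiment");
+        }
+        return value; // pass the record through unchanged
+    }
+}
+```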
+
+Failures are artificially triggered based on a configured mean failure 
frequency. The failures follow an [exponential 
distribution](https://en.wikipedia.org/wiki/Exponential_distribution), which is 
suitable for simulating continuous and independent failures at a configured 
average rate.
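+
+Drawing such inter-failure delays is a one-liner via inverse-CDF sampling. The following minimal sketch illustrates this; the class and method names are ours and not taken from the benchmark repository:
+
+```java
+import java.util.concurrent.ThreadLocalRandom;
+
+final class FailureSchedule {
+
+    // Delay until the next induced failure, drawn via inverse-CDF sampling:
+    // for X ~ Exp(1/mean), X = -mean * ln(1 - U) with U uniform in [0, 1).
+    static long nextFailureDelayMillis(double meanSeconds) {
+        double u = ThreadLocalRandom.current().nextDouble();
+        double delaySeconds = -Math.log(1.0 - u) * meanSeconds;
+        return Math.round(delaySeconds * 1000.0);
+    }
+}
+```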
+
+
+### **The Experiment** {#the-experiment}
+
+We ran the job while varying two parameters in the benchmark:
+
+  * [Execution 
Mode](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/execution_configuration.html):
 [BATCH or 
PIPELINED](https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/api/common/ExecutionMode.html).
+
+    In PIPELINED mode, all data exchanges except those prone to deadlocks are pipelined, i.e., upstream operators stream their results directly downstream. A failure means that we have to restart the entire job and start the processing from scratch.
+    
+    In BATCH mode, all shuffles and broadcasts are persisted before downstream consumption. You can think of the execution as happening in steps. Since we persist intermediate results in BATCH mode, we do not have to reprocess all upstream operators in case of an (induced) failure. We just have to restart the step that was in progress during the failure. (A minimal sketch of how the execution mode is switched is shown after this list.)
+  * Mean Failure Frequency: This parameter controls the frequency of failures induced into the running job. If the parameter is set to 5 minutes, a failure occurs, on average, every 5 minutes. The failures follow an exponential distribution. We’ve chosen values between 15 minutes and 20 seconds.
+
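+Switching between the two execution modes is a one-line setting on the `ExecutionConfig`. The sketch below (class name and comments are ours, not from the benchmark code) shows the relevant call for a DataSet job:
+
+```java
+import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+public class ExecutionModeSwitch {
+
+    public static void main(String[] args) throws Exception {
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+        // PIPELINED (the default) streams results downstream immediately;
+        // BATCH persists shuffles and broadcasts before downstream consumption,
+        // which enables reuse of cached intermediate results after a failure.
+        env.getConfig().setExecutionMode(ExecutionMode.BATCH);
+
+        // ... define the TPC-H Query 3 dataflow here and call env.execute() ...
+    }
+}
+```
+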
+Each configuration combination was executed at least 3 times. We report the 
average execution time. This is necessary due to the probabilistic behavior of 
the induced failures.
+
+
+### **The Results** {#the-results}
+
+The chart below shows the execution time in seconds for each batch and 
pipelined execution with different failure frequencies.
+
+<div class="row front-graphic">
+  <img src="{{ site.baseurl 
}}/img/blog/batch-fine-grained-fault-tolerance/result.png" width="640px"/>
+</div>
+
+We will now discuss some findings:
+
+1. **Execution time with rare failures**: The first few results on the left compare the behavior for mean failure frequencies of 15 (=900s), 10 (=600s), 9 (=540s), 8 (=480s) and 7 (=420s) minutes. The execution times are mostly the same, around 15 minutes. The batch execution time is usually lower and more predictable. This behavior is easy to explain: if an error occurs late in the execution, the pipelined mode needs to start from scratch, while the batch mode can re-use previous intermediate results. The variances in runtime can be explained by statistical effects: if an error happens to be induced close to the end of a pipelined-mode run, the entire job needs to rerun.
+2. **Execution time with frequent failures**: The results “in the middle”, with failure frequencies of 6, 5, 4, 3 and 2 minutes, show that the pipelined execution mode becomes infeasible at some point: if failures happen on average every 3 minutes, the average execution time exceeds 60 minutes; with failures every 2 minutes, it spikes to more than 120 minutes. The pipelined job only manages to produce the final result if it happens to hit a window in which no failure is induced for 15 minutes. For more frequent failures, the pipelined mode did not manage to finish at all.
+3. **How many failures can the batch mode sustain?** The last numbers, with failure frequencies between 60 and 20 seconds, are probably a bit unrealistic for real-world scenarios. But we wanted to investigate how frequent failures have to become for the batch mode to become infeasible. With failures induced every 30 seconds, the average execution time is 30 minutes. In other words, even with two failures per minute, the execution time only doubles in this case. The batch mode is much more predictable and well-behaved when it comes to execution times.
+
+
+## Conclusion {#conclusion}
+
+Based on these results, it makes a lot of sense to use the batch execution 
mode for batch workloads, as the resource consumption and overall execution 
times are substantially lower compared to the pipelined execution mode.
+
+In general, we recommend conducting your own performance experiments on your own hardware and with your own workloads, as results might vary from what we’ve presented here. Despite our findings, the pipelined mode probably has some performance advantages in environments with rare failures and slower I/O (for example, when using spinning disks or network-attached disks). On the other hand, CPU-intensive workloads might benefit from the batch mode even in slow I/O environments.
+
+We should also note that the caching (and subsequent reprocessing on failure) 
only works if the cached results are still present -- this is currently only 
the case, if the TaskManager survives a failure. However, this is an 
unrealistic assumption as many failures would cause the TaskManager process to 
die. To mitigate this limitation, data processing frameworks employ external 
shuffle services that persist the cached results independent of the data 
processing framework. Since Flink 1.9, there is support for a [pluggable 
shuffle 
service](https://cwiki.apache.org/confluence/display/FLINK/FLIP-31%3A+Pluggable+Shuffle+Service),
 and there are tickets for adding implementations for 
[YARN](https://issues.apache.org/jira/browse/FLINK-13247) and 
[Kubernetes](https://issues.apache.org/jira/browse/FLINK-13246). Once these 
implementations are added, TaskManagers can recover cached results even if the 
process or machine got killed.

Review comment:
       ```suggestion
   We should also note that the caching (and subsequent reprocessing on 
failure) only works if the cached results are still present -- this is 
currently only the case, if the TaskManager survives a failure. However, this 
is an unrealistic assumption as many failures would cause the TaskManager 
process to die. To mitigate this limitation, data processing frameworks employ 
external shuffle services that persist the cached results independent of the 
data processing framework. Since Flink 1.9, there is support for a [pluggable 
shuffle 
service](https://cwiki.apache.org/confluence/display/FLINK/FLIP-31%3A+Pluggable+Shuffle+Service),
 and there are tickets for adding implementations for YARN 
([FLINK-13247](https://issues.apache.org/jira/browse/FLINK-13247)) and 
Kubernetes ([FLINK-13246](https://issues.apache.org/jira/browse/FLINK-13246)). 
Once these implementations are added, TaskManagers can recover cached results 
even if the process or machine got killed.
   ```
   Minor thing: I added the FLINK issues in brackets to make it more explicit.

##########
File path: _posts/2021-01-11-batch-fine-grained-fault-tolerance.md
##########
@@ -0,0 +1,117 @@
+---
+layout: post
+title:  "Exploring fine-grained recovery of bounded data sets on Flink"
+date: 2021-01-11T00:00:00.000Z
+categories: news
+authors:
+- rmetzger:
+  name: "Robert Metzger"
+  twitter: "rmetzger_"
+---

Review comment:
       We're missing an `excerpt` here
   ```suggestion
   excerpt: Apache Flink 1.9 introduced fine-grained recovery through FLIP-1. 
The Flink APIs that are made for bounded workloads benefit from this change by 
individually recovering failed operators, re-using results from the previous 
processing step. This blog post gives an overview over these changes and 
evaluates their effectiveness.
   ---
   ```

##########
File path: _posts/2021-01-11-batch-fine-grained-fault-tolerance.md
##########
@@ -0,0 +1,117 @@
+---
+layout: post
+title:  "Exploring fine-grained recovery of bounded data sets on Flink"
+date: 2021-01-11T00:00:00.000Z
+categories: news
+authors:
+- rmetzger:
+  name: "Robert Metzger"
+  twitter: "rmetzger_"
+---
+
+{% toc %}
+
+Apache Flink is a very versatile tool for all kinds of data processing 
workloads. It can process incoming data within a few milliseconds or crunch 
through petabytes of bounded datasets (also known as batch processing).
+
+Processing efficiency is not the only parameter users of data processing systems care about. In the real world, system outages due to hardware or software failure are expected to happen all the time. For unbounded (or streaming) workloads, Flink uses periodic checkpoints to allow for reliable and correct recovery. For bounded data sets, having a reliable recovery mechanism is mission critical, as users do not want to potentially lose many hours of intermediate processing results.
+
+Apache Flink 1.9 introduced [fine-grained 
recovery](https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures)
 into it's internal workload scheduler. The Flink APIs that are made for 
bounded workloads benefit from this change by individually recovering failed 
operators, re-using results from the previous processing step.

Review comment:
       ```suggestion
   Apache Flink 1.9 introduced [fine-grained 
recovery](https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures)
 into its internal workload scheduler. The Flink APIs that are made for bounded 
workloads benefit from this change by individually recovering failed operators, 
re-using results from the previous processing step.
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

