sjwiesman commented on a change in pull request #14006:
URL: https://github.com/apache/flink/pull/14006#discussion_r521519370
##########
File path: docs/ops/jobmanager_high_availability.md
##########
@@ -215,6 +215,74 @@ Starting zookeeper daemon on host localhost.</pre>
<pre>
$ bin/yarn-session.sh -n 2</pre>
+## Kubernetes Cluster High Availability
+We are using a [deployment](https://kubernetes.io/docs/concepts/workloads/controllers/deployment/) for the JobManager when deploying Flink on Kubernetes cluster. The `replicas` is configured to 1, which means that a new JobManager will be launched to take over the leadership once the current one terminated exceptionally.
+
+### Configuration
+{% highlight yaml %}
+kubernetes.cluster-id: <ClusterId>
+high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+high-availability.storageDir: hdfs:///flink/recovery
+{% endhighlight %}
+
+#### Example: Highly Available Standalone Flink Cluster on Kubernetes
+Both session and job/application clusters could use the Kubernetes high availability service. Users just need to add the following Flink config options to [flink-configuration-configmap.yaml]({{ site.baseurl}}/ops/deployment/kubernetes.html#common-cluster-resource-definitions). All other yamls do not need to be updated.
+
+<span class="label label-info">Note</span> You need to have hadoop in the classpath or enable the corresponding FileSystem plugin for the Flink image. Refer to [custom Flink image]({{ site.baseurl}}/ops/deployment/docker.html#customize-flink-image) and [enable plugins]({{ site.baseurl}}/ops/deployment/docker.html#using-plugins) for more information.
Review comment:
This shouldn't be Hadoop-specific.
```suggestion
<span class="label label-info">Note</span> The filesystem which corresponds to the scheme of your configured ha storage directory must be available to the runtime. Refer to [custom Flink image]({{ site.baseurl}}/ops/deployment/docker.html#customize-flink-image) and [enable plugins]({{ site.baseurl}}/ops/deployment/docker.html#using-plugins) for more information.
```
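For instance (just a sketch, not part of the suggestion; the exact plugin jar depends on the scheme of the configured storage directory, and the jar name below is illustrative), the built-in S3 filesystem plugin can be enabled in the image via the `ENABLE_BUILT_IN_PLUGINS` environment variable already used elsewhere in this PR:

```bash
# Sketch: enable the S3 filesystem plugin shipped with the Flink image.
# The jar name must match the Flink version and the filesystem scheme used
# in high-availability.storageDir.
docker run \
    --env ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-<version>.jar \
    <CustomImageName> jobmanager
```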
##########
File path: docs/ops/jobmanager_high_availability.md
##########
@@ -215,6 +215,74 @@ Starting zookeeper daemon on host localhost.</pre>
<pre>
$ bin/yarn-session.sh -n 2</pre>
+## Kubernetes Cluster High Availability
+We are using a [deployment](https://kubernetes.io/docs/concepts/workloads/controllers/deployment/) for the JobManager when deploying Flink on Kubernetes cluster. The `replicas` is configured to 1, which means that a new JobManager will be launched to take over the leadership once the current one terminated exceptionally.
Review comment:
I'm a little confused by this sentence: who is "we" referring to? Does this make sense?
```suggestion
When running Flink as a Kubernetes [deployment](https://kubernetes.io/docs/concepts/workloads/controllers/deployment/), the `replicas` count should be configured to 1. This means that a new JobManager will be launched to take over leadership if the current one terminates exceptionally.
```
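For context, something like this fragment of the JobManager deployment is what is meant (a sketch only; the `flink-jobmanager` name is illustrative and the full spec lives in the standalone Kubernetes resource definitions):

```yaml
# Sketch: the JobManager Deployment keeps exactly one replica; Kubernetes
# recreates the pod (and thus a new JobManager) if it exits abnormally.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
```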
##########
File path: docs/ops/jobmanager_high_availability.md
##########
@@ -215,6 +215,74 @@ Starting zookeeper daemon on host localhost.</pre>
<pre>
$ bin/yarn-session.sh -n 2</pre>
+## Kubernetes Cluster High Availability
+We are using a [deployment](https://kubernetes.io/docs/concepts/workloads/controllers/deployment/) for the JobManager when deploying Flink on Kubernetes cluster. The `replicas` is configured to 1, which means that a new JobManager will be launched to take over the leadership once the current one terminated exceptionally.
+
+### Configuration
+{% highlight yaml %}
+kubernetes.cluster-id: <ClusterId>
+high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+high-availability.storageDir: hdfs:///flink/recovery
+{% endhighlight %}
+
+#### Example: Highly Available Standalone Flink Cluster on Kubernetes
+Both session and job/application clusters could use the Kubernetes high availability service. Users just need to add the following Flink config options to [flink-configuration-configmap.yaml]({{ site.baseurl}}/ops/deployment/kubernetes.html#common-cluster-resource-definitions). All other yamls do not need to be updated.
+
+<span class="label label-info">Note</span> You need to have hadoop in the classpath or enable the corresponding FileSystem plugin for the Flink image. Refer to [custom Flink image]({{ site.baseurl}}/ops/deployment/docker.html#customize-flink-image) and [enable plugins]({{ site.baseurl}}/ops/deployment/docker.html#using-plugins) for more information.
+
+{% highlight yaml %}
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: flink-config
+  labels:
+    app: flink
+data:
+  flink-conf.yaml: |+
+    ...
+    kubernetes.cluster-id: <ClusterId>
+    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+    high-availability.storageDir: hdfs:///flink/recovery
+    restart-strategy: fixed-delay
+    restart-strategy.fixed-delay.attempts: 10
+    ...
+{% endhighlight %}
+
+#### Example: Highly Available Native Kubernetes Cluster
+Using the following command to start a native Flink application cluster on Kubernetes with high availability configured.
+{% highlight bash %}
+$ ./bin/flink run-application -p 8 -t kubernetes-application \
+  -Dkubernetes.cluster-id=<ClusterId> \
+  -Dtaskmanager.memory.process.size=4096m \
+  -Dkubernetes.taskmanager.cpu=2 \
+  -Dtaskmanager.numberOfTaskSlots=4 \
+  -Dkubernetes.container.image=<CustomImageName> \
+  -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
+  -Dhigh-availability.storageDir=s3://flink/flink-ha \
+  -Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=10 \
+  -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \
+  -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \
+  local:///opt/flink/examples/streaming/StateMachineExample.jar
+{% endhighlight %}
+
+### High Availability Data Clean Up
+Currently, when a Flink job reached the terminal state (`FAILED`, `CANCELED`, `FINISHED`), all the HA data, including metadata in Kubernetes ConfigMap and HA state on DFS, will be cleaned up.
+
+So the following command will only shut down the Flink session cluster and leave all the HA related ConfigMaps, state untouched.
+{% highlight bash %}
+$ echo 'stop' | ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=<ClusterId> -Dexecution.attached=true
+{% endhighlight %}
+
+The following commands will cancel the job in application or session cluster and effectively remove all its HA data.
+{% highlight bash %}
+# Cancel a Flink job in the existing session
+$ ./bin/flink cancel -t kubernetes-session -Dkubernetes.cluster-id=<ClusterID> <JobID>
+# Cancel a Flink application
+$ ./bin/flink cancel -t kubernetes-application -Dkubernetes.cluster-id=<ClusterID> <JobID>
+{% endhighlight %}
+
+If the user wants to keep the HA data and restart the Flink cluster, he/she could also simply delete the deploy(via `kubectl delete deploy <ClusterID>`). All the Flink cluster related resources will be destroyed(e.g. JobManager Deployment, TaskManager pods, services, Flink conf ConfigMap) so that it will not occupy the Kubernetes cluster resources. For the HA related ConfigMaps, we do not set the owner reference and they could be retained. Then he/she could use `kubernetes-session.sh` or `flink run-application` to start the session/application again. All the previous suspending running jobs could recover from the latest checkpoint successfully.
Review comment:
A good rule of thumb: if you ever find yourself writing "he/she", just use "they" instead.
Also, the word "could" implies that something only works some of the time, but I don't think that's what you mean to say here.
```suggestion
To keep HA data while restarting the Flink cluster, simply delete the deploy (via `kubectl delete deploy <ClusterID>`). All the Flink cluster related resources will be destroyed (e.g. JobManager Deployment, TaskManager pods, services, Flink conf ConfigMap), so as not to occupy the Kubernetes cluster resources. However, the HA related ConfigMaps do not set the owner reference and will be retained. When restarting the session / application, use `kubernetes-session.sh` or `flink run-application`. All previously running jobs will recover from the latest checkpoint successfully.
```
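Maybe also worth illustrating the workflow right after this paragraph, e.g. (a sketch; pass the same configuration options that were used when the cluster was first started):

```bash
# Delete only the JobManager deployment; the HA ConfigMaps carry no owner
# reference, so they are retained together with the checkpoints on DFS.
$ kubectl delete deploy <ClusterID>

# Restart the session with the same cluster id; previously running jobs
# recover from the latest checkpoint.
$ ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=<ClusterId>
```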
##########
File path: docs/ops/jobmanager_high_availability.md
##########
@@ -215,6 +215,74 @@ Starting zookeeper daemon on host localhost.</pre>
<pre>
$ bin/yarn-session.sh -n 2</pre>
+## Kubernetes Cluster High Availability
+We are using a [deployment](https://kubernetes.io/docs/concepts/workloads/controllers/deployment/) for the JobManager when deploying Flink on Kubernetes cluster. The `replicas` is configured to 1, which means that a new JobManager will be launched to take over the leadership once the current one terminated exceptionally.
+
+### Configuration
+{% highlight yaml %}
+kubernetes.cluster-id: <ClusterId>
+high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+high-availability.storageDir: hdfs:///flink/recovery
+{% endhighlight %}
+
+#### Example: Highly Available Standalone Flink Cluster on Kubernetes
+Both session and job/application clusters could use the Kubernetes high availability service. Users just need to add the following Flink config options to [flink-configuration-configmap.yaml]({{ site.baseurl}}/ops/deployment/kubernetes.html#common-cluster-resource-definitions). All other yamls do not need to be updated.
Review comment:
```suggestion
Both session and job/application clusters support using the Kubernetes high availability service. Users just need to add the following Flink config options to [flink-configuration-configmap.yaml]({{ site.baseurl}}/ops/deployment/kubernetes.html#common-cluster-resource-definitions). All other yamls do not need to be updated.
```
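Could also be worth a one-line reminder that the ConfigMap has to be (re)applied for the options to take effect, e.g. (a sketch, assuming the resource file name from the standalone Kubernetes setup guide):

```bash
# Re-create the ConfigMap so the HA options are picked up by new pods.
$ kubectl apply -f flink-configuration-configmap.yaml
```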
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org