tillrohrmann commented on a change in pull request #14006:
URL: https://github.com/apache/flink/pull/14006#discussion_r520462834
##########
File path: docs/ops/jobmanager_high_availability.md
##########
@@ -215,6 +215,69 @@ Starting zookeeper daemon on host localhost.</pre>
<pre>
$ bin/yarn-session.sh -n 2</pre>
+## Kubernetes Cluster High Availability
+We are using a Kubernetes [deployment](https://kubernetes.io/docs/concepts/workloads/controllers/deployment/) for the JobManager when deploying Flink on a Kubernetes cluster. `replicas` is configured to 1, which means that a new JobManager will be launched to take over leadership once the current one terminates exceptionally.
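For illustration, a minimal sketch of the kind of JobManager Deployment this refers to, with `replicas` set to 1 (resource names, labels, and image below are illustrative placeholders, not the exact manifests from the Flink docs):

{% highlight yaml %}
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1   # Kubernetes starts a replacement JobManager pod if the current one dies
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: flink:latest
        args: ["jobmanager"]
{% endhighlight %}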
+
+### Configuration
+{% highlight yaml %}
+kubernetes.cluster-id: <ClusterId>
+high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+high-availability.storageDir: hdfs:///flink/recovery
+{% endhighlight %}
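With these two options set, the Kubernetes HA services store leader and job metadata in ConfigMaps named after the cluster id. A quick way to inspect them (assuming `kubectl` is configured for the right namespace; the exact ConfigMap names depend on the Flink version):

{% highlight bash %}
# List the ConfigMaps created for this cluster.
$ kubectl get configmaps | grep <ClusterId>
{% endhighlight %}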
+
+#### Example: Highly Available Standalone Flink Cluster on Kubernetes
+Both session and job/application clusters can use the Kubernetes high availability service. Users only need to add the following Flink configuration options to [flink-configuration-configmap.yaml]({{ site.baseurl}}/ops/deployment/kubernetes.html#common-cluster-resource-definitions). All other YAML files do not need to be updated.
+
+<span class="label label-info">Note</span> You need to have Hadoop on the classpath or enable the corresponding FileSystem plugin in the Flink image. Refer to [custom Flink image]({{ site.baseurl}}/ops/deployment/docker.html#customize-flink-image) and [enable plugins]({{ site.baseurl}}/ops/deployment/docker.html#using-plugins) for more information.
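As one example of the plugin route for a standalone setup, the `ENABLE_BUILT_IN_PLUGINS` environment variable of the official Flink image could be set on the JobManager and TaskManager containers (a container-level sketch only; the full specs live in the jobmanager/taskmanager YAML files):

{% highlight yaml %}
containers:
- name: jobmanager
  image: flink:latest
  env:
  # Tells the official Flink image to copy the named plugin jar into plugins/ at startup.
  - name: ENABLE_BUILT_IN_PLUGINS
    value: flink-s3-fs-hadoop-{{site.version}}.jar
{% endhighlight %}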
+
+{% highlight yaml %}
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: flink-config
+  labels:
+    app: flink
+data:
+  flink-conf.yaml: |+
+    ...
+    kubernetes.cluster-id: <ClusterId>
+    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+    high-availability.storageDir: hdfs:///flink/recovery
+    restart-strategy: fixed-delay
+    restart-strategy.fixed-delay.attempts: 10
+    ...
+{% endhighlight %}
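After editing, re-apply the ConfigMap so that newly started pods pick up the HA options, for example:

{% highlight bash %}
$ kubectl apply -f flink-configuration-configmap.yaml
{% endhighlight %}

Note that already-running JobManager and TaskManager processes read their configuration at startup and will only pick up the change after a restart.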
+
+#### Example: Highly Available Native Kubernetes Cluster
+Use the following command to start a native Flink application cluster on Kubernetes with high availability configured.
+{% highlight bash %}
+$ ./bin/flink run-application -p 8 -t kubernetes-application \
+  -Dkubernetes.cluster-id=<ClusterId> \
+  -Dtaskmanager.memory.process.size=4096m \
+  -Dkubernetes.taskmanager.cpu=2 \
+  -Dtaskmanager.numberOfTaskSlots=4 \
+  -Dkubernetes.container.image=<CustomImageName> \
+  -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
+  -Dhigh-availability.storageDir=s3://flink/flink-ha \
+  -Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=10 \
+  -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \
+  -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \
+  local:///opt/flink/examples/streaming/StateMachineExample.jar
+{% endhighlight %}
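A session cluster can be started with the same HA options; a minimal sketch using `kubernetes-session.sh` (cluster id, image, and storage path are placeholders):

{% highlight bash %}
$ ./bin/kubernetes-session.sh \
  -Dkubernetes.cluster-id=<ClusterId> \
  -Dkubernetes.container.image=<CustomImageName> \
  -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
  -Dhigh-availability.storageDir=s3://flink/flink-ha
{% endhighlight %}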
+
+### High availability data clean up
+Currently, when a Flink cluster reaches a terminal state (`FAILED`, `CANCELED`, `FINISHED`), all the HA data, including the metadata in Kubernetes ConfigMaps and the HA storage on DFS, will be cleaned up. So if the user stops the Flink session/application cluster manually (via the following two commands), the Flink cluster cannot be recovered.
Review comment:
```suggestion
Currently, when a Flink job reaches a terminal state (`FAILED`, `CANCELED`, `FINISHED`), all the HA data, including the metadata in Kubernetes ConfigMaps and the HA storage on DFS, will be cleaned up. So if the user stops the Flink session/application cluster manually (via the following two commands), the Flink cluster cannot be recovered.
```
##########
File path: docs/ops/jobmanager_high_availability.md
##########
@@ -215,6 +215,69 @@ Starting zookeeper daemon on host localhost.</pre>
<pre>
$ bin/yarn-session.sh -n 2</pre>
+## Kubernetes Cluster High Availability
+We are using a Kubernetes [deployment](https://kubernetes.io/docs/concepts/workloads/controllers/deployment/) for the JobManager when deploying Flink on a Kubernetes cluster. `replicas` is configured to 1, which means that a new JobManager will be launched to take over leadership once the current one terminates exceptionally.
+
+### Configuration
+{% highlight yaml %}
+kubernetes.cluster-id: <ClusterId>
+high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+high-availability.storageDir: hdfs:///flink/recovery
+{% endhighlight %}
+
+#### Example: Highly Available Standalone Flink Cluster on Kubernetes
+Both session and job/application clusters can use the Kubernetes high availability service. Users only need to add the following Flink configuration options to [flink-configuration-configmap.yaml]({{ site.baseurl}}/ops/deployment/kubernetes.html#common-cluster-resource-definitions). All other YAML files do not need to be updated.
+
+<span class="label label-info">Note</span> You need to have Hadoop on the classpath or enable the corresponding FileSystem plugin in the Flink image. Refer to [custom Flink image]({{ site.baseurl}}/ops/deployment/docker.html#customize-flink-image) and [enable plugins]({{ site.baseurl}}/ops/deployment/docker.html#using-plugins) for more information.
+
+{% highlight yaml %}
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: flink-config
+  labels:
+    app: flink
+data:
+  flink-conf.yaml: |+
+    ...
+    kubernetes.cluster-id: <ClusterId>
+    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+    high-availability.storageDir: hdfs:///flink/recovery
+    restart-strategy: fixed-delay
+    restart-strategy.fixed-delay.attempts: 10
+    ...
+{% endhighlight %}
+
+#### Example: Highly Available Native Kubernetes Cluster
+Use the following command to start a native Flink application cluster on Kubernetes with high availability configured.
+{% highlight bash %}
+$ ./bin/flink run-application -p 8 -t kubernetes-application \
+  -Dkubernetes.cluster-id=<ClusterId> \
+  -Dtaskmanager.memory.process.size=4096m \
+  -Dkubernetes.taskmanager.cpu=2 \
+  -Dtaskmanager.numberOfTaskSlots=4 \
+  -Dkubernetes.container.image=<CustomImageName> \
+  -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
+  -Dhigh-availability.storageDir=s3://flink/flink-ha \
+  -Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=10 \
+  -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \
+  -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \
+  local:///opt/flink/examples/streaming/StateMachineExample.jar
+{% endhighlight %}
+
+### High availability data clean up
+Currently, when a Flink cluster reaches a terminal state (`FAILED`, `CANCELED`, `FINISHED`), all the HA data, including the metadata in Kubernetes ConfigMaps and the HA storage on DFS, will be cleaned up. So if the user stops the Flink session/application cluster manually (via the following two commands), the Flink cluster cannot be recovered.
+
+{% highlight bash %}
+$ echo 'stop' | ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=<ClusterId> -Dexecution.attached=true
+{% endhighlight %}
+
+{% highlight bash %}
+$ ./bin/flink cancel -t kubernetes-application -Dkubernetes.cluster-id=<ClusterId> <JobID>
Review comment:
This will cancel the job and effectively remove all its HA data.
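One way to verify the cleanup afterwards is to look for leftover HA ConfigMaps (the `app` label below is an assumption about how Flink labels its native Kubernetes resources; adjust it to your Flink version):

```bash
# Should return no HA ConfigMaps once the job's HA data has been cleaned up.
kubectl get configmaps -l app=<ClusterId>
```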
##########
File path: docs/ops/jobmanager_high_availability.md
##########
@@ -215,6 +215,69 @@ Starting zookeeper daemon on host localhost.</pre>
<pre>
$ bin/yarn-session.sh -n 2</pre>
+## Kubernetes Cluster High Availability
+We are using a Kubernetes [deployment](https://kubernetes.io/docs/concepts/workloads/controllers/deployment/) for the JobManager when deploying Flink on a Kubernetes cluster. `replicas` is configured to 1, which means that a new JobManager will be launched to take over leadership once the current one terminates exceptionally.
+
+### Configuration
+{% highlight yaml %}
+kubernetes.cluster-id: <ClusterId>
+high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+high-availability.storageDir: hdfs:///flink/recovery
+{% endhighlight %}
+
+#### Example: Highly Available Standalone Flink Cluster on Kubernetes
+Both session and job/application clusters can use the Kubernetes high availability service. Users only need to add the following Flink configuration options to [flink-configuration-configmap.yaml]({{ site.baseurl}}/ops/deployment/kubernetes.html#common-cluster-resource-definitions). All other YAML files do not need to be updated.
+
+<span class="label label-info">Note</span> You need to have Hadoop on the classpath or enable the corresponding FileSystem plugin in the Flink image. Refer to [custom Flink image]({{ site.baseurl}}/ops/deployment/docker.html#customize-flink-image) and [enable plugins]({{ site.baseurl}}/ops/deployment/docker.html#using-plugins) for more information.
+
+{% highlight yaml %}
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: flink-config
+  labels:
+    app: flink
+data:
+  flink-conf.yaml: |+
+    ...
+    kubernetes.cluster-id: <ClusterId>
+    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+    high-availability.storageDir: hdfs:///flink/recovery
+    restart-strategy: fixed-delay
+    restart-strategy.fixed-delay.attempts: 10
+    ...
+{% endhighlight %}
+
+#### Example: Highly Available Native Kubernetes Cluster
+Use the following command to start a native Flink application cluster on Kubernetes with high availability configured.
+{% highlight bash %}
+$ ./bin/flink run-application -p 8 -t kubernetes-application \
+  -Dkubernetes.cluster-id=<ClusterId> \
+  -Dtaskmanager.memory.process.size=4096m \
+  -Dkubernetes.taskmanager.cpu=2 \
+  -Dtaskmanager.numberOfTaskSlots=4 \
+  -Dkubernetes.container.image=<CustomImageName> \
+  -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
+  -Dhigh-availability.storageDir=s3://flink/flink-ha \
+  -Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=10 \
+  -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \
+  -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \
+  local:///opt/flink/examples/streaming/StateMachineExample.jar
+{% endhighlight %}
+
+### High availability data clean up
+Currently, when a Flink cluster reaches a terminal state (`FAILED`, `CANCELED`, `FINISHED`), all the HA data, including the metadata in Kubernetes ConfigMaps and the HA storage on DFS, will be cleaned up. So if the user stops the Flink session/application cluster manually (via the following two commands), the Flink cluster cannot be recovered.
+
+{% highlight bash %}
+$ echo 'stop' | ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=<ClusterId> -Dexecution.attached=true
Review comment:
Shouldn't this simply shut down the cluster which suspends all running
jobs? Shouldn't this leave the HA state untouched? Or is it because we are
deleting all the HA config maps here?
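For comparison, a sketch of a shutdown path that keeps the HA data: deleting only the JobManager deployment instead of going through Flink's own shutdown, assuming the deployment is named after the cluster id in native Kubernetes mode:

```bash
# Removes the running pods but leaves the HA ConfigMaps and the storageDir intact,
# so the cluster can later be resumed with the same cluster-id.
kubectl delete deployment/<ClusterId>
```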
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]