sjwiesman commented on a change in pull request #14006:
URL: https://github.com/apache/flink/pull/14006#discussion_r521519370



##########
File path: docs/ops/jobmanager_high_availability.md
##########
@@ -215,6 +215,74 @@ Starting zookeeper daemon on host localhost.</pre>
    <pre>
 $ bin/yarn-session.sh -n 2</pre>
 
+## Kubernetes Cluster High Availability
+We are using a [deployment](https://kubernetes.io/docs/concepts/workloads/controllers/deployment/) for the JobManager when deploying Flink on Kubernetes cluster. The `replicas` is configured to 1, which means that a new JobManager will be launched to take over the leadership once the current one terminated exceptionally.
+
+### Configuration
+{% highlight yaml %}
+kubernetes.cluster-id: <ClusterId>
+high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+high-availability.storageDir: hdfs:///flink/recovery
+{% endhighlight %}
+
+#### Example: Highly Available Standalone Flink Cluster on Kubernetes
+Both session and job/application clusters could use the Kubernetes high availability service. Users just need to add the following Flink config options to [flink-configuration-configmap.yaml]({{ site.baseurl}}/ops/deployment/kubernetes.html#common-cluster-resource-definitions). All other yamls do not need to be updated.
+
+<span class="label label-info">Note</span> You need to have hadoop in the classpath or enable the corresponding FileSystem plugin for the Flink image. Refer to [custom Flink image]({{ site.baseurl}}/ops/deployment/docker.html#customize-flink-image) and [enable plugins]({{ site.baseurl}}/ops/deployment/docker.html#using-plugins) for more information.

Review comment:
       This shouldn't be Hadoop specific. 
   ```suggestion
    <span class="label label-info">Note</span> The filesystem which corresponds to the scheme of your configured HA storage directory must be available to the runtime. Refer to [custom Flink image]({{ site.baseurl}}/ops/deployment/docker.html#customize-flink-image) and [enable plugins]({{ site.baseurl}}/ops/deployment/docker.html#using-plugins) for more information.
   ```
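   
   For what it's worth, a minimal sketch of what "available to the runtime" can look like in practice, reusing the `ENABLE_BUILT_IN_PLUGINS` mechanism that the native-Kubernetes example in this PR already relies on (the image tag, the plugin jar name, and the `docker run` invocation are illustrative placeholders, not part of this patch):
   
   ```bash
   # Hypothetical sketch: enable the bundled S3 filesystem plugin in the
   # official Flink image so that an s3:// HA storage directory is
   # accessible at runtime.
   docker run \
     --env ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \
     flink:{{site.version}} jobmanager
   ```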

##########
File path: docs/ops/jobmanager_high_availability.md
##########
@@ -215,6 +215,74 @@ Starting zookeeper daemon on host localhost.</pre>
    <pre>
 $ bin/yarn-session.sh -n 2</pre>
 
+## Kubernetes Cluster High Availability
+We are using a [deployment](https://kubernetes.io/docs/concepts/workloads/controllers/deployment/) for the JobManager when deploying Flink on Kubernetes cluster. The `replicas` is configured to 1, which means that a new JobManager will be launched to take over the leadership once the current one terminated exceptionally.

Review comment:
    I'm a little confused by this sentence: who is "we" referring to? Does this make sense?
   
   ```suggestion
    When running Flink as a Kubernetes [deployment](https://kubernetes.io/docs/concepts/workloads/controllers/deployment/), the `replicas` count should be configured to 1.
    This means that a new JobManager will be launched to take over leadership if the current one terminates exceptionally.
   ```
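   
   For reference, a minimal fragment of the JobManager manifest this is describing (resource names follow the standalone Kubernetes docs and are placeholders):
   
   ```yaml
   # Hypothetical excerpt: a single JobManager replica, which Kubernetes
   # relaunches automatically if the pod terminates exceptionally.
   apiVersion: apps/v1
   kind: Deployment
   metadata:
     name: flink-jobmanager
   spec:
     replicas: 1
     ...
   ```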

##########
File path: docs/ops/jobmanager_high_availability.md
##########
@@ -215,6 +215,74 @@ Starting zookeeper daemon on host localhost.</pre>
    <pre>
 $ bin/yarn-session.sh -n 2</pre>
 
+## Kubernetes Cluster High Availability
+We are using a [deployment](https://kubernetes.io/docs/concepts/workloads/controllers/deployment/) for the JobManager when deploying Flink on Kubernetes cluster. The `replicas` is configured to 1, which means that a new JobManager will be launched to take over the leadership once the current one terminated exceptionally.
+
+### Configuration
+{% highlight yaml %}
+kubernetes.cluster-id: <ClusterId>
+high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+high-availability.storageDir: hdfs:///flink/recovery
+{% endhighlight %}
+
+#### Example: Highly Available Standalone Flink Cluster on Kubernetes
+Both session and job/application clusters could use the Kubernetes high availability service. Users just need to add the following Flink config options to [flink-configuration-configmap.yaml]({{ site.baseurl}}/ops/deployment/kubernetes.html#common-cluster-resource-definitions). All other yamls do not need to be updated.
+
+<span class="label label-info">Note</span> You need to have hadoop in the classpath or enable the corresponding FileSystem plugin for the Flink image. Refer to [custom Flink image]({{ site.baseurl}}/ops/deployment/docker.html#customize-flink-image) and [enable plugins]({{ site.baseurl}}/ops/deployment/docker.html#using-plugins) for more information.
+
+{% highlight yaml %}
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: flink-config
+  labels:
+    app: flink
+data:
+  flink-conf.yaml: |+
+  ...
+    kubernetes.cluster-id: <ClusterId>
+    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+    high-availability.storageDir: hdfs:///flink/recovery
+    restart-strategy: fixed-delay
+    restart-strategy.fixed-delay.attempts: 10
+  ...
+{% endhighlight %}
+
+#### Example: Highly Available Native Kubernetes Cluster
+Using the following command to start a native Flink application cluster on Kubernetes with high availability configured.
+{% highlight bash %}
+$ ./bin/flink run-application -p 8 -t kubernetes-application \
+  -Dkubernetes.cluster-id=<ClusterId> \
+  -Dtaskmanager.memory.process.size=4096m \
+  -Dkubernetes.taskmanager.cpu=2 \
+  -Dtaskmanager.numberOfTaskSlots=4 \
+  -Dkubernetes.container.image=<CustomImageName> \
+  -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
+  -Dhigh-availability.storageDir=s3://flink/flink-ha \
+  -Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=10 \
+  -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \
+  -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \
+  local:///opt/flink/examples/streaming/StateMachineExample.jar
+{% endhighlight %}
+
+### High Availability Data Clean Up
+Currently, when a Flink job reached the terminal state (`FAILED`, `CANCELED`, `FINISHED`), all the HA data, including metadata in Kubernetes ConfigMap and HA state on DFS, will be cleaned up.
+
+So the following command will only shut down the Flink session cluster and leave all the HA related ConfigMaps, state untouched.
+{% highlight bash %}
+$ echo 'stop' | ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=<ClusterId> -Dexecution.attached=true
+{% endhighlight %}
+
+The following commands will cancel the job in application or session cluster and effectively remove all its HA data.
+{% highlight bash %}
+# Cancel a Flink job in the existing session
+$ ./bin/flink cancel -t kubernetes-session -Dkubernetes.cluster-id=<ClusterID> <JobID>
+# Cancel a Flink application
+$ ./bin/flink cancel -t kubernetes-application -Dkubernetes.cluster-id=<ClusterID> <JobID>
+{% endhighlight %}
+
+If the user wants to keep the HA data and restart the Flink cluster, he/she could also simply delete the deploy(via `kubectl delete deploy <ClusterID>`). All the Flink cluster related resources will be destroyed(e.g. JobManager Deployment, TaskManager pods, services, Flink conf ConfigMap) so that it will not occupy the Kubernetes cluster resources. For the HA related ConfigMaps, we do not set the owner reference and they could be retained. Then he/she could use `kubernetes-session.sh` or `flink run-application` to start the session/application again. All the previous suspending running jobs could recover from the latest checkpoint successfully.

Review comment:
    A good rule of thumb: if you ever find yourself writing "he/she", just use "they" instead.
    
    Also, the word "could" implies that something only works some of the time, but I don't think that's what you mean to say here.
   
   ```suggestion
    To keep HA data while restarting the Flink cluster, simply delete the deploy (via `kubectl delete deploy <ClusterID>`). All the Flink cluster related resources will be destroyed (e.g. JobManager Deployment, TaskManager pods, services, Flink conf ConfigMap) so as not to occupy the Kubernetes cluster resources. However, the HA related ConfigMaps do not set an owner reference and will be retained. When restarting the session / application, use `kubernetes-session.sh` or `flink run-application`. All previously running jobs will recover from the latest checkpoint successfully.
   ```
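   
   A quick end-to-end sketch of that restart flow (the cluster id is a placeholder, and the session is assumed to be restarted with the same HA options it was created with):
   
   ```bash
   # Delete only the deployment; the HA ConfigMaps carry no owner
   # reference, so they survive the deletion.
   kubectl delete deploy <ClusterID>
   
   # Restart the session with the same cluster id; previously running jobs
   # recover from the latest checkpoint using the retained HA data.
   ./bin/kubernetes-session.sh \
     -Dkubernetes.cluster-id=<ClusterID> \
     -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
     -Dhigh-availability.storageDir=hdfs:///flink/recovery
   ```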

##########
File path: docs/ops/jobmanager_high_availability.md
##########
@@ -215,6 +215,74 @@ Starting zookeeper daemon on host localhost.</pre>
    <pre>
 $ bin/yarn-session.sh -n 2</pre>
 
+## Kubernetes Cluster High Availability
+We are using a [deployment](https://kubernetes.io/docs/concepts/workloads/controllers/deployment/) for the JobManager when deploying Flink on Kubernetes cluster. The `replicas` is configured to 1, which means that a new JobManager will be launched to take over the leadership once the current one terminated exceptionally.
+
+### Configuration
+{% highlight yaml %}
+kubernetes.cluster-id: <ClusterId>
+high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+high-availability.storageDir: hdfs:///flink/recovery
+{% endhighlight %}
+
+#### Example: Highly Available Standalone Flink Cluster on Kubernetes
+Both session and job/application clusters could use the Kubernetes high availability service. Users just need to add the following Flink config options to [flink-configuration-configmap.yaml]({{ site.baseurl}}/ops/deployment/kubernetes.html#common-cluster-resource-definitions). All other yamls do not need to be updated.

Review comment:
       ```suggestion
    Both session and job/application clusters support using the Kubernetes high availability service. Users just need to add the following Flink config options to [flink-configuration-configmap.yaml]({{ site.baseurl}}/ops/deployment/kubernetes.html#common-cluster-resource-definitions). All other yamls do not need to be updated.
   ```



