tillrohrmann commented on a change in pull request #14006:
URL: https://github.com/apache/flink/pull/14006#discussion_r520462834



##########
File path: docs/ops/jobmanager_high_availability.md
##########
@@ -215,6 +215,69 @@ Starting zookeeper daemon on host localhost.</pre>
    <pre>
 $ bin/yarn-session.sh -n 2</pre>
 
+## Kubernetes Cluster High Availability
+We use a [deployment](https://kubernetes.io/docs/concepts/workloads/controllers/deployment/) for the JobManager when deploying Flink on a Kubernetes cluster. The `replicas` field is configured to 1, which means that a new JobManager will be launched to take over the leadership once the current one terminates unexpectedly.
+
+### Configuration
+{% highlight yaml %}
+kubernetes.cluster-id: <ClusterId>
+high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+high-availability.storageDir: hdfs:///flink/recovery
+{% endhighlight %}
+
+#### Example: Highly Available Standalone Flink Cluster on Kubernetes
+Both session and job/application clusters can use the Kubernetes high availability service. Users only need to add the following Flink configuration options to [flink-configuration-configmap.yaml]({{ site.baseurl }}/ops/deployment/kubernetes.html#common-cluster-resource-definitions). All other yamls do not need to be updated.
+
+<span class="label label-info">Note</span> You need to have Hadoop on the classpath or enable the corresponding FileSystem plugin in the Flink image. Refer to [custom Flink image]({{ site.baseurl }}/ops/deployment/docker.html#customize-flink-image) and [enable plugins]({{ site.baseurl }}/ops/deployment/docker.html#using-plugins) for more information.
+
+{% highlight yaml %}
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: flink-config
+  labels:
+    app: flink
+data:
+  flink-conf.yaml: |+
+  ...
+    kubernetes.cluster-id: <ClusterId>
+    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+    high-availability.storageDir: hdfs:///flink/recovery
+    restart-strategy: fixed-delay
+    restart-strategy.fixed-delay.attempts: 10
+  ...
+{% endhighlight %}
+
+#### Example: Highly Available Native Kubernetes Cluster
+Use the following command to start a native Flink application cluster on Kubernetes with high availability configured:
+{% highlight bash %}
+$ ./bin/flink run-application -p 8 -t kubernetes-application \
+  -Dkubernetes.cluster-id=<ClusterId> \
+  -Dtaskmanager.memory.process.size=4096m \
+  -Dkubernetes.taskmanager.cpu=2 \
+  -Dtaskmanager.numberOfTaskSlots=4 \
+  -Dkubernetes.container.image=<CustomImageName> \
+  -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
+  -Dhigh-availability.storageDir=s3://flink/flink-ha \
+  -Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=10 \
+  -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \
+  -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \
+  local:///opt/flink/examples/streaming/StateMachineExample.jar
+{% endhighlight %}
+
+### High availability data clean up
+Currently, when a Flink cluster reaches a terminal state (`FAILED`, `CANCELED`, `FINISHED`), all of the HA data, including the metadata in the Kubernetes ConfigMaps and the HA storage on DFS, will be cleaned up. So if the user stops the Flink session/application cluster manually (via the following two commands), the Flink cluster cannot be recovered.

Review comment:
       ```suggestion
   Currently, when a Flink job reaches a terminal state (`FAILED`, `CANCELED`, `FINISHED`), all of the HA data, including the metadata in the Kubernetes ConfigMaps and the HA storage on DFS, will be cleaned up. So if the user stops the Flink session/application cluster manually (via the following two commands), the Flink cluster cannot be recovered.
   ```
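
   As an aside (not part of the suggestion above): one way to check whether a job has already reached one of these terminal states is to list the cluster's jobs from the CLI. This is only a sketch; the target and options simply mirror the ones used elsewhere on this page.

   ```bash
   # List the jobs of the given cluster; jobs that already reached
   # FAILED / CANCELED / FINISHED are no longer reported as running.
   ./bin/flink list -t kubernetes-application -Dkubernetes.cluster-id=<ClusterId>
   ```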

##########
File path: docs/ops/jobmanager_high_availability.md
##########
@@ -215,6 +215,69 @@ Starting zookeeper daemon on host localhost.</pre>
    <pre>
 $ bin/yarn-session.sh -n 2</pre>
 
+## Kubernetes Cluster High Availability
+We use a [deployment](https://kubernetes.io/docs/concepts/workloads/controllers/deployment/) for the JobManager when deploying Flink on a Kubernetes cluster. The `replicas` field is configured to 1, which means that a new JobManager will be launched to take over the leadership once the current one terminates unexpectedly.
+
+### Configuration
+{% highlight yaml %}
+kubernetes.cluster-id: <ClusterId>
+high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+high-availability.storageDir: hdfs:///flink/recovery
+{% endhighlight %}
+
+#### Example: Highly Available Standalone Flink Cluster on Kubernetes
+Both session and job/application clusters can use the Kubernetes high availability service. Users only need to add the following Flink configuration options to [flink-configuration-configmap.yaml]({{ site.baseurl }}/ops/deployment/kubernetes.html#common-cluster-resource-definitions). All other yamls do not need to be updated.
+
+<span class="label label-info">Note</span> You need to have Hadoop on the classpath or enable the corresponding FileSystem plugin in the Flink image. Refer to [custom Flink image]({{ site.baseurl }}/ops/deployment/docker.html#customize-flink-image) and [enable plugins]({{ site.baseurl }}/ops/deployment/docker.html#using-plugins) for more information.
+
+{% highlight yaml %}
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: flink-config
+  labels:
+    app: flink
+data:
+  flink-conf.yaml: |+
+  ...
+    kubernetes.cluster-id: <ClusterId>
+    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+    high-availability.storageDir: hdfs:///flink/recovery
+    restart-strategy: fixed-delay
+    restart-strategy.fixed-delay.attempts: 10
+  ...
+{% endhighlight %}
+
+#### Example: Highly Available Native Kubernetes Cluster
+Use the following command to start a native Flink application cluster on Kubernetes with high availability configured:
+{% highlight bash %}
+$ ./bin/flink run-application -p 8 -t kubernetes-application \
+  -Dkubernetes.cluster-id=<ClusterId> \
+  -Dtaskmanager.memory.process.size=4096m \
+  -Dkubernetes.taskmanager.cpu=2 \
+  -Dtaskmanager.numberOfTaskSlots=4 \
+  -Dkubernetes.container.image=<CustomImageName> \
+  -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
+  -Dhigh-availability.storageDir=s3://flink/flink-ha \
+  -Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=10 \
+  -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \
+  -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \
+  local:///opt/flink/examples/streaming/StateMachineExample.jar
+{% endhighlight %}
+
+### High availability data clean up
+Currently, when a Flink cluster reaches a terminal state (`FAILED`, `CANCELED`, `FINISHED`), all of the HA data, including the metadata in the Kubernetes ConfigMaps and the HA storage on DFS, will be cleaned up. So if the user stops the Flink session/application cluster manually (via the following two commands), the Flink cluster cannot be recovered.
+
+{% highlight bash %}
+$ echo 'stop' | ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=<ClusterId> -Dexecution.attached=true
+{% endhighlight %}
+
+{% highlight bash %}
+$ ./bin/flink cancel -t kubernetes-application -Dkubernetes.cluster-id=<ClusterID> <JobID>

Review comment:
       This will cancel the job and effectively remove all its HA data.
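
   For completeness, a minimal sketch of how leftover HA metadata could be inspected or removed by hand if a cluster is torn down without this cleanup running. The label selector is an assumption (it is not taken from this PR) and would need to match whatever labels the Kubernetes HA services actually attach to their ConfigMaps:

   ```bash
   # List the HA-related ConfigMaps of a given cluster
   # (label names are assumed; adjust to the real implementation).
   kubectl get configmaps --selector='app=<ClusterId>,configmap-type=high-availability'

   # Delete them manually if they were left behind.
   kubectl delete configmaps --selector='app=<ClusterId>,configmap-type=high-availability'
   ```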

##########
File path: docs/ops/jobmanager_high_availability.md
##########
@@ -215,6 +215,69 @@ Starting zookeeper daemon on host localhost.</pre>
    <pre>
 $ bin/yarn-session.sh -n 2</pre>
 
+## Kubernetes Cluster High Availability
+We use a [deployment](https://kubernetes.io/docs/concepts/workloads/controllers/deployment/) for the JobManager when deploying Flink on a Kubernetes cluster. The `replicas` field is configured to 1, which means that a new JobManager will be launched to take over the leadership once the current one terminates unexpectedly.
+
+### Configuration
+{% highlight yaml %}
+kubernetes.cluster-id: <ClusterId>
+high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+high-availability.storageDir: hdfs:///flink/recovery
+{% endhighlight %}
+
+#### Example: Highly Available Standalone Flink Cluster on Kubernetes
+Both session and job/application clusters can use the Kubernetes high availability service. Users only need to add the following Flink configuration options to [flink-configuration-configmap.yaml]({{ site.baseurl }}/ops/deployment/kubernetes.html#common-cluster-resource-definitions). All other yamls do not need to be updated.
+
+<span class="label label-info">Note</span> You need to have Hadoop on the classpath or enable the corresponding FileSystem plugin in the Flink image. Refer to [custom Flink image]({{ site.baseurl }}/ops/deployment/docker.html#customize-flink-image) and [enable plugins]({{ site.baseurl }}/ops/deployment/docker.html#using-plugins) for more information.
+
+{% highlight yaml %}
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: flink-config
+  labels:
+    app: flink
+data:
+  flink-conf.yaml: |+
+  ...
+    kubernetes.cluster-id: <ClusterId>
+    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+    high-availability.storageDir: hdfs:///flink/recovery
+    restart-strategy: fixed-delay
+    restart-strategy.fixed-delay.attempts: 10
+  ...
+{% endhighlight %}
+
+#### Example: Highly Available Native Kubernetes Cluster
+Use the following command to start a native Flink application cluster on Kubernetes with high availability configured:
+{% highlight bash %}
+$ ./bin/flink run-application -p 8 -t kubernetes-application \
+  -Dkubernetes.cluster-id=<ClusterId> \
+  -Dtaskmanager.memory.process.size=4096m \
+  -Dkubernetes.taskmanager.cpu=2 \
+  -Dtaskmanager.numberOfTaskSlots=4 \
+  -Dkubernetes.container.image=<CustomImageName> \
+  -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
+  -Dhigh-availability.storageDir=s3://flink/flink-ha \
+  -Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=10 \
+  -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \
+  -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \
+  local:///opt/flink/examples/streaming/StateMachineExample.jar
+{% endhighlight %}
+
+### High availability data clean up
+Currently, when a Flink cluster reaches a terminal state (`FAILED`, `CANCELED`, `FINISHED`), all of the HA data, including the metadata in the Kubernetes ConfigMaps and the HA storage on DFS, will be cleaned up. So if the user stops the Flink session/application cluster manually (via the following two commands), the Flink cluster cannot be recovered.
+
+{% highlight bash %}
+$ echo 'stop' | ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=<ClusterId> -Dexecution.attached=true

Review comment:
       Shouldn't this simply shut down the cluster, which suspends all running jobs? Shouldn't this leave the HA state untouched? Or is it because we are deleting all the HA ConfigMaps here?
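
   One quick way to verify the behaviour in question would be to list the cluster's ConfigMaps before and after the stop. This is only a sketch; the `app=<ClusterId>` label is an assumption about how the HA ConfigMaps are tagged:

   ```bash
   # Before stopping: the HA / leader ConfigMaps should be listed.
   kubectl get configmaps --selector='app=<ClusterId>'

   # Stop the session cluster (same command as in the docs above).
   echo 'stop' | ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=<ClusterId> -Dexecution.attached=true

   # After stopping: if the ConfigMaps are gone, the stop path deleted the HA metadata.
   kubectl get configmaps --selector='app=<ClusterId>'
   ```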




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

