XComp commented on a change in pull request #14254:
URL: https://github.com/apache/flink/pull/14254#discussion_r532482467



##########
File path: docs/deployment/ha/index.md
##########
@@ -24,308 +25,63 @@ specific language governing permissions and limitations
 under the License.
 -->
 
+JobManager High Availability (HA) hardens a Flink cluster against JobManager 
failures.
+This feature ensures that a Flink cluster will always continue executing your 
submitted jobs.
+
 * Toc
 {:toc}
 
-## Overview
+## JobManager High Availability
 
-The JobManager coordinates every Flink deployment. It is responsible for both 
*scheduling* and *resource management*.
+The JobManager coordinates every Flink deployment. 
+It is responsible for both *scheduling* and *resource management*.
 
-By default, there is a single JobManager instance per Flink cluster. This 
creates a *single point of failure* (SPOF): if the JobManager crashes, no new 
programs can be submitted and running programs fail.
+By default, there is a single JobManager instance per Flink cluster. 
+This creates a *single point of failure* (SPOF): if the JobManager crashes, no 
new programs can be submitted and running programs fail.
 
-With JobManager High Availability, you can recover from JobManager failures 
and thereby eliminate the *SPOF*. You can configure high availability for both 
**standalone**, **YARN clusters** and **Kubernetes clusters**.
+With JobManager High Availability, you can recover from JobManager failures 
and thereby eliminate the *SPOF*. 
+You can configure high availability for every cluster deployment.
+See [how to configure HA](#configuration) for more information.
 
-See more HA implementation details in [JobManager High 
Availability](https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability)
 in the Flink wiki.
+### How to make a cluster highly available
 
-## Standalone Cluster High Availability
-
-The general idea of JobManager high availability for standalone clusters is 
that there is a **single leading JobManager** at any time and **multiple 
standby JobManagers** to take over leadership in case the leader fails. This 
guarantees that there is **no single point of failure** and programs can make 
progress as soon as a standby JobManager has taken leadership. There is no 
explicit distinction between standby and master JobManager instances. Each 
JobManager can take the role of master or standby.
+The general idea of JobManager high availability is that there is a *single 
leading JobManager* at any time and *multiple standby JobManagers* to take over 
leadership in case the leader fails. 

Review comment:
       ```suggestion
   The general idea of JobManager High Availability is that there is a *single 
leading JobManager* at any time and *multiple standby JobManagers* to take over 
leadership in case the leader fails. 
   ```
   Just to be consistent with the subsection above.

##########
File path: docs/deployment/ha/zookeeper_ha.md
##########
@@ -23,113 +23,104 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-## ZooKeeper HA Services
+Flink's ZooKeeper HA services use [ZooKeeper](http://zookeeper.apache.org) for 
high availability services.
 
-One high availability services implementation uses ZooKeeper.
+* Toc
+{:toc}
 
-### Configuration
+Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for *distributed 
coordination* between all running JobManager instances. 
+ZooKeeper is a separate service from Flink, which provides highly reliable 
distributed coordination via leader election and light-weight consistent state 
storage. 
+Check out [ZooKeeper's Getting Started 
Guide](http://zookeeper.apache.org/doc/current/zookeeperStarted.html) for more 
information about ZooKeeper. 
+Flink includes scripts to [bootstrap a simple ZooKeeper](#bootstrap-zookeeper) 
installation.
 
-To enable JobManager High Availability you have to set the **high-availability 
mode** to *zookeeper*, configure a **ZooKeeper quorum** and set up a **masters 
file** with all JobManagers hosts and their web UI ports.
+## Configuration
 
-Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for *distributed 
coordination* between all running JobManager instances. ZooKeeper is a separate 
service from Flink, which provides highly reliable distributed coordination via 
leader election and light-weight consistent state storage. Check out 
[ZooKeeper's Getting Started 
Guide](http://zookeeper.apache.org/doc/current/zookeeperStarted.html) for more 
information about ZooKeeper. Flink includes scripts to [bootstrap a simple 
ZooKeeper](#bootstrap-zookeeper) installation.
+In order to start an HA-cluster you have to configure the following 
configuration keys:
 
-#### Masters File (masters)
-
-In order to start an HA-cluster configure the *masters* file in `conf/masters`:
-
-- **masters file**: The *masters file* contains all hosts, on which 
JobManagers are started, and the ports to which the web user interface binds.
-
-  <pre>
-jobManagerAddress1:webUIPort1
-[...]
-jobManagerAddressX:webUIPortX
-  </pre>
-
-By default, the job manager will pick a *random port* for inter process 
communication. You can change this via the 
**`high-availability.jobmanager.port`** key. This key accepts single ports 
(e.g. `50010`), ranges (`50000-50025`), or a combination of both 
(`50010,50011,50020-50025,50050-50075`).
-
-#### Config File (flink-conf.yaml)
-
-In order to start an HA-cluster add the following configuration keys to 
`conf/flink-conf.yaml`:
-
-- **high-availability mode** (required): The *high-availability mode* has to 
be set in `conf/flink-conf.yaml` to *zookeeper* in order to enable high 
availability mode.
-Alternatively this option can be set to FQN of factory class Flink should use 
to create HighAvailabilityServices instance. 
+- **high-availability mode** (required): 
+The `high-availability` option has to be set to `zookeeper`.
 
   <pre>high-availability: zookeeper</pre>
 
-- **ZooKeeper quorum** (required): A *ZooKeeper quorum* is a replicated group 
of ZooKeeper servers, which provide the distributed coordination service.
+- **ZooKeeper quorum** (required): 
+A *ZooKeeper quorum* is a replicated group of ZooKeeper servers, which provide 
the distributed coordination service.
 
   <pre>high-availability.zookeeper.quorum: 
address1:2181[,...],addressX:2181</pre>
 
-  Each *addressX:port* refers to a ZooKeeper server, which is reachable by 
Flink at the given address and port.
+  Each `addressX:port` refers to a ZooKeeper server, which is reachable by 
Flink at the given address and port.
 
-- **ZooKeeper root** (recommended): The *root ZooKeeper node*, under which all 
cluster nodes are placed.
+- **ZooKeeper root** (recommended): 
+The *root ZooKeeper node*, under which all cluster nodes are placed.
 
-  <pre>high-availability.zookeeper.path.root: /flink
+  <pre>high-availability.zookeeper.path.root: /flink</pre>
 
-- **ZooKeeper cluster-id** (recommended): The *cluster-id ZooKeeper node*, 
under which all required coordination data for a cluster is placed.
+- **ZooKeeper cluster-id** (recommended): 
+The *cluster-id ZooKeeper node*, under which all required coordination data 
for a cluster is placed.
 
   <pre>high-availability.cluster-id: /default_ns # important: customize per 
cluster</pre>
 
-  **Important**: You should not set this value manually when running a YARN
-  cluster, a per-job YARN session, or on another cluster manager. In those
-  cases a cluster-id is automatically being generated based on the application
-  id. Manually setting a cluster-id overrides this behaviour in YARN.
-  Specifying a cluster-id with the -z CLI option, in turn, overrides manual
-  configuration. If you are running multiple Flink HA clusters on bare metal,
-  you have to manually configure separate cluster-ids for each cluster.
+  **Important**: 
+  You should not set this value manually when running on YARN, native 
Kubernetes or on another cluster manager. 
+  In those cases a cluster-id is being automatically generated. 
+  If you are running multiple Flink HA clusters on bare metal, you have to 
manually configure separate cluster-ids for each cluster.
 
-- **Storage directory** (required): JobManager metadata is persisted in the 
file system *storageDir* and only a pointer to this state is stored in 
ZooKeeper.
+- **Storage directory** (required): 
+JobManager metadata is persisted in the file system 
`high-availability.storageDir` and only a pointer to this state is stored in 
ZooKeeper.
 
-    <pre>
-high-availability.storageDir: hdfs:///flink/recovery
-    </pre>
-
-    The `storageDir` stores all metadata needed to recover a JobManager 
failure.
+  <pre>high-availability.storageDir: hdfs:///flink/recovery</pre>
 
-After configuring the masters and the ZooKeeper quorum, you can use the 
provided cluster startup scripts as usual. They will start an HA-cluster. Keep 
in mind that the **ZooKeeper quorum has to be running** when you call the 
scripts and make sure to **configure a separate ZooKeeper root path** for each 
HA cluster you are starting.
+  The `storageDir` stores all metadata needed to recover a JobManager failure.

Review comment:
    I would move the storage dir before `ZooKeeper cluster-id` and `ZooKeeper root`, considering that it's a required option as well. This reduces the risk of users skipping over the recommended entries and missing the `Storage directory`.
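    For illustration, the list would then read (just a sketch of the proposed order, wording unchanged):
    ```
    - **high-availability mode** (required)
    - **ZooKeeper quorum** (required)
    - **Storage directory** (required)
    - **ZooKeeper root** (recommended)
    - **ZooKeeper cluster-id** (recommended)
    ```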

##########
File path: docs/deployment/ha/zookeeper_ha.md
##########
@@ -23,113 +23,104 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-## ZooKeeper HA Services
+Flink's ZooKeeper HA services use [ZooKeeper](http://zookeeper.apache.org) for 
high availability services.
 
-One high availability services implementation uses ZooKeeper.
+* Toc
+{:toc}
 
-### Configuration
+Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for *distributed 
coordination* between all running JobManager instances. 
+ZooKeeper is a separate service from Flink, which provides highly reliable 
distributed coordination via leader election and light-weight consistent state 
storage. 
+Check out [ZooKeeper's Getting Started 
Guide](http://zookeeper.apache.org/doc/current/zookeeperStarted.html) for more 
information about ZooKeeper. 
+Flink includes scripts to [bootstrap a simple ZooKeeper](#bootstrap-zookeeper) 
installation.
 
-To enable JobManager High Availability you have to set the **high-availability 
mode** to *zookeeper*, configure a **ZooKeeper quorum** and set up a **masters 
file** with all JobManagers hosts and their web UI ports.
+## Configuration
 
-Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for *distributed 
coordination* between all running JobManager instances. ZooKeeper is a separate 
service from Flink, which provides highly reliable distributed coordination via 
leader election and light-weight consistent state storage. Check out 
[ZooKeeper's Getting Started 
Guide](http://zookeeper.apache.org/doc/current/zookeeperStarted.html) for more 
information about ZooKeeper. Flink includes scripts to [bootstrap a simple 
ZooKeeper](#bootstrap-zookeeper) installation.
+In order to start an HA-cluster you have to configure the following 
configuration keys:
 
-#### Masters File (masters)
-
-In order to start an HA-cluster configure the *masters* file in `conf/masters`:
-
-- **masters file**: The *masters file* contains all hosts, on which 
JobManagers are started, and the ports to which the web user interface binds.
-
-  <pre>
-jobManagerAddress1:webUIPort1
-[...]
-jobManagerAddressX:webUIPortX
-  </pre>
-
-By default, the job manager will pick a *random port* for inter process 
communication. You can change this via the 
**`high-availability.jobmanager.port`** key. This key accepts single ports 
(e.g. `50010`), ranges (`50000-50025`), or a combination of both 
(`50010,50011,50020-50025,50050-50075`).
-
-#### Config File (flink-conf.yaml)
-
-In order to start an HA-cluster add the following configuration keys to 
`conf/flink-conf.yaml`:
-
-- **high-availability mode** (required): The *high-availability mode* has to 
be set in `conf/flink-conf.yaml` to *zookeeper* in order to enable high 
availability mode.
-Alternatively this option can be set to FQN of factory class Flink should use 
to create HighAvailabilityServices instance. 
+- **high-availability mode** (required): 

Review comment:
    What about linking to the respective entries on the Configuration page instead of coming up with a bold custom key for each item here? It would highlight the actual item as well (through the link format) and enable us to provide even more information on the Configuration page that we might otherwise miss here.
   
    This comment applies to the other configuration items below as well.
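    For illustration, an item could then start like this (just a sketch; it assumes the `#high-availability-storagedir` anchor exists on the Configuration page):
    ```
    - [high-availability.storageDir]({% link deployment/config.md %}#high-availability-storagedir) (required):
    JobManager metadata is persisted in the file system and only a pointer to this state is stored in ZooKeeper.
    ```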

##########
File path: docs/deployment/ha/zookeeper_ha.md
##########
@@ -23,113 +23,104 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-## ZooKeeper HA Services
+Flink's ZooKeeper HA services use [ZooKeeper](http://zookeeper.apache.org) for 
high availability services.
 
-One high availability services implementation uses ZooKeeper.
+* Toc
+{:toc}
 
-### Configuration
+Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for *distributed 
coordination* between all running JobManager instances. 
+ZooKeeper is a separate service from Flink, which provides highly reliable 
distributed coordination via leader election and light-weight consistent state 
storage. 
+Check out [ZooKeeper's Getting Started 
Guide](http://zookeeper.apache.org/doc/current/zookeeperStarted.html) for more 
information about ZooKeeper. 
+Flink includes scripts to [bootstrap a simple ZooKeeper](#bootstrap-zookeeper) 
installation.
 
-To enable JobManager High Availability you have to set the **high-availability 
mode** to *zookeeper*, configure a **ZooKeeper quorum** and set up a **masters 
file** with all JobManagers hosts and their web UI ports.
+## Configuration
 
-Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for *distributed 
coordination* between all running JobManager instances. ZooKeeper is a separate 
service from Flink, which provides highly reliable distributed coordination via 
leader election and light-weight consistent state storage. Check out 
[ZooKeeper's Getting Started 
Guide](http://zookeeper.apache.org/doc/current/zookeeperStarted.html) for more 
information about ZooKeeper. Flink includes scripts to [bootstrap a simple 
ZooKeeper](#bootstrap-zookeeper) installation.
+In order to start an HA-cluster you have to configure the following 
configuration keys:
 
-#### Masters File (masters)
-
-In order to start an HA-cluster configure the *masters* file in `conf/masters`:
-
-- **masters file**: The *masters file* contains all hosts, on which 
JobManagers are started, and the ports to which the web user interface binds.
-
-  <pre>
-jobManagerAddress1:webUIPort1
-[...]
-jobManagerAddressX:webUIPortX
-  </pre>
-
-By default, the job manager will pick a *random port* for inter process 
communication. You can change this via the 
**`high-availability.jobmanager.port`** key. This key accepts single ports 
(e.g. `50010`), ranges (`50000-50025`), or a combination of both 
(`50010,50011,50020-50025,50050-50075`).
-
-#### Config File (flink-conf.yaml)
-
-In order to start an HA-cluster add the following configuration keys to 
`conf/flink-conf.yaml`:
-
-- **high-availability mode** (required): The *high-availability mode* has to 
be set in `conf/flink-conf.yaml` to *zookeeper* in order to enable high 
availability mode.
-Alternatively this option can be set to FQN of factory class Flink should use 
to create HighAvailabilityServices instance. 
+- **high-availability mode** (required): 
+The `high-availability` option has to be set to `zookeeper`.
 
   <pre>high-availability: zookeeper</pre>
 
-- **ZooKeeper quorum** (required): A *ZooKeeper quorum* is a replicated group 
of ZooKeeper servers, which provide the distributed coordination service.
+- **ZooKeeper quorum** (required): 
+A *ZooKeeper quorum* is a replicated group of ZooKeeper servers, which provide 
the distributed coordination service.
 
   <pre>high-availability.zookeeper.quorum: 
address1:2181[,...],addressX:2181</pre>
 
-  Each *addressX:port* refers to a ZooKeeper server, which is reachable by 
Flink at the given address and port.
+  Each `addressX:port` refers to a ZooKeeper server, which is reachable by 
Flink at the given address and port.
 
-- **ZooKeeper root** (recommended): The *root ZooKeeper node*, under which all 
cluster nodes are placed.
+- **ZooKeeper root** (recommended): 
+The *root ZooKeeper node*, under which all cluster nodes are placed.
 
-  <pre>high-availability.zookeeper.path.root: /flink
+  <pre>high-availability.zookeeper.path.root: /flink</pre>
 
-- **ZooKeeper cluster-id** (recommended): The *cluster-id ZooKeeper node*, 
under which all required coordination data for a cluster is placed.
+- **ZooKeeper cluster-id** (recommended): 
+The *cluster-id ZooKeeper node*, under which all required coordination data 
for a cluster is placed.
 
   <pre>high-availability.cluster-id: /default_ns # important: customize per 
cluster</pre>
 
-  **Important**: You should not set this value manually when running a YARN
-  cluster, a per-job YARN session, or on another cluster manager. In those
-  cases a cluster-id is automatically being generated based on the application
-  id. Manually setting a cluster-id overrides this behaviour in YARN.
-  Specifying a cluster-id with the -z CLI option, in turn, overrides manual
-  configuration. If you are running multiple Flink HA clusters on bare metal,
-  you have to manually configure separate cluster-ids for each cluster.
+  **Important**: 
+  You should not set this value manually when running on YARN, native 
Kubernetes or on another cluster manager. 
+  In those cases a cluster-id is being automatically generated. 
+  If you are running multiple Flink HA clusters on bare metal, you have to 
manually configure separate cluster-ids for each cluster.
 
-- **Storage directory** (required): JobManager metadata is persisted in the 
file system *storageDir* and only a pointer to this state is stored in 
ZooKeeper.
+- **Storage directory** (required): 
+JobManager metadata is persisted in the file system 
`high-availability.storageDir` and only a pointer to this state is stored in 
ZooKeeper.
 
-    <pre>
-high-availability.storageDir: hdfs:///flink/recovery
-    </pre>
-
-    The `storageDir` stores all metadata needed to recover a JobManager 
failure.
+  <pre>high-availability.storageDir: hdfs:///flink/recovery</pre>
 
-After configuring the masters and the ZooKeeper quorum, you can use the 
provided cluster startup scripts as usual. They will start an HA-cluster. Keep 
in mind that the **ZooKeeper quorum has to be running** when you call the 
scripts and make sure to **configure a separate ZooKeeper root path** for each 
HA cluster you are starting.
+  The `storageDir` stores all metadata needed to recover a JobManager failure.
 
-#### Example: Standalone Cluster with 2 JobManagers
+### Example configuration
 
-1. **Configure high availability mode and ZooKeeper quorum** in 
`conf/flink-conf.yaml`:
+Configure high availability mode and ZooKeeper quorum in 
`conf/flink-conf.yaml`:
 
-   <pre>
+{% highlight bash %}
 high-availability: zookeeper
 high-availability.zookeeper.quorum: localhost:2181
 high-availability.zookeeper.path.root: /flink
 high-availability.cluster-id: /cluster_one # important: customize per cluster
-high-availability.storageDir: hdfs:///flink/recovery</pre>
+high-availability.storageDir: hdfs:///flink/recovery
+{% endhighlight %}
+
+## Configuring for Zookeeper Security
 
-2. **Configure masters** in `conf/masters`:
+If ZooKeeper is running in secure mode with Kerberos, you can override the 
following configurations in `flink-conf.yaml` as necessary:
 
-   <pre>
-localhost:8081
-localhost:8082</pre>
+{% highlight bash %}
+zookeeper.sasl.service-name: zookeeper     # default is "zookeeper". If the 
ZooKeeper quorum is configured
+                                           # with a different service name 
then it can be supplied here.
+zookeeper.sasl.login-context-name: Client  # default is "Client". The value 
needs to match one of the values
+                                           # configured in 
"security.kerberos.login.contexts".
+{% endhighlight %}
 
-3. **Configure ZooKeeper server** in `conf/zoo.cfg` (currently it's only 
possible to run a single ZooKeeper server per machine):
+For more information on Flink configuration for Kerberos security, please 
refer to the [security section of the Flink configuration page]({% link 
deployment/config.md %}#security).
+You can also find further details on [how Flink sets up Kerberos-based 
security internally]({% link deployment/security/security-kerberos.md %}).
 
-   <pre>server.0=localhost:2888:3888</pre>
+## Zookeeper Versions

Review comment:
       ```suggestion
   ## ZooKeeper Versions
   ```

##########
File path: docs/deployment/ha/kubernetes_ha.md
##########
@@ -23,77 +23,50 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-## Kubernetes Cluster High Availability
-Kubernetes high availability service could support both [standalone Flink on 
Kubernetes]({% link deployment/resource-providers/standalone/kubernetes.md %}) 
and [native Kubernetes integration]({% link 
deployment/resource-providers/native_kubernetes.md %}).
+Flink's Kubernetes HA services use [Kubernetes](https://kubernetes.io/) for 
high availability services.
 
-When running Flink JobManager as a Kubernetes deployment, the replica count 
should be configured to 1 or greater.
-* The value `1` means that a new JobManager will be launched to take over 
leadership if the current one terminates exceptionally.
-* The value `N` (greater than 1) means that multiple JobManagers will be 
launched simultaneously while one is active and others are standby. Starting 
more than one JobManager will make the recovery faster.
+* Toc
+{:toc}
 
-### Configuration
-{% highlight yaml %}
-kubernetes.cluster-id: <ClusterId>
-high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
-high-availability.storageDir: hdfs:///flink/recovery
-{% endhighlight %}
+Kubernetes high availability services can only be used when deploying to 
Kubernetes.
+Consequently, they can be configured when using [standalone Flink on 
Kubernetes]({% link deployment/resource-providers/standalone/kubernetes.md %}) 
or the [native Kubernetes integration]({% link 
deployment/resource-providers/native_kubernetes.md %})
 
-#### Example: Highly Available Standalone Flink Cluster on Kubernetes
-Both session and job/application clusters support using the Kubernetes high 
availability service. Users just need to add the following Flink config options 
to [flink-configuration-configmap.yaml]({% link 
deployment/resource-providers/standalone/kubernetes.md 
%}#common-cluster-resource-definitions). All other yamls do not need to be 
updated.
-
-<span class="label label-info">Note</span> The filesystem which corresponds to 
the scheme of your configured HA storage directory must be available to the 
runtime. Refer to [custom Flink image]({% link 
deployment/resource-providers/standalone/docker.md %}#customize-flink-image) 
and [enable plugins]({% link deployment/resource-providers/standalone/docker.md 
%}#using-plugins) for more information.
-
-{% highlight yaml %}
-apiVersion: v1
-kind: ConfigMap
-metadata:
-  name: flink-config
-  labels:
-    app: flink
-data:
-  flink-conf.yaml: |+
-  ...
-    kubernetes.cluster-id: <ClusterId>
-    high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
-    high-availability.storageDir: hdfs:///flink/recovery
-    restart-strategy: fixed-delay
-    restart-strategy.fixed-delay.attempts: 10
-  ...
-{% endhighlight %}
+## Configuration
 
-#### Example: Highly Available Native Kubernetes Cluster
-Using the following command to start a native Flink application cluster on 
Kubernetes with high availability configured.
-{% highlight bash %}
-$ ./bin/flink run-application -p 8 -t kubernetes-application \
-  -Dkubernetes.cluster-id=<ClusterId> \
-  -Dtaskmanager.memory.process.size=4096m \
-  -Dkubernetes.taskmanager.cpu=2 \
-  -Dtaskmanager.numberOfTaskSlots=4 \
-  -Dkubernetes.container.image=<CustomImageName> \
-  -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
-  -Dhigh-availability.storageDir=s3://flink/flink-ha \
-  -Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=10 \
-  -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \
-  -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \
-  local:///opt/flink/examples/streaming/StateMachineExample.jar
-{% endhighlight %}
+In order to start an HA-cluster you have to configure the following 
configuration keys:
 
-### High Availability Data Clean Up
-Currently, when a Flink job reached the terminal state (`FAILED`, `CANCELED`, 
`FINISHED`), all the HA data, including metadata in Kubernetes ConfigMap and HA 
state on DFS, will be cleaned up.
+- **high-availability mode** (required): 
+The [`high-availability`]({% link deployment/config.md %}#high-availability-1) 
option has to be set to `KubernetesHaServicesFactory`.
 
-So the following command will only shut down the Flink session cluster and 
leave all the HA related ConfigMaps, state untouched.
-{% highlight bash %}
-$ echo 'stop' | ./bin/kubernetes-session.sh 
-Dkubernetes.cluster-id=<ClusterId> -Dexecution.attached=true
-{% endhighlight %}
+  <pre>high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory</pre>
+
+- **Storage directory** (required): 
+JobManager metadata is persisted in the file system 
[`high-availability.storageDir`]({% link deployment/config.md 
%}#high-availability-storagedir) and only a pointer to this state is stored in 
Kubernetes.

Review comment:
       ```suggestion
   JobManager metadata is persisted in the file system 
[high-availability.storageDir]({% link deployment/config.md 
%}#high-availability-storagedir) and only a pointer to this state is stored in 
Kubernetes.
   ```
   
   I didn't realize that the parameters were actually links due to the 
formatting. We shouldn't overwrite the link formatting like that.

##########
File path: docs/deployment/ha/kubernetes_ha.md
##########
@@ -23,77 +23,50 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-## Kubernetes Cluster High Availability
-Kubernetes high availability service could support both [standalone Flink on 
Kubernetes]({% link deployment/resource-providers/standalone/kubernetes.md %}) 
and [native Kubernetes integration]({% link 
deployment/resource-providers/native_kubernetes.md %}).
+Flink's Kubernetes HA services use [Kubernetes](https://kubernetes.io/) for 
high availability services.
 
-When running Flink JobManager as a Kubernetes deployment, the replica count 
should be configured to 1 or greater.
-* The value `1` means that a new JobManager will be launched to take over 
leadership if the current one terminates exceptionally.
-* The value `N` (greater than 1) means that multiple JobManagers will be 
launched simultaneously while one is active and others are standby. Starting 
more than one JobManager will make the recovery faster.
+* Toc
+{:toc}
 
-### Configuration
-{% highlight yaml %}
-kubernetes.cluster-id: <ClusterId>
-high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
-high-availability.storageDir: hdfs:///flink/recovery
-{% endhighlight %}
+Kubernetes high availability services can only be used when deploying to 
Kubernetes.
+Consequently, they can be configured when using [standalone Flink on 
Kubernetes]({% link deployment/resource-providers/standalone/kubernetes.md %}) 
or the [native Kubernetes integration]({% link 
deployment/resource-providers/native_kubernetes.md %})
 
-#### Example: Highly Available Standalone Flink Cluster on Kubernetes
-Both session and job/application clusters support using the Kubernetes high 
availability service. Users just need to add the following Flink config options 
to [flink-configuration-configmap.yaml]({% link 
deployment/resource-providers/standalone/kubernetes.md 
%}#common-cluster-resource-definitions). All other yamls do not need to be 
updated.
-
-<span class="label label-info">Note</span> The filesystem which corresponds to 
the scheme of your configured HA storage directory must be available to the 
runtime. Refer to [custom Flink image]({% link 
deployment/resource-providers/standalone/docker.md %}#customize-flink-image) 
and [enable plugins]({% link deployment/resource-providers/standalone/docker.md 
%}#using-plugins) for more information.
-
-{% highlight yaml %}
-apiVersion: v1
-kind: ConfigMap
-metadata:
-  name: flink-config
-  labels:
-    app: flink
-data:
-  flink-conf.yaml: |+
-  ...
-    kubernetes.cluster-id: <ClusterId>
-    high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
-    high-availability.storageDir: hdfs:///flink/recovery
-    restart-strategy: fixed-delay
-    restart-strategy.fixed-delay.attempts: 10
-  ...
-{% endhighlight %}
+## Configuration
 
-#### Example: Highly Available Native Kubernetes Cluster
-Using the following command to start a native Flink application cluster on 
Kubernetes with high availability configured.
-{% highlight bash %}
-$ ./bin/flink run-application -p 8 -t kubernetes-application \
-  -Dkubernetes.cluster-id=<ClusterId> \
-  -Dtaskmanager.memory.process.size=4096m \
-  -Dkubernetes.taskmanager.cpu=2 \
-  -Dtaskmanager.numberOfTaskSlots=4 \
-  -Dkubernetes.container.image=<CustomImageName> \
-  -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
-  -Dhigh-availability.storageDir=s3://flink/flink-ha \
-  -Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=10 \
-  -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \
-  -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \
-  local:///opt/flink/examples/streaming/StateMachineExample.jar
-{% endhighlight %}
+In order to start an HA-cluster you have to configure the following 
configuration keys:
 
-### High Availability Data Clean Up
-Currently, when a Flink job reached the terminal state (`FAILED`, `CANCELED`, 
`FINISHED`), all the HA data, including metadata in Kubernetes ConfigMap and HA 
state on DFS, will be cleaned up.
+- **high-availability mode** (required): 

Review comment:
    Here, again: What about using the parameter names as the key for each item in this list (analogous to my proposal on the ZooKeeper HA page) and linking to the respective entry on the Configuration page instead of coming up with a custom item intro that's bold? Highlighting would work through the link formatting.
   
    This applies to all items in this list.
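    For example (a sketch, reusing an anchor that is already referenced on this page):
    ```
    - [kubernetes.cluster-id]({% link deployment/config.md %}#kubernetes-cluster-id) (required):
    In order to identify the Flink cluster, you have to specify a cluster id.
    ```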

##########
File path: docs/deployment/ha/kubernetes_ha.md
##########
@@ -23,77 +23,50 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-## Kubernetes Cluster High Availability
-Kubernetes high availability service could support both [standalone Flink on 
Kubernetes]({% link deployment/resource-providers/standalone/kubernetes.md %}) 
and [native Kubernetes integration]({% link 
deployment/resource-providers/native_kubernetes.md %}).
+Flink's Kubernetes HA services use [Kubernetes](https://kubernetes.io/) for 
high availability services.
 
-When running Flink JobManager as a Kubernetes deployment, the replica count 
should be configured to 1 or greater.
-* The value `1` means that a new JobManager will be launched to take over 
leadership if the current one terminates exceptionally.
-* The value `N` (greater than 1) means that multiple JobManagers will be 
launched simultaneously while one is active and others are standby. Starting 
more than one JobManager will make the recovery faster.
+* Toc
+{:toc}
 
-### Configuration
-{% highlight yaml %}
-kubernetes.cluster-id: <ClusterId>
-high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
-high-availability.storageDir: hdfs:///flink/recovery
-{% endhighlight %}
+Kubernetes high availability services can only be used when deploying to 
Kubernetes.
+Consequently, they can be configured when using [standalone Flink on 
Kubernetes]({% link deployment/resource-providers/standalone/kubernetes.md %}) 
or the [native Kubernetes integration]({% link 
deployment/resource-providers/native_kubernetes.md %})
 
-#### Example: Highly Available Standalone Flink Cluster on Kubernetes
-Both session and job/application clusters support using the Kubernetes high 
availability service. Users just need to add the following Flink config options 
to [flink-configuration-configmap.yaml]({% link 
deployment/resource-providers/standalone/kubernetes.md 
%}#common-cluster-resource-definitions). All other yamls do not need to be 
updated.
-
-<span class="label label-info">Note</span> The filesystem which corresponds to 
the scheme of your configured HA storage directory must be available to the 
runtime. Refer to [custom Flink image]({% link 
deployment/resource-providers/standalone/docker.md %}#customize-flink-image) 
and [enable plugins]({% link deployment/resource-providers/standalone/docker.md 
%}#using-plugins) for more information.
-
-{% highlight yaml %}
-apiVersion: v1
-kind: ConfigMap
-metadata:
-  name: flink-config
-  labels:
-    app: flink
-data:
-  flink-conf.yaml: |+
-  ...
-    kubernetes.cluster-id: <ClusterId>
-    high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
-    high-availability.storageDir: hdfs:///flink/recovery
-    restart-strategy: fixed-delay
-    restart-strategy.fixed-delay.attempts: 10
-  ...
-{% endhighlight %}
+## Configuration
 
-#### Example: Highly Available Native Kubernetes Cluster
-Using the following command to start a native Flink application cluster on 
Kubernetes with high availability configured.
-{% highlight bash %}
-$ ./bin/flink run-application -p 8 -t kubernetes-application \
-  -Dkubernetes.cluster-id=<ClusterId> \
-  -Dtaskmanager.memory.process.size=4096m \
-  -Dkubernetes.taskmanager.cpu=2 \
-  -Dtaskmanager.numberOfTaskSlots=4 \
-  -Dkubernetes.container.image=<CustomImageName> \
-  -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
-  -Dhigh-availability.storageDir=s3://flink/flink-ha \
-  -Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=10 \
-  -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \
-  -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \
-  local:///opt/flink/examples/streaming/StateMachineExample.jar
-{% endhighlight %}
+In order to start an HA-cluster you have to configure the following 
configuration keys:
 
-### High Availability Data Clean Up
-Currently, when a Flink job reached the terminal state (`FAILED`, `CANCELED`, 
`FINISHED`), all the HA data, including metadata in Kubernetes ConfigMap and HA 
state on DFS, will be cleaned up.
+- **high-availability mode** (required): 
+The [`high-availability`]({% link deployment/config.md %}#high-availability-1) 
option has to be set to `KubernetesHaServicesFactory`.
 
-So the following command will only shut down the Flink session cluster and 
leave all the HA related ConfigMaps, state untouched.
-{% highlight bash %}
-$ echo 'stop' | ./bin/kubernetes-session.sh 
-Dkubernetes.cluster-id=<ClusterId> -Dexecution.attached=true
-{% endhighlight %}
+  <pre>high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory</pre>
+
+- **Storage directory** (required): 
+JobManager metadata is persisted in the file system 
[`high-availability.storageDir`]({% link deployment/config.md 
%}#high-availability-storagedir) and only a pointer to this state is stored in 
Kubernetes.
+
+  <pre>high-availability.storageDir: s3:///flink/recovery</pre>
+
+  The `storageDir` stores all metadata needed to recover a JobManager failure.
+  
+- **Cluster id** (required):
+In order to identify the Flink cluster, you have to specify a 
[`kubernetes.cluster-id`]({% link deployment/config.md 
%}#kubernetes-cluster-id).

Review comment:
       ```suggestion
   In order to identify the Flink cluster, you have to specify a 
[kubernetes.cluster-id]({% link deployment/config.md %}#kubernetes-cluster-id).
   ```
   I didn't realize that the parameters were actually links due to the 
formatting. We shouldn't overwrite the link formatting like that.

##########
File path: docs/deployment/ha/kubernetes_ha.md
##########
@@ -23,77 +23,50 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-## Kubernetes Cluster High Availability
-Kubernetes high availability service could support both [standalone Flink on 
Kubernetes]({% link deployment/resource-providers/standalone/kubernetes.md %}) 
and [native Kubernetes integration]({% link 
deployment/resource-providers/native_kubernetes.md %}).
+Flink's Kubernetes HA services use [Kubernetes](https://kubernetes.io/) for 
high availability services.
 
-When running Flink JobManager as a Kubernetes deployment, the replica count 
should be configured to 1 or greater.
-* The value `1` means that a new JobManager will be launched to take over 
leadership if the current one terminates exceptionally.
-* The value `N` (greater than 1) means that multiple JobManagers will be 
launched simultaneously while one is active and others are standby. Starting 
more than one JobManager will make the recovery faster.
+* Toc
+{:toc}
 
-### Configuration
-{% highlight yaml %}
-kubernetes.cluster-id: <ClusterId>
-high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
-high-availability.storageDir: hdfs:///flink/recovery
-{% endhighlight %}
+Kubernetes high availability services can only be used when deploying to 
Kubernetes.
+Consequently, they can be configured when using [standalone Flink on 
Kubernetes]({% link deployment/resource-providers/standalone/kubernetes.md %}) 
or the [native Kubernetes integration]({% link 
deployment/resource-providers/native_kubernetes.md %})
 
-#### Example: Highly Available Standalone Flink Cluster on Kubernetes
-Both session and job/application clusters support using the Kubernetes high 
availability service. Users just need to add the following Flink config options 
to [flink-configuration-configmap.yaml]({% link 
deployment/resource-providers/standalone/kubernetes.md 
%}#common-cluster-resource-definitions). All other yamls do not need to be 
updated.
-
-<span class="label label-info">Note</span> The filesystem which corresponds to 
the scheme of your configured HA storage directory must be available to the 
runtime. Refer to [custom Flink image]({% link 
deployment/resource-providers/standalone/docker.md %}#customize-flink-image) 
and [enable plugins]({% link deployment/resource-providers/standalone/docker.md 
%}#using-plugins) for more information.
-
-{% highlight yaml %}
-apiVersion: v1
-kind: ConfigMap
-metadata:
-  name: flink-config
-  labels:
-    app: flink
-data:
-  flink-conf.yaml: |+
-  ...
-    kubernetes.cluster-id: <ClusterId>
-    high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
-    high-availability.storageDir: hdfs:///flink/recovery
-    restart-strategy: fixed-delay
-    restart-strategy.fixed-delay.attempts: 10
-  ...
-{% endhighlight %}
+## Configuration
 
-#### Example: Highly Available Native Kubernetes Cluster
-Using the following command to start a native Flink application cluster on 
Kubernetes with high availability configured.
-{% highlight bash %}
-$ ./bin/flink run-application -p 8 -t kubernetes-application \
-  -Dkubernetes.cluster-id=<ClusterId> \
-  -Dtaskmanager.memory.process.size=4096m \
-  -Dkubernetes.taskmanager.cpu=2 \
-  -Dtaskmanager.numberOfTaskSlots=4 \
-  -Dkubernetes.container.image=<CustomImageName> \
-  -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
-  -Dhigh-availability.storageDir=s3://flink/flink-ha \
-  -Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=10 \
-  -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \
-  -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \
-  local:///opt/flink/examples/streaming/StateMachineExample.jar
-{% endhighlight %}
+In order to start an HA-cluster you have to configure the following 
configuration keys:
 
-### High Availability Data Clean Up
-Currently, when a Flink job reached the terminal state (`FAILED`, `CANCELED`, 
`FINISHED`), all the HA data, including metadata in Kubernetes ConfigMap and HA 
state on DFS, will be cleaned up.
+- **high-availability mode** (required): 
+The [`high-availability`]({% link deployment/config.md %}#high-availability-1) 
option has to be set to `KubernetesHaServicesFactory`.
 
-So the following command will only shut down the Flink session cluster and 
leave all the HA related ConfigMaps, state untouched.
-{% highlight bash %}
-$ echo 'stop' | ./bin/kubernetes-session.sh 
-Dkubernetes.cluster-id=<ClusterId> -Dexecution.attached=true
-{% endhighlight %}
+  <pre>high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory</pre>
+
+- **Storage directory** (required): 
+JobManager metadata is persisted in the file system 
[`high-availability.storageDir`]({% link deployment/config.md 
%}#high-availability-storagedir) and only a pointer to this state is stored in 
Kubernetes.
+
+  <pre>high-availability.storageDir: s3:///flink/recovery</pre>
+
+  The `storageDir` stores all metadata needed to recover a JobManager failure.
+  
+- **Cluster id** (required):
+In order to identify the Flink cluster, you have to specify a 
[`kubernetes.cluster-id`]({% link deployment/config.md 
%}#kubernetes-cluster-id).
+
+  <pre>kubernetes.cluster-id: cluster1337</pre>
+
+### Example configuration
+
+Configure high availability mode in `conf/flink-conf.yaml`:
 
-The following commands will cancel the job in application or session cluster 
and effectively remove all its HA data.
 {% highlight bash %}
-# Cancel a Flink job in the existing session
-$ ./bin/flink cancel -t kubernetes-session -Dkubernetes.cluster-id=<ClusterID> 
<JobID>
-# Cancel a Flink application
-$ ./bin/flink cancel -t kubernetes-application 
-Dkubernetes.cluster-id=<ClusterID> <JobID>
+kubernetes.cluster-id: <ClusterId>
+high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+high-availability.storageDir: hdfs:///flink/recovery
 {% endhighlight %}
 
+## High availability data clean up
+
 To keep HA data while restarting the Flink cluster, simply delete the 
deployment (via `kubectl delete deploy <ClusterID>`). 

Review comment:
       ```suggestion
   To keep HA data while restarting the Flink cluster, simply delete the 
deployment (via `kubectl delete deploy <cluster-id>`). 
   ```
    The `ClusterId` reformatting would apply here as well if we decide to make the change.

##########
File path: docs/deployment/resource-providers/standalone/index.md
##########
@@ -150,4 +150,71 @@ bin/taskmanager.sh start|start-foreground|stop|stop-all
 
 Make sure to call these scripts on the hosts on which you want to start/stop 
the respective instance.
 
+## High-Availability with Standalone
+
+In order to enable HA for a standalone cluster, you have to use the [ZooKeeper 
HA services]({% link deployment/ha/zookeeper_ha.md %}).
+
+Additionally, you have to configure your cluster to start multiple JobManagers.
+
+### Masters File (masters)
+
+In order to start an HA-cluster configure the *masters* file in `conf/masters`:
+
+- **masters file**: The *masters file* contains all hosts, on which 
JobManagers are started, and the ports to which the web user interface binds.
+
+  <pre>
+jobManagerAddress1:webUIPort1
+[...]
+jobManagerAddressX:webUIPortX
+  </pre>
+
+By default, the job manager will pick a *random port* for inter process 
communication. You can change this via the 
**`high-availability.jobmanager.port`** key. This key accepts single ports 
(e.g. `50010`), ranges (`50000-50025`), or a combination of both 
(`50010,50011,50020-50025,50050-50075`).
+
+### Example: Standalone Cluster with 2 JobManagers
+
+1. **Configure high availability mode and ZooKeeper quorum** in 
`conf/flink-conf.yaml`:

Review comment:
       ```suggestion
   1. **Configure high availability mode and ZooKeeper quorum** in 
`${FLINK_HOME}/conf/flink-conf.yaml`:
   ```
    What about the proposal of adding a `${*_HOME}` variable to paths to have a clearer pointer to the actual file/script?
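    For example: `${FLINK_HOME}/conf/flink-conf.yaml`, `${FLINK_HOME}/conf/masters`, and `${FLINK_HOME}/bin/start-cluster.sh`.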

##########
File path: docs/deployment/resource-providers/standalone/kubernetes.md
##########
@@ -162,6 +162,34 @@ with the `kubectl` command:
     kubectl delete -f jobmanager-job.yaml
 ```
 
+## High-Availability with Standalone Kubernetes
+
+For high availability on Kubernetes, you can use the [existing high 
availability services]({% link deployment/ha/index.md %}).
+
+### How to configure Kubernetes HA Services
+
+Both session and job/application clusters support using the Kubernetes high 
availability service. Users just need to add the following Flink config options 
to [flink-configuration-configmap.yaml](#common-cluster-resource-definitions). 
All other yamls do not need to be updated.

Review comment:
       ```suggestion
   Session Mode, Per-Job Mode, and Application Mode clusters support using the 
Kubernetes high availability service. Users just need to add the following 
Flink config options to 
[flink-configuration-configmap.yaml](#common-cluster-resource-definitions). All 
other yamls do not need to be updated.
   ```

##########
File path: docs/deployment/resource-providers/standalone/index.md
##########
@@ -150,4 +150,71 @@ bin/taskmanager.sh start|start-foreground|stop|stop-all
 
 Make sure to call these scripts on the hosts on which you want to start/stop 
the respective instance.
 
+## High-Availability with Standalone
+
+In order to enable HA for a standalone cluster, you have to use the [ZooKeeper 
HA services]({% link deployment/ha/zookeeper_ha.md %}).
+
+Additionally, you have to configure your cluster to start multiple JobManagers.
+
+### Masters File (masters)
+
+In order to start an HA-cluster configure the *masters* file in `conf/masters`:
+
+- **masters file**: The *masters file* contains all hosts, on which 
JobManagers are started, and the ports to which the web user interface binds.
+
+  <pre>
+jobManagerAddress1:webUIPort1
+[...]
+jobManagerAddressX:webUIPortX
+  </pre>
+
+By default, the job manager will pick a *random port* for inter process 
communication. You can change this via the 
**`high-availability.jobmanager.port`** key. This key accepts single ports 
(e.g. `50010`), ranges (`50000-50025`), or a combination of both 
(`50010,50011,50020-50025,50050-50075`).
+
+### Example: Standalone Cluster with 2 JobManagers
+
+1. **Configure high availability mode and ZooKeeper quorum** in 
`conf/flink-conf.yaml`:
+
+   <pre>
+high-availability: zookeeper
+high-availability.zookeeper.quorum: localhost:2181
+high-availability.zookeeper.path.root: /flink
+high-availability.cluster-id: /cluster_one # important: customize per cluster
+high-availability.storageDir: hdfs:///flink/recovery</pre>
+
+2. **Configure masters** in `conf/masters`:
+
+   <pre>
+localhost:8081
+localhost:8082</pre>
+
+3. **Configure ZooKeeper server** in `conf/zoo.cfg` (currently it's only 
possible to run a single ZooKeeper server per machine):
+
+   <pre>server.0=localhost:2888:3888</pre>
+
+4. **Start ZooKeeper quorum**:
+
+   <pre>
+$ bin/start-zookeeper-quorum.sh
+Starting zookeeper daemon on host localhost.</pre>
+
+5. **Start an HA-cluster**:
+
+   <pre>
+$ bin/start-cluster.sh
+Starting HA cluster with 2 masters and 1 peers in ZooKeeper quorum.
+Starting standalonesession daemon on host localhost.
+Starting standalonesession daemon on host localhost.
+Starting taskexecutor daemon on host localhost.</pre>
+
+6. **Stop ZooKeeper quorum and cluster**:
+
+   <pre>
+$ bin/stop-cluster.sh
+Stopping taskexecutor daemon (pid: 7647) on localhost.
+Stopping standalonesession daemon (pid: 7495) on host localhost.
+Stopping standalonesession daemon (pid: 7349) on host localhost.
+$ bin/stop-zookeeper-quorum.sh
+Stopping zookeeper daemon (pid: 7101) on host localhost.</pre>

Review comment:
       There's also 
   ```
   {% highlight bash %}
   # ...
   {% endhighlight %}
   ```
   which might be the more appropriate syntax to create code blocks.
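    For instance, step 5 would then become:
    ```
    {% highlight bash %}
    $ bin/start-cluster.sh
    Starting HA cluster with 2 masters and 1 peers in ZooKeeper quorum.
    Starting standalonesession daemon on host localhost.
    Starting standalonesession daemon on host localhost.
    Starting taskexecutor daemon on host localhost.
    {% endhighlight %}
    ```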

##########
File path: docs/deployment/resource-providers/standalone/kubernetes.md
##########
@@ -162,6 +162,34 @@ with the `kubectl` command:
     kubectl delete -f jobmanager-job.yaml
 ```
 
+## High-Availability with Standalone Kubernetes
+
+For high availability on Kubernetes, you can use the [existing high 
availability services]({% link deployment/ha/index.md %}).
+
+### How to configure Kubernetes HA Services
+
+Both session and job/application clusters support using the Kubernetes high 
availability service. Users just need to add the following Flink config options 
to [flink-configuration-configmap.yaml](#common-cluster-resource-definitions). 
All other yamls do not need to be updated.
+
+<span class="label label-info">Note</span> The filesystem which corresponds to 
the scheme of your configured HA storage directory must be available to the 
runtime. Refer to [custom Flink image]({% link 
deployment/resource-providers/standalone/docker.md %}#customize-flink-image) 
and [enable plugins]({% link deployment/resource-providers/standalone/docker.md 
%}#using-plugins) for more information.
+
+{% highlight yaml %}
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: flink-config
+  labels:
+    app: flink
+data:
+  flink-conf.yaml: |+
+  ...
+    kubernetes.cluster-id: <ClusterId>

Review comment:
       ```suggestion
       kubernetes.cluster-id: <cluster-id>
   ```
   `<ClusterId>` needs to be updated if we want to change the formatting 
consistently.

##########
File path: docs/deployment/ha/zookeeper_ha.md
##########
@@ -23,113 +23,104 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-## ZooKeeper HA Services
+Flink's ZooKeeper HA services use [ZooKeeper](http://zookeeper.apache.org) for 
high availability services.
 
-One high availability services implementation uses ZooKeeper.
+* Toc
+{:toc}
 
-### Configuration
+Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for *distributed 
coordination* between all running JobManager instances. 
+ZooKeeper is a separate service from Flink, which provides highly reliable 
distributed coordination via leader election and light-weight consistent state 
storage. 
+Check out [ZooKeeper's Getting Started 
Guide](http://zookeeper.apache.org/doc/current/zookeeperStarted.html) for more 
information about ZooKeeper. 
+Flink includes scripts to [bootstrap a simple ZooKeeper](#bootstrap-zookeeper) 
installation.
 
-To enable JobManager High Availability you have to set the **high-availability 
mode** to *zookeeper*, configure a **ZooKeeper quorum** and set up a **masters 
file** with all JobManagers hosts and their web UI ports.
+## Configuration
 
-Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for *distributed 
coordination* between all running JobManager instances. ZooKeeper is a separate 
service from Flink, which provides highly reliable distributed coordination via 
leader election and light-weight consistent state storage. Check out 
[ZooKeeper's Getting Started 
Guide](http://zookeeper.apache.org/doc/current/zookeeperStarted.html) for more 
information about ZooKeeper. Flink includes scripts to [bootstrap a simple 
ZooKeeper](#bootstrap-zookeeper) installation.
+In order to start an HA-cluster you have to configure the following 
configuration keys:
 
-#### Masters File (masters)
-
-In order to start an HA-cluster configure the *masters* file in `conf/masters`:
-
-- **masters file**: The *masters file* contains all hosts, on which 
JobManagers are started, and the ports to which the web user interface binds.
-
-  <pre>
-jobManagerAddress1:webUIPort1
-[...]
-jobManagerAddressX:webUIPortX
-  </pre>
-
-By default, the job manager will pick a *random port* for inter process 
communication. You can change this via the 
**`high-availability.jobmanager.port`** key. This key accepts single ports 
(e.g. `50010`), ranges (`50000-50025`), or a combination of both 
(`50010,50011,50020-50025,50050-50075`).
-
-#### Config File (flink-conf.yaml)
-
-In order to start an HA-cluster add the following configuration keys to 
`conf/flink-conf.yaml`:
-
-- **high-availability mode** (required): The *high-availability mode* has to 
be set in `conf/flink-conf.yaml` to *zookeeper* in order to enable high 
availability mode.
-Alternatively this option can be set to FQN of factory class Flink should use 
to create HighAvailabilityServices instance. 
+- **high-availability mode** (required): 
+The `high-availability` option has to be set to `zookeeper`.
 
   <pre>high-availability: zookeeper</pre>
 
-- **ZooKeeper quorum** (required): A *ZooKeeper quorum* is a replicated group 
of ZooKeeper servers, which provide the distributed coordination service.
+- **ZooKeeper quorum** (required): 
+A *ZooKeeper quorum* is a replicated group of ZooKeeper servers, which provide 
the distributed coordination service.
 
   <pre>high-availability.zookeeper.quorum: 
address1:2181[,...],addressX:2181</pre>
 
-  Each *addressX:port* refers to a ZooKeeper server, which is reachable by 
Flink at the given address and port.
+  Each `addressX:port` refers to a ZooKeeper server, which is reachable by 
Flink at the given address and port.
 
-- **ZooKeeper root** (recommended): The *root ZooKeeper node*, under which all 
cluster nodes are placed.
+- **ZooKeeper root** (recommended): 
+The *root ZooKeeper node*, under which all cluster nodes are placed.
 
-  <pre>high-availability.zookeeper.path.root: /flink
+  <pre>high-availability.zookeeper.path.root: /flink</pre>
 
-- **ZooKeeper cluster-id** (recommended): The *cluster-id ZooKeeper node*, 
under which all required coordination data for a cluster is placed.
+- **ZooKeeper cluster-id** (recommended): 
+The *cluster-id ZooKeeper node*, under which all required coordination data 
for a cluster is placed.
 
   <pre>high-availability.cluster-id: /default_ns # important: customize per 
cluster</pre>
 
-  **Important**: You should not set this value manually when running a YARN
-  cluster, a per-job YARN session, or on another cluster manager. In those
-  cases a cluster-id is automatically being generated based on the application
-  id. Manually setting a cluster-id overrides this behaviour in YARN.
-  Specifying a cluster-id with the -z CLI option, in turn, overrides manual
-  configuration. If you are running multiple Flink HA clusters on bare metal,
-  you have to manually configure separate cluster-ids for each cluster.
+  **Important**: 
+  You should not set this value manually when running on YARN, native 
Kubernetes or on another cluster manager. 
+  In those cases a cluster-id is being automatically generated. 
+  If you are running multiple Flink HA clusters on bare metal, you have to 
manually configure separate cluster-ids for each cluster.
 
-- **Storage directory** (required): JobManager metadata is persisted in the 
file system *storageDir* and only a pointer to this state is stored in 
ZooKeeper.
+- **Storage directory** (required): 
+JobManager metadata is persisted in the file system 
`high-availability.storageDir` and only a pointer to this state is stored in 
ZooKeeper.
 
-    <pre>
-high-availability.storageDir: hdfs:///flink/recovery
-    </pre>
-
-    The `storageDir` stores all metadata needed to recover a JobManager 
failure.
+  <pre>high-availability.storageDir: hdfs:///flink/recovery</pre>
 
-After configuring the masters and the ZooKeeper quorum, you can use the 
provided cluster startup scripts as usual. They will start an HA-cluster. Keep 
in mind that the **ZooKeeper quorum has to be running** when you call the 
scripts and make sure to **configure a separate ZooKeeper root path** for each 
HA cluster you are starting.
+  The `storageDir` stores all metadata needed to recover from a JobManager failure.
 
-#### Example: Standalone Cluster with 2 JobManagers
+### Example configuration
 
-1. **Configure high availability mode and ZooKeeper quorum** in 
`conf/flink-conf.yaml`:
+Configure high availability mode and ZooKeeper quorum in 
`conf/flink-conf.yaml`:
 
-   <pre>
+{% highlight bash %}
 high-availability: zookeeper
 high-availability.zookeeper.quorum: localhost:2181
 high-availability.zookeeper.path.root: /flink
 high-availability.cluster-id: /cluster_one # important: customize per cluster
-high-availability.storageDir: hdfs:///flink/recovery</pre>
+high-availability.storageDir: hdfs:///flink/recovery
+{% endhighlight %}
+
+## Configuring for Zookeeper Security

Review comment:
       ```suggestion
   ## Configuring for ZooKeeper Security
   ```

##########
File path: docs/deployment/ha/zookeeper_ha.md
##########
@@ -23,113 +23,104 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-## ZooKeeper HA Services
+Flink's ZooKeeper HA services use [ZooKeeper](http://zookeeper.apache.org) for 
high availability services.
 
-One high availability services implementation uses ZooKeeper.
+* Toc
+{:toc}
 
-### Configuration
+Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for *distributed 
coordination* between all running JobManager instances. 
+ZooKeeper is a separate service from Flink, which provides highly reliable 
distributed coordination via leader election and light-weight consistent state 
storage. 
+Check out [ZooKeeper's Getting Started 
Guide](http://zookeeper.apache.org/doc/current/zookeeperStarted.html) for more 
information about ZooKeeper. 
+Flink includes scripts to [bootstrap a simple ZooKeeper](#bootstrap-zookeeper) 
installation.
 
-To enable JobManager High Availability you have to set the **high-availability 
mode** to *zookeeper*, configure a **ZooKeeper quorum** and set up a **masters 
file** with all JobManagers hosts and their web UI ports.
+## Configuration
 
-Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for *distributed 
coordination* between all running JobManager instances. ZooKeeper is a separate 
service from Flink, which provides highly reliable distributed coordination via 
leader election and light-weight consistent state storage. Check out 
[ZooKeeper's Getting Started 
Guide](http://zookeeper.apache.org/doc/current/zookeeperStarted.html) for more 
information about ZooKeeper. Flink includes scripts to [bootstrap a simple 
ZooKeeper](#bootstrap-zookeeper) installation.
+In order to start an HA-cluster, you have to set the following configuration keys:
 
-#### Masters File (masters)
-
-In order to start an HA-cluster configure the *masters* file in `conf/masters`:
-
-- **masters file**: The *masters file* contains all hosts, on which 
JobManagers are started, and the ports to which the web user interface binds.
-
-  <pre>
-jobManagerAddress1:webUIPort1
-[...]
-jobManagerAddressX:webUIPortX
-  </pre>
-
-By default, the job manager will pick a *random port* for inter process 
communication. You can change this via the 
**`high-availability.jobmanager.port`** key. This key accepts single ports 
(e.g. `50010`), ranges (`50000-50025`), or a combination of both 
(`50010,50011,50020-50025,50050-50075`).
-
-#### Config File (flink-conf.yaml)
-
-In order to start an HA-cluster add the following configuration keys to 
`conf/flink-conf.yaml`:
-
-- **high-availability mode** (required): The *high-availability mode* has to 
be set in `conf/flink-conf.yaml` to *zookeeper* in order to enable high 
availability mode.
-Alternatively this option can be set to FQN of factory class Flink should use 
to create HighAvailabilityServices instance. 
+- **high-availability mode** (required): 
+The `high-availability` option has to be set to `zookeeper`.
 
   <pre>high-availability: zookeeper</pre>
 
-- **ZooKeeper quorum** (required): A *ZooKeeper quorum* is a replicated group 
of ZooKeeper servers, which provide the distributed coordination service.
+- **ZooKeeper quorum** (required): 
+A *ZooKeeper quorum* is a replicated group of ZooKeeper servers, which provide 
the distributed coordination service.
 
   <pre>high-availability.zookeeper.quorum: 
address1:2181[,...],addressX:2181</pre>
 
-  Each *addressX:port* refers to a ZooKeeper server, which is reachable by 
Flink at the given address and port.
+  Each `addressX:port` refers to a ZooKeeper server, which is reachable by 
Flink at the given address and port.
 
-- **ZooKeeper root** (recommended): The *root ZooKeeper node*, under which all 
cluster nodes are placed.
+- **ZooKeeper root** (recommended): 
+The *root ZooKeeper node*, under which all cluster nodes are placed.
 
-  <pre>high-availability.zookeeper.path.root: /flink
+  <pre>high-availability.zookeeper.path.root: /flink</pre>
 
-- **ZooKeeper cluster-id** (recommended): The *cluster-id ZooKeeper node*, 
under which all required coordination data for a cluster is placed.
+- **ZooKeeper cluster-id** (recommended): 
+The *cluster-id ZooKeeper node*, under which all required coordination data 
for a cluster is placed.
 
   <pre>high-availability.cluster-id: /default_ns # important: customize per 
cluster</pre>
 
-  **Important**: You should not set this value manually when running a YARN
-  cluster, a per-job YARN session, or on another cluster manager. In those
-  cases a cluster-id is automatically being generated based on the application
-  id. Manually setting a cluster-id overrides this behaviour in YARN.
-  Specifying a cluster-id with the -z CLI option, in turn, overrides manual
-  configuration. If you are running multiple Flink HA clusters on bare metal,
-  you have to manually configure separate cluster-ids for each cluster.
+  **Important**: 
+  You should not set this value manually when running on YARN, native Kubernetes, or another cluster manager. 
+  In those cases, a cluster-id is automatically generated. 
+  If you are running multiple Flink HA clusters on bare metal, you have to 
manually configure separate cluster-ids for each cluster.
 
-- **Storage directory** (required): JobManager metadata is persisted in the 
file system *storageDir* and only a pointer to this state is stored in 
ZooKeeper.
+- **Storage directory** (required): 
+JobManager metadata is persisted in the file system 
`high-availability.storageDir` and only a pointer to this state is stored in 
ZooKeeper.
 
-    <pre>
-high-availability.storageDir: hdfs:///flink/recovery
-    </pre>
-
-    The `storageDir` stores all metadata needed to recover a JobManager 
failure.
+  <pre>high-availability.storageDir: hdfs:///flink/recovery</pre>
 
-After configuring the masters and the ZooKeeper quorum, you can use the 
provided cluster startup scripts as usual. They will start an HA-cluster. Keep 
in mind that the **ZooKeeper quorum has to be running** when you call the 
scripts and make sure to **configure a separate ZooKeeper root path** for each 
HA cluster you are starting.
+  The `storageDir` stores all metadata needed to recover from a JobManager failure.
 
-#### Example: Standalone Cluster with 2 JobManagers
+### Example configuration
 
-1. **Configure high availability mode and ZooKeeper quorum** in 
`conf/flink-conf.yaml`:
+Configure high availability mode and ZooKeeper quorum in 
`conf/flink-conf.yaml`:
 
-   <pre>
+{% highlight bash %}
 high-availability: zookeeper
 high-availability.zookeeper.quorum: localhost:2181
 high-availability.zookeeper.path.root: /flink
 high-availability.cluster-id: /cluster_one # important: customize per cluster
-high-availability.storageDir: hdfs:///flink/recovery</pre>
+high-availability.storageDir: hdfs:///flink/recovery
+{% endhighlight %}
+
+## Configuring for Zookeeper Security
 
-2. **Configure masters** in `conf/masters`:
+If ZooKeeper is running in secure mode with Kerberos, you can override the 
following configurations in `flink-conf.yaml` as necessary:
 
-   <pre>
-localhost:8081
-localhost:8082</pre>
+{% highlight bash %}
+zookeeper.sasl.service-name: zookeeper     # default is "zookeeper". If the 
ZooKeeper quorum is configured
+                                           # with a different service name 
then it can be supplied here.
+zookeeper.sasl.login-context-name: Client  # default is "Client". The value 
needs to match one of the values
+                                           # configured in 
"security.kerberos.login.contexts".
+{% endhighlight %}
 
-3. **Configure ZooKeeper server** in `conf/zoo.cfg` (currently it's only 
possible to run a single ZooKeeper server per machine):
+For more information on Flink configuration for Kerberos security, please 
refer to the [security section of the Flink configuration page]({% link 
deployment/config.md %}#security).
+You can also find further details on [how Flink sets up Kerberos-based 
security internally]({% link deployment/security/security-kerberos.md %}).
 
-   <pre>server.0=localhost:2888:3888</pre>
+## Zookeeper Versions
 
-4. **Start ZooKeeper quorum**:
+Flink ships with separate Zookeeper clients for 3.4 and 3.5, with 3.4 being in 
the `lib` directory of the distribution
+and thus used by default, whereas 3.5 is placed in the `opt` directory.
 
-   <pre>
-$ bin/start-zookeeper-quorum.sh
-Starting zookeeper daemon on host localhost.</pre>
+The 3.5 client allows you to secure the Zookeeper connection via SSL, but 
_may_ not work with 3.4- Zookeeper installations.

Review comment:
       ```suggestion
   The 3.5 client allows you to secure the ZooKeeper connection via SSL, but 
_may_ not work with 3.4-ZooKeeper installations.
   ```
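   Maybe we could also sketch what enabling SSL would look like. A minimal, untested sketch, assuming ZooKeeper 3.5's standard client system properties (`zookeeper.client.secure`, `zookeeper.clientCnxnSocket`, `zookeeper.ssl.keyStore.location`) are passed to the Flink JVMs via `env.java.opts`; the path and password are placeholders:

   {% highlight bash %}
   # illustrative only: enable SSL on the ZooKeeper 3.5 client via JVM properties
   # (assumes the 3.5 client jar has been placed in lib/)
   env.java.opts: -Dzookeeper.client.secure=true -Dzookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty -Dzookeeper.ssl.keyStore.location=/path/to/keystore.jks -Dzookeeper.ssl.keyStore.password=changeit
   {% endhighlight %}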

##########
File path: docs/deployment/ha/zookeeper_ha.md
##########
@@ -23,113 +23,104 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-## ZooKeeper HA Services
+Flink's ZooKeeper HA services use [ZooKeeper](http://zookeeper.apache.org) for 
high availability services.
 
-One high availability services implementation uses ZooKeeper.
+* Toc
+{:toc}
 
-### Configuration
+Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for *distributed 
coordination* between all running JobManager instances. 
+ZooKeeper is a separate service from Flink, which provides highly reliable 
distributed coordination via leader election and light-weight consistent state 
storage. 
+Check out [ZooKeeper's Getting Started 
Guide](http://zookeeper.apache.org/doc/current/zookeeperStarted.html) for more 
information about ZooKeeper. 
+Flink includes scripts to [bootstrap a simple ZooKeeper](#bootstrap-zookeeper) 
installation.
 
-To enable JobManager High Availability you have to set the **high-availability 
mode** to *zookeeper*, configure a **ZooKeeper quorum** and set up a **masters 
file** with all JobManagers hosts and their web UI ports.
+## Configuration
 
-Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for *distributed 
coordination* between all running JobManager instances. ZooKeeper is a separate 
service from Flink, which provides highly reliable distributed coordination via 
leader election and light-weight consistent state storage. Check out 
[ZooKeeper's Getting Started 
Guide](http://zookeeper.apache.org/doc/current/zookeeperStarted.html) for more 
information about ZooKeeper. Flink includes scripts to [bootstrap a simple 
ZooKeeper](#bootstrap-zookeeper) installation.
+In order to start an HA-cluster, you have to set the following configuration keys:
 
-#### Masters File (masters)
-
-In order to start an HA-cluster configure the *masters* file in `conf/masters`:
-
-- **masters file**: The *masters file* contains all hosts, on which 
JobManagers are started, and the ports to which the web user interface binds.
-
-  <pre>
-jobManagerAddress1:webUIPort1
-[...]
-jobManagerAddressX:webUIPortX
-  </pre>
-
-By default, the job manager will pick a *random port* for inter process 
communication. You can change this via the 
**`high-availability.jobmanager.port`** key. This key accepts single ports 
(e.g. `50010`), ranges (`50000-50025`), or a combination of both 
(`50010,50011,50020-50025,50050-50075`).
-
-#### Config File (flink-conf.yaml)
-
-In order to start an HA-cluster add the following configuration keys to 
`conf/flink-conf.yaml`:
-
-- **high-availability mode** (required): The *high-availability mode* has to 
be set in `conf/flink-conf.yaml` to *zookeeper* in order to enable high 
availability mode.
-Alternatively this option can be set to FQN of factory class Flink should use 
to create HighAvailabilityServices instance. 
+- **high-availability mode** (required): 
+The `high-availability` option has to be set to `zookeeper`.
 
   <pre>high-availability: zookeeper</pre>
 
-- **ZooKeeper quorum** (required): A *ZooKeeper quorum* is a replicated group 
of ZooKeeper servers, which provide the distributed coordination service.
+- **ZooKeeper quorum** (required): 
+A *ZooKeeper quorum* is a replicated group of ZooKeeper servers, which provide 
the distributed coordination service.
 
   <pre>high-availability.zookeeper.quorum: 
address1:2181[,...],addressX:2181</pre>
 
-  Each *addressX:port* refers to a ZooKeeper server, which is reachable by 
Flink at the given address and port.
+  Each `addressX:port` refers to a ZooKeeper server, which is reachable by 
Flink at the given address and port.
 
-- **ZooKeeper root** (recommended): The *root ZooKeeper node*, under which all 
cluster nodes are placed.
+- **ZooKeeper root** (recommended): 
+The *root ZooKeeper node*, under which all cluster nodes are placed.
 
-  <pre>high-availability.zookeeper.path.root: /flink
+  <pre>high-availability.zookeeper.path.root: /flink</pre>
 
-- **ZooKeeper cluster-id** (recommended): The *cluster-id ZooKeeper node*, 
under which all required coordination data for a cluster is placed.
+- **ZooKeeper cluster-id** (recommended): 
+The *cluster-id ZooKeeper node*, under which all required coordination data 
for a cluster is placed.
 
   <pre>high-availability.cluster-id: /default_ns # important: customize per 
cluster</pre>
 
-  **Important**: You should not set this value manually when running a YARN
-  cluster, a per-job YARN session, or on another cluster manager. In those
-  cases a cluster-id is automatically being generated based on the application
-  id. Manually setting a cluster-id overrides this behaviour in YARN.
-  Specifying a cluster-id with the -z CLI option, in turn, overrides manual
-  configuration. If you are running multiple Flink HA clusters on bare metal,
-  you have to manually configure separate cluster-ids for each cluster.
+  **Important**: 
+  You should not set this value manually when running on YARN, native Kubernetes, or another cluster manager. 
+  In those cases, a cluster-id is automatically generated. 
+  If you are running multiple Flink HA clusters on bare metal, you have to 
manually configure separate cluster-ids for each cluster.
 
-- **Storage directory** (required): JobManager metadata is persisted in the 
file system *storageDir* and only a pointer to this state is stored in 
ZooKeeper.
+- **Storage directory** (required): 
+JobManager metadata is persisted in the file system 
`high-availability.storageDir` and only a pointer to this state is stored in 
ZooKeeper.
 
-    <pre>
-high-availability.storageDir: hdfs:///flink/recovery
-    </pre>
-
-    The `storageDir` stores all metadata needed to recover a JobManager 
failure.
+  <pre>high-availability.storageDir: hdfs:///flink/recovery</pre>
 
-After configuring the masters and the ZooKeeper quorum, you can use the 
provided cluster startup scripts as usual. They will start an HA-cluster. Keep 
in mind that the **ZooKeeper quorum has to be running** when you call the 
scripts and make sure to **configure a separate ZooKeeper root path** for each 
HA cluster you are starting.
+  The `storageDir` stores all metadata needed to recover from a JobManager failure.
 
-#### Example: Standalone Cluster with 2 JobManagers
+### Example configuration
 
-1. **Configure high availability mode and ZooKeeper quorum** in 
`conf/flink-conf.yaml`:
+Configure high availability mode and ZooKeeper quorum in 
`conf/flink-conf.yaml`:
 
-   <pre>
+{% highlight bash %}
 high-availability: zookeeper
 high-availability.zookeeper.quorum: localhost:2181
 high-availability.zookeeper.path.root: /flink
 high-availability.cluster-id: /cluster_one # important: customize per cluster
-high-availability.storageDir: hdfs:///flink/recovery</pre>
+high-availability.storageDir: hdfs:///flink/recovery
+{% endhighlight %}
+
+## Configuring for Zookeeper Security
 
-2. **Configure masters** in `conf/masters`:
+If ZooKeeper is running in secure mode with Kerberos, you can override the 
following configurations in `flink-conf.yaml` as necessary:
 
-   <pre>
-localhost:8081
-localhost:8082</pre>
+{% highlight bash %}
+zookeeper.sasl.service-name: zookeeper     # default is "zookeeper". If the 
ZooKeeper quorum is configured
+                                           # with a different service name 
then it can be supplied here.
+zookeeper.sasl.login-context-name: Client  # default is "Client". The value 
needs to match one of the values
+                                           # configured in 
"security.kerberos.login.contexts".
+{% endhighlight %}
 
-3. **Configure ZooKeeper server** in `conf/zoo.cfg` (currently it's only 
possible to run a single ZooKeeper server per machine):
+For more information on Flink configuration for Kerberos security, please 
refer to the [security section of the Flink configuration page]({% link 
deployment/config.md %}#security).
+You can also find further details on [how Flink sets up Kerberos-based 
security internally]({% link deployment/security/security-kerberos.md %}).
 
-   <pre>server.0=localhost:2888:3888</pre>
+## Zookeeper Versions
 
-4. **Start ZooKeeper quorum**:
+Flink ships with separate Zookeeper clients for 3.4 and 3.5, with 3.4 being in 
the `lib` directory of the distribution

Review comment:
       ```suggestion
   Flink ships with separate ZooKeeper clients for 3.4 and 3.5, with 3.4 being 
in the `lib` directory of the distribution
   ```
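   It might also help to show the swap explicitly. A sketch, assuming the distribution's usual `flink-shaded-zookeeper-*` jar naming (the exact version suffixes may differ):

   {% highlight bash %}
   # make the 3.5 client the default and park the 3.4 client in opt/
   mv lib/flink-shaded-zookeeper-3.4*.jar opt/
   cp opt/flink-shaded-zookeeper-3.5*.jar lib/
   {% endhighlight %}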

##########
File path: docs/deployment/ha/kubernetes_ha.md
##########
@@ -23,77 +23,50 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-## Kubernetes Cluster High Availability
-Kubernetes high availability service could support both [standalone Flink on 
Kubernetes]({% link deployment/resource-providers/standalone/kubernetes.md %}) 
and [native Kubernetes integration]({% link 
deployment/resource-providers/native_kubernetes.md %}).
+Flink's Kubernetes HA services use [Kubernetes](https://kubernetes.io/) for 
high availability services.
 
-When running Flink JobManager as a Kubernetes deployment, the replica count 
should be configured to 1 or greater.
-* The value `1` means that a new JobManager will be launched to take over 
leadership if the current one terminates exceptionally.
-* The value `N` (greater than 1) means that multiple JobManagers will be 
launched simultaneously while one is active and others are standby. Starting 
more than one JobManager will make the recovery faster.
+* Toc
+{:toc}
 
-### Configuration
-{% highlight yaml %}
-kubernetes.cluster-id: <ClusterId>
-high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
-high-availability.storageDir: hdfs:///flink/recovery
-{% endhighlight %}
+Kubernetes high availability services can only be used when deploying to 
Kubernetes.
+Consequently, they can be configured when using [standalone Flink on 
Kubernetes]({% link deployment/resource-providers/standalone/kubernetes.md %}) 
or the [native Kubernetes integration]({% link 
deployment/resource-providers/native_kubernetes.md %}).
 
-#### Example: Highly Available Standalone Flink Cluster on Kubernetes
-Both session and job/application clusters support using the Kubernetes high 
availability service. Users just need to add the following Flink config options 
to [flink-configuration-configmap.yaml]({% link 
deployment/resource-providers/standalone/kubernetes.md 
%}#common-cluster-resource-definitions). All other yamls do not need to be 
updated.
-
-<span class="label label-info">Note</span> The filesystem which corresponds to 
the scheme of your configured HA storage directory must be available to the 
runtime. Refer to [custom Flink image]({% link 
deployment/resource-providers/standalone/docker.md %}#customize-flink-image) 
and [enable plugins]({% link deployment/resource-providers/standalone/docker.md 
%}#using-plugins) for more information.
-
-{% highlight yaml %}
-apiVersion: v1
-kind: ConfigMap
-metadata:
-  name: flink-config
-  labels:
-    app: flink
-data:
-  flink-conf.yaml: |+
-  ...
-    kubernetes.cluster-id: <ClusterId>
-    high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
-    high-availability.storageDir: hdfs:///flink/recovery
-    restart-strategy: fixed-delay
-    restart-strategy.fixed-delay.attempts: 10
-  ...
-{% endhighlight %}
+## Configuration
 
-#### Example: Highly Available Native Kubernetes Cluster
-Using the following command to start a native Flink application cluster on 
Kubernetes with high availability configured.
-{% highlight bash %}
-$ ./bin/flink run-application -p 8 -t kubernetes-application \
-  -Dkubernetes.cluster-id=<ClusterId> \
-  -Dtaskmanager.memory.process.size=4096m \
-  -Dkubernetes.taskmanager.cpu=2 \
-  -Dtaskmanager.numberOfTaskSlots=4 \
-  -Dkubernetes.container.image=<CustomImageName> \
-  
-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
 \
-  -Dhigh-availability.storageDir=s3://flink/flink-ha \
-  -Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=10 \
-  
-Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar
 \
-  
-Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar
 \
-  local:///opt/flink/examples/streaming/StateMachineExample.jar
-{% endhighlight %}
+In order to start an HA-cluster, you have to set the following configuration keys:
 
-### High Availability Data Clean Up
-Currently, when a Flink job reached the terminal state (`FAILED`, `CANCELED`, 
`FINISHED`), all the HA data, including metadata in Kubernetes ConfigMap and HA 
state on DFS, will be cleaned up.
+- **high-availability mode** (required): 
+The [`high-availability`]({% link deployment/config.md %}#high-availability-1) 
option has to be set to `KubernetesHaServicesFactory`.
 
-So the following command will only shut down the Flink session cluster and 
leave all the HA related ConfigMaps, state untouched.
-{% highlight bash %}
-$ echo 'stop' | ./bin/kubernetes-session.sh 
-Dkubernetes.cluster-id=<ClusterId> -Dexecution.attached=true
-{% endhighlight %}
+  <pre>high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory</pre>
+
+- **Storage directory** (required): 
+JobManager metadata is persisted in the file system 
[`high-availability.storageDir`]({% link deployment/config.md 
%}#high-availability-storagedir) and only a pointer to this state is stored in 
Kubernetes.
+
+  <pre>high-availability.storageDir: s3:///flink/recovery</pre>
+
+  The `storageDir` stores all metadata needed to recover from a JobManager failure.
+  
+- **Cluster id** (required):
+In order to identify the Flink cluster, you have to specify a 
[`kubernetes.cluster-id`]({% link deployment/config.md 
%}#kubernetes-cluster-id).
+
+  <pre>kubernetes.cluster-id: cluster1337</pre>
+
+### Example configuration
+
+Configure high availability mode in `conf/flink-conf.yaml`:
 
-The following commands will cancel the job in application or session cluster 
and effectively remove all its HA data.
 {% highlight bash %}
-# Cancel a Flink job in the existing session
-$ ./bin/flink cancel -t kubernetes-session -Dkubernetes.cluster-id=<ClusterID> 
<JobID>
-# Cancel a Flink application
-$ ./bin/flink cancel -t kubernetes-application 
-Dkubernetes.cluster-id=<ClusterID> <JobID>
+kubernetes.cluster-id: <ClusterId>

Review comment:
       ```suggestion
   kubernetes.cluster-id: <cluster-id>
   ```
   nit: What about changing the formatting of `ClusterId` here to avoid misleading the user, considering that only lowercase strings are allowed?
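   For instance, something like this might prevent confusion (assuming the id has to follow Kubernetes' lowercase RFC 1123 naming rules, since it ends up in resource names):

   {% highlight bash %}
   # valid: lowercase alphanumerics and dashes only
   kubernetes.cluster-id: my-flink-cluster
   # would be rejected by Kubernetes: contains uppercase characters
   # kubernetes.cluster-id: MyFlinkCluster
   {% endhighlight %}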

##########
File path: docs/deployment/ha/kubernetes_ha.md
##########
@@ -23,77 +23,50 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-## Kubernetes Cluster High Availability
-Kubernetes high availability service could support both [standalone Flink on 
Kubernetes]({% link deployment/resource-providers/standalone/kubernetes.md %}) 
and [native Kubernetes integration]({% link 
deployment/resource-providers/native_kubernetes.md %}).
+Flink's Kubernetes HA services use [Kubernetes](https://kubernetes.io/) for 
high availability services.
 
-When running Flink JobManager as a Kubernetes deployment, the replica count 
should be configured to 1 or greater.
-* The value `1` means that a new JobManager will be launched to take over 
leadership if the current one terminates exceptionally.
-* The value `N` (greater than 1) means that multiple JobManagers will be 
launched simultaneously while one is active and others are standby. Starting 
more than one JobManager will make the recovery faster.
+* Toc
+{:toc}
 
-### Configuration
-{% highlight yaml %}
-kubernetes.cluster-id: <ClusterId>
-high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
-high-availability.storageDir: hdfs:///flink/recovery
-{% endhighlight %}
+Kubernetes high availability services can only be used when deploying to 
Kubernetes.
+Consequently, they can be configured when using [standalone Flink on 
Kubernetes]({% link deployment/resource-providers/standalone/kubernetes.md %}) 
or the [native Kubernetes integration]({% link 
deployment/resource-providers/native_kubernetes.md %}).
 
-#### Example: Highly Available Standalone Flink Cluster on Kubernetes
-Both session and job/application clusters support using the Kubernetes high 
availability service. Users just need to add the following Flink config options 
to [flink-configuration-configmap.yaml]({% link 
deployment/resource-providers/standalone/kubernetes.md 
%}#common-cluster-resource-definitions). All other yamls do not need to be 
updated.
-
-<span class="label label-info">Note</span> The filesystem which corresponds to 
the scheme of your configured HA storage directory must be available to the 
runtime. Refer to [custom Flink image]({% link 
deployment/resource-providers/standalone/docker.md %}#customize-flink-image) 
and [enable plugins]({% link deployment/resource-providers/standalone/docker.md 
%}#using-plugins) for more information.
-
-{% highlight yaml %}
-apiVersion: v1
-kind: ConfigMap
-metadata:
-  name: flink-config
-  labels:
-    app: flink
-data:
-  flink-conf.yaml: |+
-  ...
-    kubernetes.cluster-id: <ClusterId>
-    high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
-    high-availability.storageDir: hdfs:///flink/recovery
-    restart-strategy: fixed-delay
-    restart-strategy.fixed-delay.attempts: 10
-  ...
-{% endhighlight %}
+## Configuration
 
-#### Example: Highly Available Native Kubernetes Cluster
-Using the following command to start a native Flink application cluster on 
Kubernetes with high availability configured.
-{% highlight bash %}
-$ ./bin/flink run-application -p 8 -t kubernetes-application \
-  -Dkubernetes.cluster-id=<ClusterId> \
-  -Dtaskmanager.memory.process.size=4096m \
-  -Dkubernetes.taskmanager.cpu=2 \
-  -Dtaskmanager.numberOfTaskSlots=4 \
-  -Dkubernetes.container.image=<CustomImageName> \
-  
-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
 \
-  -Dhigh-availability.storageDir=s3://flink/flink-ha \
-  -Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=10 \
-  
-Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar
 \
-  
-Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar
 \
-  local:///opt/flink/examples/streaming/StateMachineExample.jar
-{% endhighlight %}
+In order to start an HA-cluster, you have to set the following configuration keys:
 
-### High Availability Data Clean Up
-Currently, when a Flink job reached the terminal state (`FAILED`, `CANCELED`, 
`FINISHED`), all the HA data, including metadata in Kubernetes ConfigMap and HA 
state on DFS, will be cleaned up.
+- **high-availability mode** (required): 
+The [`high-availability`]({% link deployment/config.md %}#high-availability-1) 
option has to be set to `KubernetesHaServicesFactory`.

Review comment:
       ```suggestion
   The [high-availability]({% link deployment/config.md %}#high-availability-1) 
option has to be set to `KubernetesHaServicesFactory`.
   ```
   I didn't realize that the parameters were actually links because of the formatting. We shouldn't override the link formatting like that.

##########
File path: docs/deployment/resource-providers/native_kubernetes.md
##########
@@ -375,6 +375,28 @@ $ ./bin/kubernetes-session.sh \
 
 For more details see the [official Kubernetes 
documentation](https://kubernetes.io/docs/concepts/configuration/secret/#using-secrets-as-environment-variables).
 
+## High-Availability with Native Kubernetes
+
+For high availability on Kubernetes, you can use the [existing high 
availability services]({% link deployment/ha/index.md %}).

Review comment:
       ```suggestion
   For high availability on Kubernetes, you can use the [existing high 
availability services]({% link deployment/ha/kubernetes_ha.md %}).
   ```
   Wouldn't it make sense to link directly to the Kubernetes-specific HA page here?

##########
File path: docs/deployment/resource-providers/native_kubernetes.md
##########
@@ -375,6 +375,28 @@ $ ./bin/kubernetes-session.sh \
 
 For more details see the [official Kubernetes 
documentation](https://kubernetes.io/docs/concepts/configuration/secret/#using-secrets-as-environment-variables).
 
+## High-Availability with Native Kubernetes
+
+For high availability on Kubernetes, you can use the [existing high 
availability services]({% link deployment/ha/index.md %}).
+
+### How to configure Kubernetes HA Services
+
+Use the following command to start a native Flink application cluster on Kubernetes with high availability configured.
+{% highlight bash %}
+$ ./bin/flink run-application -p 8 -t kubernetes-application \
+  -Dkubernetes.cluster-id=<ClusterId> \

Review comment:
       ```suggestion
     -Dkubernetes.cluster-id=<cluster-id> \
   ```
   Here, my proposal to reformat the cluster-id placeholder applies again.

##########
File path: docs/deployment/resource-providers/standalone/index.md
##########
@@ -150,4 +150,71 @@ bin/taskmanager.sh start|start-foreground|stop|stop-all
 
 Make sure to call these scripts on the hosts on which you want to start/stop 
the respective instance.
 
+## High-Availability with Standalone
+
+In order to enable HA for a standalone cluster, you have to use the [ZooKeeper 
HA services]({% link deployment/ha/zookeeper_ha.md %}).
+
+Additionally, you have to configure your cluster to start multiple JobManagers.
+
+### Masters File (masters)
+
+In order to start an HA-cluster, configure the *masters* file in `conf/masters`:
+
+- **masters file**: The *masters file* contains all hosts, on which 
JobManagers are started, and the ports to which the web user interface binds.
+
+  <pre>
+jobManagerAddress1:webUIPort1
+[...]
+jobManagerAddressX:webUIPortX
+  </pre>
+
+By default, the job manager will pick a *random port* for inter process 
communication. You can change this via the 
**`high-availability.jobmanager.port`** key. This key accepts single ports 
(e.g. `50010`), ranges (`50000-50025`), or a combination of both 
(`50010,50011,50020-50025,50050-50075`).

Review comment:
       ```suggestion
   By default, the job manager will pick a *random port* for inter process 
communication. You can change this via the 
[high-availability.jobmanager.port]({% link deployment/config.md 
%}#high-availability-jobmanager-port) key. This key accepts single ports (e.g. 
`50010`), ranges (`50000-50025`), or a combination of both 
(`50010,50011,50020-50025,50050-50075`).
   ```
   Removed formatting and added link.
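   A concrete one-liner might also make the use case clearer, e.g. a sketch for pinning the port range (the value is illustrative) so that firewall rules can be defined:

   {% highlight bash %}
   # restrict JobManager inter-process communication to a fixed port range
   high-availability.jobmanager.port: 50000-50025
   {% endhighlight %}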




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

