This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 93bffc063180002b4cba06d1ffc0aaa8045298e0
Author: Till Rohrmann <[email protected]>
AuthorDate: Fri Nov 27 18:49:18 2020 +0100

    [FLINK-20357][docs] Split HA documentation up into a general overview and 
the specific implementations
    
    This commit splits the HA documentation up into a general overview and the 
specific implementations:
    * ZooKeeper HA services
    * Kubernetes HA services
    
    Moreover, this commit moves resource-provider specific documentation to the 
respective resource-provider
    documentation.
    
    This closes #14254.
---
 docs/deployment/ha/index.md                        | 320 +++------------------
 docs/deployment/ha/index.zh.md                     | 320 +++------------------
 docs/deployment/ha/kubernetes_ha.md                |  99 +++----
 docs/deployment/ha/kubernetes_ha.zh.md             |  99 +++----
 docs/deployment/ha/zookeeper_ha.md                 | 143 +++++----
 docs/deployment/ha/zookeeper_ha.zh.md              | 143 +++++----
 .../resource-providers/native_kubernetes.md        |  22 ++
 .../resource-providers/native_kubernetes.zh.md     |  22 ++
 .../resource-providers/standalone/index.md         |  67 +++++
 .../resource-providers/standalone/index.zh.md      |  68 +++++
 .../resource-providers/standalone/kubernetes.md    |  28 ++
 .../resource-providers/standalone/kubernetes.zh.md |  29 ++
 12 files changed, 518 insertions(+), 842 deletions(-)

diff --git a/docs/deployment/ha/index.md b/docs/deployment/ha/index.md
index 8e417ce..1f0e321b 100644
--- a/docs/deployment/ha/index.md
+++ b/docs/deployment/ha/index.md
@@ -1,9 +1,10 @@
 ---
-title: "High Availability (HA)"
-nav-id: ha
+title: "JobManager High Availability (HA)"
+nav-title: High Availability (HA)
 nav-parent_id: deployment
-nav-pos: 6
+nav-id: ha
 nav-show_overview: true
+nav-pos: 6
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
@@ -24,308 +25,57 @@ specific language governing permissions and limitations
 under the License.
 -->
 
+JobManager High Availability (HA) hardens a Flink cluster against JobManager 
failures.
+This feature ensures that a Flink cluster will always continue executing your 
submitted jobs.
+
 * Toc
 {:toc}
 
-## Overview
-
-The JobManager coordinates every Flink deployment. It is responsible for both 
*scheduling* and *resource management*.
+## JobManager High Availability
 
-By default, there is a single JobManager instance per Flink cluster. This 
creates a *single point of failure* (SPOF): if the JobManager crashes, no new 
programs can be submitted and running programs fail.
+The JobManager coordinates every Flink deployment. 
+It is responsible for both *scheduling* and *resource management*.
 
-With JobManager High Availability, you can recover from JobManager failures 
and thereby eliminate the *SPOF*. You can configure high availability for both 
**standalone**, **YARN clusters** and **Kubernetes clusters**.
+By default, there is a single JobManager instance per Flink cluster. 
+This creates a *single point of failure* (SPOF): if the JobManager crashes, no 
new programs can be submitted and running programs fail.
 
-See more HA implementation details in [JobManager High 
Availability](https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability)
 in the Flink wiki.
+With JobManager High Availability, you can recover from JobManager failures 
and thereby eliminate the *SPOF*. 
+You can configure high availability for every cluster deployment.
+See the [list of available high availability 
services](#high-availability-services) for more information.
 
-## Standalone Cluster High Availability
+### How to make a cluster highly available
 
-The general idea of JobManager high availability for standalone clusters is 
that there is a **single leading JobManager** at any time and **multiple 
standby JobManagers** to take over leadership in case the leader fails. This 
guarantees that there is **no single point of failure** and programs can make 
progress as soon as a standby JobManager has taken leadership. There is no 
explicit distinction between standby and master JobManager instances. Each 
JobManager can take the role of master [...]
+The general idea of JobManager High Availability is that there is a *single 
leading JobManager* at any time and *multiple standby JobManagers* to take over 
leadership in case the leader fails. 
+This guarantees that there is *no single point of failure* and programs can 
make progress as soon as a standby JobManager has taken leadership. 
 
 As an example, consider the following setup with three JobManager instances:
 
 <img src="{% link /fig/jobmanager_ha_overview.png %}" class="center" />
 
-### Configuration
-
-To enable JobManager High Availability you have to set the **high-availability 
mode** to *zookeeper*, configure a **ZooKeeper quorum** and set up a **masters 
file** with all JobManagers hosts and their web UI ports.
-
-Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for *distributed 
coordination* between all running JobManager instances. ZooKeeper is a separate 
service from Flink, which provides highly reliable distributed coordination via 
leader election and light-weight consistent state storage. Check out 
[ZooKeeper's Getting Started 
Guide](http://zookeeper.apache.org/doc/current/zookeeperStarted.html) for more 
information about ZooKeeper. Flink includes scripts to [bootstrap a simple Zo 
[...]
-
-#### Masters File (masters)
-
-In order to start an HA-cluster configure the *masters* file in `conf/masters`:
-
-- **masters file**: The *masters file* contains all hosts, on which 
JobManagers are started, and the ports to which the web user interface binds.
-
-  <pre>
-jobManagerAddress1:webUIPort1
-[...]
-jobManagerAddressX:webUIPortX
-  </pre>
-
-By default, the job manager will pick a *random port* for inter process 
communication. You can change this via the 
**`high-availability.jobmanager.port`** key. This key accepts single ports 
(e.g. `50010`), ranges (`50000-50025`), or a combination of both 
(`50010,50011,50020-50025,50050-50075`).
-
-#### Config File (flink-conf.yaml)
-
-In order to start an HA-cluster add the following configuration keys to 
`conf/flink-conf.yaml`:
-
-- **high-availability mode** (required): The *high-availability mode* has to 
be set in `conf/flink-conf.yaml` to *zookeeper* in order to enable high 
availability mode.
-Alternatively this option can be set to FQN of factory class Flink should use 
to create HighAvailabilityServices instance. 
-
-  <pre>high-availability: zookeeper</pre>
-
-- **ZooKeeper quorum** (required): A *ZooKeeper quorum* is a replicated group 
of ZooKeeper servers, which provide the distributed coordination service.
-
-  <pre>high-availability.zookeeper.quorum: 
address1:2181[,...],addressX:2181</pre>
-
-  Each *addressX:port* refers to a ZooKeeper server, which is reachable by 
Flink at the given address and port.
-
-- **ZooKeeper root** (recommended): The *root ZooKeeper node*, under which all 
cluster nodes are placed.
-
-  <pre>high-availability.zookeeper.path.root: /flink
-
-- **ZooKeeper cluster-id** (recommended): The *cluster-id ZooKeeper node*, 
under which all required coordination data for a cluster is placed.
-
-  <pre>high-availability.cluster-id: /default_ns # important: customize per 
cluster</pre>
-
-  **Important**: You should not set this value manually when running a YARN
-  cluster, a per-job YARN session, or on another cluster manager. In those
-  cases a cluster-id is automatically being generated based on the application
-  id. Manually setting a cluster-id overrides this behaviour in YARN.
-  Specifying a cluster-id with the -z CLI option, in turn, overrides manual
-  configuration. If you are running multiple Flink HA clusters on bare metal,
-  you have to manually configure separate cluster-ids for each cluster.
-
-- **Storage directory** (required): JobManager metadata is persisted in the 
file system *storageDir* and only a pointer to this state is stored in 
ZooKeeper.
-
-    <pre>
-high-availability.storageDir: hdfs:///flink/recovery
-    </pre>
-
-    The `storageDir` stores all metadata needed to recover a JobManager 
failure.
-
-After configuring the masters and the ZooKeeper quorum, you can use the 
provided cluster startup scripts as usual. They will start an HA-cluster. Keep 
in mind that the **ZooKeeper quorum has to be running** when you call the 
scripts and make sure to **configure a separate ZooKeeper root path** for each 
HA cluster you are starting.
-
-#### Example: Standalone Cluster with 2 JobManagers
-
-1. **Configure high availability mode and ZooKeeper quorum** in 
`conf/flink-conf.yaml`:
-
-   <pre>
-high-availability: zookeeper
-high-availability.zookeeper.quorum: localhost:2181
-high-availability.zookeeper.path.root: /flink
-high-availability.cluster-id: /cluster_one # important: customize per cluster
-high-availability.storageDir: hdfs:///flink/recovery</pre>
-
-2. **Configure masters** in `conf/masters`:
-
-   <pre>
-localhost:8081
-localhost:8082</pre>
-
-3. **Configure ZooKeeper server** in `conf/zoo.cfg` (currently it's only 
possible to run a single ZooKeeper server per machine):
-
-   <pre>server.0=localhost:2888:3888</pre>
-
-4. **Start ZooKeeper quorum**:
-
-   <pre>
-$ bin/start-zookeeper-quorum.sh
-Starting zookeeper daemon on host localhost.</pre>
-
-5. **Start an HA-cluster**:
-
-   <pre>
-$ bin/start-cluster.sh
-Starting HA cluster with 2 masters and 1 peers in ZooKeeper quorum.
-Starting standalonesession daemon on host localhost.
-Starting standalonesession daemon on host localhost.
-Starting taskexecutor daemon on host localhost.</pre>
-
-6. **Stop ZooKeeper quorum and cluster**:
-
-   <pre>
-$ bin/stop-cluster.sh
-Stopping taskexecutor daemon (pid: 7647) on localhost.
-Stopping standalonesession daemon (pid: 7495) on host localhost.
-Stopping standalonesession daemon (pid: 7349) on host localhost.
-$ bin/stop-zookeeper-quorum.sh
-Stopping zookeeper daemon (pid: 7101) on host localhost.</pre>
+Flink's [high availability services](#high-availability-services) encapsulate the functionality required to make this work:
+* **Leader election**: Selecting a single leader out of a pool of `n` 
candidates
+* **Service discovery**: Retrieving the address of the current leader
+* **State persistence**: Persisting state which is required for the successor 
to resume the job execution (JobGraphs, user code jars, completed checkpoints)
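
As a quick orientation (a minimal sketch that only reuses values shown elsewhere in this patch), these services are driven by two settings in `conf/flink-conf.yaml`:

{% highlight yaml %}
# Select an HA services implementation: either "zookeeper" or the FQN of a
# HighAvailabilityServices factory class such as the Kubernetes one.
high-availability: zookeeper
# JobGraphs, user code jars and completed checkpoints are persisted here;
# only a pointer to this state is kept in the HA service itself.
high-availability.storageDir: hdfs:///flink/recovery
{% endhighlight %}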
 
-## YARN Cluster High Availability
-
-When running a highly available YARN cluster, **we don't run multiple 
JobManager (ApplicationMaster) instances**, but only one, which is restarted by 
YARN on failures. The exact behaviour depends on on the specific YARN version 
you are using.
-
-### Configuration
-
-#### Maximum Application Master Attempts (yarn-site.xml)
-
-You have to configure the maximum number of attempts for the application 
masters for **your** YARN setup in `yarn-site.xml`:
-
-{% highlight xml %}
-<property>
-  <name>yarn.resourcemanager.am.max-attempts</name>
-  <value>4</value>
-  <description>
-    The maximum number of application master execution attempts.
-  </description>
-</property>
-{% endhighlight %}
-
-The default for current YARN versions is 2 (meaning a single JobManager 
failure is tolerated).
-
-#### Application Attempts (flink-conf.yaml)
-
-In addition to the HA configuration ([see above](#configuration)), you have to 
configure the maximum attempts in `conf/flink-conf.yaml`:
-
-<pre>yarn.application-attempts: 10</pre>
-
-This means that the application can be restarted 9 times for failed attempts 
before YARN fails the application (9 retries + 1 initial attempt). Additional 
restarts can be performed by YARN if required by YARN operations: Preemption, 
node hardware failures or reboots, or NodeManager resyncs. These restarts are 
not counted against `yarn.application-attempts`, see <a 
href="http://johnjianfang.blogspot.de/2015/04/the-number-of-maximum-attempts-of-yarn.html";>Jian
 Fang's blog post</a>. It's im [...]
-
-#### Container Shutdown Behaviour
-
-- **YARN 2.3.0 < version < 2.4.0**. All containers are restarted if the 
application master fails.
-- **YARN 2.4.0 < version < 2.6.0**. TaskManager containers are kept alive 
across application master failures. This has the advantage that the startup 
time is faster and that the user does not have to wait for obtaining the 
container resources again.
-- **YARN 2.6.0 <= version**: Sets the attempt failure validity interval to the 
Flinks' Akka timeout value. The attempt failure validity interval says that an 
application is only killed after the system has seen the maximum number of 
application attempts during one interval. This avoids that a long lasting job 
will deplete it's application attempts.
-
-<p style="border-radius: 5px; padding: 5px" class="bg-danger"><b>Note</b>: 
Hadoop YARN 2.4.0 has a major bug (fixed in 2.5.0) preventing container 
restarts from a restarted Application Master/Job Manager container. See <a 
href="https://issues.apache.org/jira/browse/FLINK-4142";>FLINK-4142</a> for 
details. We recommend using at least Hadoop 2.5.0 for high availability setups 
on YARN.</p>
-
-#### Example: Highly Available YARN Session
-
-1. **Configure HA mode and ZooKeeper quorum** in `conf/flink-conf.yaml`:
-
-   <pre>
-high-availability: zookeeper
-high-availability.zookeeper.quorum: localhost:2181
-high-availability.storageDir: hdfs:///flink/recovery
-high-availability.zookeeper.path.root: /flink
-yarn.application-attempts: 10</pre>
-
-3. **Configure ZooKeeper server** in `conf/zoo.cfg` (currently it's only 
possible to run a single ZooKeeper server per machine):
-
-   <pre>server.0=localhost:2888:3888</pre>
-
-4. **Start ZooKeeper quorum**:
-
-   <pre>
-$ bin/start-zookeeper-quorum.sh
-Starting zookeeper daemon on host localhost.</pre>
-
-5. **Start an HA-cluster**:
-
-   <pre>
-$ bin/yarn-session.sh -n 2</pre>
-
-## Kubernetes Cluster High Availability
-Kubernetes high availability service could support both [standalone Flink on 
Kubernetes]({% link deployment/resource-providers/standalone/kubernetes.md %}) 
and [native Kubernetes integration]({% link 
deployment/resource-providers/native_kubernetes.md %}).
-
-When running Flink JobManager as a Kubernetes deployment, the replica count 
should be configured to 1 or greater.
-* The value `1` means that a new JobManager will be launched to take over 
leadership if the current one terminates exceptionally.
-* The value `N` (greater than 1) means that multiple JobManagers will be 
launched simultaneously while one is active and others are standby. Starting 
more than one JobManager will make the recovery faster.
-
-### Configuration
-{% highlight yaml %}
-kubernetes.cluster-id: <ClusterId>
-high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
-high-availability.storageDir: hdfs:///flink/recovery
-{% endhighlight %}
-
-#### Example: Highly Available Standalone Flink Cluster on Kubernetes
-Both session and job/application clusters support using the Kubernetes high 
availability service. Users just need to add the following Flink config options 
to [flink-configuration-configmap.yaml]({% link 
deployment/resource-providers/standalone/kubernetes.md 
%}#common-cluster-resource-definitions). All other yamls do not need to be 
updated.
-
-<span class="label label-info">Note</span> The filesystem which corresponds to 
the scheme of your configured HA storage directory must be available to the 
runtime. Refer to [custom Flink image]({% link 
deployment/resource-providers/standalone/docker.md %}#customize-flink-image) 
and [enable plugins]({% link deployment/resource-providers/standalone/docker.md 
%}#using-plugins) for more information.
-
-{% highlight yaml %}
-apiVersion: v1
-kind: ConfigMap
-metadata:
-  name: flink-config
-  labels:
-    app: flink
-data:
-  flink-conf.yaml: |+
-  ...
-    kubernetes.cluster-id: <ClusterId>
-    high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
-    high-availability.storageDir: hdfs:///flink/recovery
-    restart-strategy: fixed-delay
-    restart-strategy.fixed-delay.attempts: 10
-  ...
-{% endhighlight %}
-
-#### Example: Highly Available Native Kubernetes Cluster
-Using the following command to start a native Flink application cluster on 
Kubernetes with high availability configured.
-{% highlight bash %}
-$ ./bin/flink run-application -p 8 -t kubernetes-application \
-  -Dkubernetes.cluster-id=<ClusterId> \
-  -Dtaskmanager.memory.process.size=4096m \
-  -Dkubernetes.taskmanager.cpu=2 \
-  -Dtaskmanager.numberOfTaskSlots=4 \
-  -Dkubernetes.container.image=<CustomImageName> \
-  
-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
 \
-  -Dhigh-availability.storageDir=s3://flink/flink-ha \
-  -Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=10 \
-  
-Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar
 \
-  
-Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar
 \
-  local:///opt/flink/examples/streaming/StateMachineExample.jar
-{% endhighlight %}
-
-### High Availability Data Clean Up
-Currently, when a Flink job reached the terminal state (`FAILED`, `CANCELED`, 
`FINISHED`), all the HA data, including metadata in Kubernetes ConfigMap and HA 
state on DFS, will be cleaned up.
-
-So the following command will only shut down the Flink session cluster and 
leave all the HA related ConfigMaps, state untouched.
-{% highlight bash %}
-$ echo 'stop' | ./bin/kubernetes-session.sh 
-Dkubernetes.cluster-id=<ClusterId> -Dexecution.attached=true
-{% endhighlight %}
-
-The following commands will cancel the job in application or session cluster 
and effectively remove all its HA data.
-{% highlight bash %}
-# Cancel a Flink job in the existing session
-$ ./bin/flink cancel -t kubernetes-session -Dkubernetes.cluster-id=<ClusterID> 
<JobID>
-# Cancel a Flink application
-$ ./bin/flink cancel -t kubernetes-application 
-Dkubernetes.cluster-id=<ClusterID> <JobID>
-{% endhighlight %}
-
-To keep HA data while restarting the Flink cluster, simply delete the 
deployment (via `kubectl delete deploy <ClusterID>`). 
-All the Flink cluster related resources will be deleted (e.g. JobManager 
Deployment, TaskManager pods, services, Flink conf ConfigMap). 
-HA related ConfigMaps will be retained because they do not set the owner 
reference. 
-When restarting the session / application using `kubernetes-session.sh` or 
`flink run-application`,  all previously running jobs will be recovered and 
restarted from the latest successful checkpoint.
-
-## Configuring for Zookeeper Security
-
-If ZooKeeper is running in secure mode with Kerberos, you can override the 
following configurations in `flink-conf.yaml` as necessary:
-
-<pre>
-zookeeper.sasl.service-name: zookeeper     # default is "zookeeper". If the 
ZooKeeper quorum is configured
-                                           # with a different service name 
then it can be supplied here.
-zookeeper.sasl.login-context-name: Client  # default is "Client". The value 
needs to match one of the values
-                                           # configured in 
"security.kerberos.login.contexts".
-</pre>
-
-For more information on Flink configuration for Kerberos security, please see 
[here]({% link deployment/config.md %}).
-You can also find [here]({% link deployment/security/security-kerberos.md %}) 
further details on how Flink internally setups Kerberos-based security.
-
-## Zookeeper Versions
-
-Flink ships with separate Zookeeper clients for 3.4 and 3.5, with 3.4 being in 
the `lib` directory of the distribution
-and thus used by default, whereas 3.5 is placed in the `opt` directory.
+{% top %}
 
-The 3.5 client allows you to secure the Zookeeper connection via SSL, but 
_may_ not work with 3.4- Zookeeper installations.
+## High Availability Services
 
-You can control which version is used by Flink by placing either jar in the 
`lib` directory.
+Flink ships with two high availability service implementations:
 
-## Bootstrap ZooKeeper
+* [ZooKeeper]({% link deployment/ha/zookeeper_ha.md %}): 
+ZooKeeper HA services can be used with every Flink cluster deployment. 
+They require a running ZooKeeper quorum.  
 
-If you don't have a running ZooKeeper installation, you can use the helper 
scripts, which ship with Flink.
+* [Kubernetes]({% link deployment/ha/kubernetes_ha.md %}):
+Kubernetes HA services only work when running on Kubernetes.
 
-There is a ZooKeeper configuration template in `conf/zoo.cfg`. You can 
configure the hosts to run ZooKeeper on with the `server.X` entries, where X is 
a unique ID of each server:
+{% top %}
 
-<pre>
-server.X=addressX:peerPort:leaderPort
-[...]
-server.Y=addressY:peerPort:leaderPort
-</pre>
+## High Availability data lifecycle
 
-The script `bin/start-zookeeper-quorum.sh` will start a ZooKeeper server on 
each of the configured hosts. The started processes start ZooKeeper servers via 
a Flink wrapper, which reads the configuration from `conf/zoo.cfg` and makes 
sure to set some required configuration values for convenience. In production 
setups, it is recommended to manage your own ZooKeeper installation.
+In order to recover submitted jobs, Flink persists metadata and the job 
artifacts.
+The HA data will be kept until the respective job either succeeds, is 
cancelled or fails terminally.
+Once this happens, all the HA data, including the metadata stored in the HA 
services, will be deleted.  
 
 {% top %}
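
To make the choice between the two shipped implementations concrete, here is a hedged side-by-side sketch of the corresponding `conf/flink-conf.yaml` entries, composed only from values that appear in this patch:

{% highlight yaml %}
# ZooKeeper HA services: usable with every Flink cluster deployment,
# but they require a running ZooKeeper quorum.
high-availability: zookeeper
high-availability.zookeeper.quorum: localhost:2181

# Kubernetes HA services: only work when running on Kubernetes.
# high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
# kubernetes.cluster-id: <cluster-id>

# Both implementations persist JobManager metadata to a storage directory.
high-availability.storageDir: hdfs:///flink/recovery
{% endhighlight %}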
diff --git a/docs/deployment/ha/index.zh.md b/docs/deployment/ha/index.zh.md
index 908ef84..5133012 100644
--- a/docs/deployment/ha/index.zh.md
+++ b/docs/deployment/ha/index.zh.md
@@ -1,9 +1,10 @@
 ---
-title: "High Availability (HA)"
-nav-id: ha
+title: "JobManager High Availability (HA)"
+nav-title: High Availability (HA)
 nav-parent_id: deployment
-nav-pos: 6
+nav-id: ha
 nav-show_overview: true
+nav-pos: 6
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
@@ -24,308 +25,53 @@ specific language governing permissions and limitations
 under the License.
 -->
 
+JobManager High Availability (HA) hardens a Flink cluster against JobManager 
failures.
+This feature ensures that a Flink cluster will always continue executing your 
submitted jobs.
+
 * Toc
 {:toc}
 
-## Overview
-
-The JobManager coordinates every Flink deployment. It is responsible for both 
*scheduling* and *resource management*.
+## JobManager High Availability
 
-By default, there is a single JobManager instance per Flink cluster. This 
creates a *single point of failure* (SPOF): if the JobManager crashes, no new 
programs can be submitted and running programs fail.
+The JobManager coordinates every Flink deployment. 
+It is responsible for both *scheduling* and *resource management*.
 
-With JobManager High Availability, you can recover from JobManager failures 
and thereby eliminate the *SPOF*. You can configure high availability for both 
**standalone**, **YARN clusters** and **Kubernetes clusters**.
+By default, there is a single JobManager instance per Flink cluster. 
+This creates a *single point of failure* (SPOF): if the JobManager crashes, no 
new programs can be submitted and running programs fail.
 
-See more HA implementation details in [JobManager High 
Availability](https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability)
 in the Flink wiki.
+With JobManager High Availability, you can recover from JobManager failures 
and thereby eliminate the *SPOF*. 
+You can configure high availability for every cluster deployment.
+See the [list of available high availability 
services](#high-availability-services) for more information.
 
-## Standalone Cluster High Availability
+### How to make a cluster highly available
 
-The general idea of JobManager high availability for standalone clusters is 
that there is a **single leading JobManager** at any time and **multiple 
standby JobManagers** to take over leadership in case the leader fails. This 
guarantees that there is **no single point of failure** and programs can make 
progress as soon as a standby JobManager has taken leadership. There is no 
explicit distinction between standby and master JobManager instances. Each 
JobManager can take the role of master [...]
+The general idea of JobManager High Availability is that there is a *single 
leading JobManager* at any time and *multiple standby JobManagers* to take over 
leadership in case the leader fails. 
+This guarantees that there is *no single point of failure* and programs can 
make progress as soon as a standby JobManager has taken leadership. 
 
 As an example, consider the following setup with three JobManager instances:
 
 <img src="{% link /fig/jobmanager_ha_overview.png %}" class="center" />
 
-### Configuration
-
-To enable JobManager High Availability you have to set the **high-availability 
mode** to *zookeeper*, configure a **ZooKeeper quorum** and set up a **masters 
file** with all JobManagers hosts and their web UI ports.
-
-Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for *distributed 
coordination* between all running JobManager instances. ZooKeeper is a separate 
service from Flink, which provides highly reliable distributed coordination via 
leader election and light-weight consistent state storage. Check out 
[ZooKeeper's Getting Started 
Guide](http://zookeeper.apache.org/doc/current/zookeeperStarted.html) for more 
information about ZooKeeper. Flink includes scripts to [bootstrap a simple Zo 
[...]
-
-#### Masters File (masters)
-
-In order to start an HA-cluster configure the *masters* file in `conf/masters`:
-
-- **masters file**: The *masters file* contains all hosts, on which 
JobManagers are started, and the ports to which the web user interface binds.
-
-  <pre>
-jobManagerAddress1:webUIPort1
-[...]
-jobManagerAddressX:webUIPortX
-  </pre>
-
-By default, the job manager will pick a *random port* for inter process 
communication. You can change this via the 
**`high-availability.jobmanager.port`** key. This key accepts single ports 
(e.g. `50010`), ranges (`50000-50025`), or a combination of both 
(`50010,50011,50020-50025,50050-50075`).
-
-#### Config File (flink-conf.yaml)
-
-In order to start an HA-cluster add the following configuration keys to 
`conf/flink-conf.yaml`:
-
-- **high-availability mode** (required): The *high-availability mode* has to 
be set in `conf/flink-conf.yaml` to *zookeeper* in order to enable high 
availability mode.
-Alternatively this option can be set to FQN of factory class Flink should use 
to create HighAvailabilityServices instance.
-
-  <pre>high-availability: zookeeper</pre>
-
-- **ZooKeeper quorum** (required): A *ZooKeeper quorum* is a replicated group 
of ZooKeeper servers, which provide the distributed coordination service.
-
-  <pre>high-availability.zookeeper.quorum: 
address1:2181[,...],addressX:2181</pre>
-
-  Each *addressX:port* refers to a ZooKeeper server, which is reachable by 
Flink at the given address and port.
-
-- **ZooKeeper root** (recommended): The *root ZooKeeper node*, under which all 
cluster nodes are placed.
-
-  <pre>high-availability.zookeeper.path.root: /flink
-
-- **ZooKeeper cluster-id** (recommended): The *cluster-id ZooKeeper node*, 
under which all required coordination data for a cluster is placed.
-
-  <pre>high-availability.cluster-id: /default_ns # important: customize per 
cluster</pre>
-
-  **Important**: You should not set this value manually when running a YARN
-  cluster, a per-job YARN session, or on another cluster manager. In those
-  cases a cluster-id is automatically being generated based on the application
-  id. Manually setting a cluster-id overrides this behaviour in YARN.
-  Specifying a cluster-id with the -z CLI option, in turn, overrides manual
-  configuration. If you are running multiple Flink HA clusters on bare metal,
-  you have to manually configure separate cluster-ids for each cluster.
-
-- **Storage directory** (required): JobManager metadata is persisted in the 
file system *storageDir* and only a pointer to this state is stored in 
ZooKeeper.
-
-    <pre>
-high-availability.storageDir: hdfs:///flink/recovery
-    </pre>
-
-    The `storageDir` stores all metadata needed to recover a JobManager 
failure.
-
-After configuring the masters and the ZooKeeper quorum, you can use the 
provided cluster startup scripts as usual. They will start an HA-cluster. Keep 
in mind that the **ZooKeeper quorum has to be running** when you call the 
scripts and make sure to **configure a separate ZooKeeper root path** for each 
HA cluster you are starting.
-
-#### Example: Standalone Cluster with 2 JobManagers
-
-1. **Configure high availability mode and ZooKeeper quorum** in 
`conf/flink-conf.yaml`:
-
-   <pre>
-high-availability: zookeeper
-high-availability.zookeeper.quorum: localhost:2181
-high-availability.zookeeper.path.root: /flink
-high-availability.cluster-id: /cluster_one # important: customize per cluster
-high-availability.storageDir: hdfs:///flink/recovery</pre>
-
-2. **Configure masters** in `conf/masters`:
-
-   <pre>
-localhost:8081
-localhost:8082</pre>
-
-3. **Configure ZooKeeper server** in `conf/zoo.cfg` (currently it's only 
possible to run a single ZooKeeper server per machine):
-
-   <pre>server.0=localhost:2888:3888</pre>
-
-4. **Start ZooKeeper quorum**:
-
-   <pre>
-$ bin/start-zookeeper-quorum.sh
-Starting zookeeper daemon on host localhost.</pre>
-
-5. **Start an HA-cluster**:
-
-   <pre>
-$ bin/start-cluster.sh
-Starting HA cluster with 2 masters and 1 peers in ZooKeeper quorum.
-Starting standalonesession daemon on host localhost.
-Starting standalonesession daemon on host localhost.
-Starting taskexecutor daemon on host localhost.</pre>
-
-6. **Stop ZooKeeper quorum and cluster**:
-
-   <pre>
-$ bin/stop-cluster.sh
-Stopping taskexecutor daemon (pid: 7647) on localhost.
-Stopping standalonesession daemon (pid: 7495) on host localhost.
-Stopping standalonesession daemon (pid: 7349) on host localhost.
-$ bin/stop-zookeeper-quorum.sh
-Stopping zookeeper daemon (pid: 7101) on host localhost.</pre>
-
-## YARN Cluster High Availability
-
-When running a highly available YARN cluster, **we don't run multiple 
JobManager (ApplicationMaster) instances**, but only one, which is restarted by 
YARN on failures. The exact behaviour depends on on the specific YARN version 
you are using.
-
-### Configuration
-
-#### Maximum Application Master Attempts (yarn-site.xml)
-
-You have to configure the maximum number of attempts for the application 
masters for **your** YARN setup in `yarn-site.xml`:
-
-{% highlight xml %}
-<property>
-  <name>yarn.resourcemanager.am.max-attempts</name>
-  <value>4</value>
-  <description>
-    The maximum number of application master execution attempts.
-  </description>
-</property>
-{% endhighlight %}
-
-The default for current YARN versions is 2 (meaning a single JobManager 
failure is tolerated).
-
-#### Application Attempts (flink-conf.yaml)
-
-In addition to the HA configuration ([see above](#configuration)), you have to 
configure the maximum attempts in `conf/flink-conf.yaml`:
-
-<pre>yarn.application-attempts: 10</pre>
-
-This means that the application can be restarted 9 times for failed attempts 
before YARN fails the application (9 retries + 1 initial attempt). Additional 
restarts can be performed by YARN if required by YARN operations: Preemption, 
node hardware failures or reboots, or NodeManager resyncs. These restarts are 
not counted against `yarn.application-attempts`, see <a 
href="http://johnjianfang.blogspot.de/2015/04/the-number-of-maximum-attempts-of-yarn.html";>Jian
 Fang's blog post</a>. It's im [...]
-
-#### Container Shutdown Behaviour
-
-- **YARN 2.3.0 < version < 2.4.0**. All containers are restarted if the 
application master fails.
-- **YARN 2.4.0 < version < 2.6.0**. TaskManager containers are kept alive 
across application master failures. This has the advantage that the startup 
time is faster and that the user does not have to wait for obtaining the 
container resources again.
-- **YARN 2.6.0 <= version**: Sets the attempt failure validity interval to the 
Flinks' Akka timeout value. The attempt failure validity interval says that an 
application is only killed after the system has seen the maximum number of 
application attempts during one interval. This avoids that a long lasting job 
will deplete it's application attempts.
-
-<p style="border-radius: 5px; padding: 5px" class="bg-danger"><b>Note</b>: 
Hadoop YARN 2.4.0 has a major bug (fixed in 2.5.0) preventing container 
restarts from a restarted Application Master/Job Manager container. See <a 
href="https://issues.apache.org/jira/browse/FLINK-4142";>FLINK-4142</a> for 
details. We recommend using at least Hadoop 2.5.0 for high availability setups 
on YARN.</p>
-
-#### Example: Highly Available YARN Session
-
-1. **Configure HA mode and ZooKeeper quorum** in `conf/flink-conf.yaml`:
-
-   <pre>
-high-availability: zookeeper
-high-availability.zookeeper.quorum: localhost:2181
-high-availability.storageDir: hdfs:///flink/recovery
-high-availability.zookeeper.path.root: /flink
-yarn.application-attempts: 10</pre>
-
-3. **Configure ZooKeeper server** in `conf/zoo.cfg` (currently it's only 
possible to run a single ZooKeeper server per machine):
-
-   <pre>server.0=localhost:2888:3888</pre>
-
-4. **Start ZooKeeper quorum**:
-
-   <pre>
-$ bin/start-zookeeper-quorum.sh
-Starting zookeeper daemon on host localhost.</pre>
-
-5. **Start an HA-cluster**:
-
-   <pre>
-$ bin/yarn-session.sh -n 2</pre>
-
-## Kubernetes Cluster High Availability
-Kubernetes high availability service could support both [standalone Flink on 
Kubernetes]({%link deployment/resource-providers/standalone/kubernetes.zh.md 
%}) and [native Kubernetes integration]({% link 
deployment/resource-providers/native_kubernetes.zh.md %}).
-
-When running Flink JobManager as a Kubernetes deployment, the replica count 
should be configured to 1 or greater.
-* The value `1` means that a new JobManager will be launched to take over 
leadership if the current one terminates exceptionally.
-* The value `N` (greater than 1) means that multiple JobManagers will be 
launched simultaneously while one is active and others are standby. Starting 
more than one JobManager will make the recovery faster.
-
-### Configuration
-{% highlight yaml %}
-kubernetes.cluster-id: <ClusterId>
-high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
-high-availability.storageDir: hdfs:///flink/recovery
-{% endhighlight %}
-
-#### Example: Highly Available Standalone Flink Cluster on Kubernetes
-Both session and job/application clusters support using the Kubernetes high 
availability service. Users just need to add the following Flink config options 
to [flink-configuration-configmap.yaml]({% link 
deployment/resource-providers/standalone/kubernetes.zh.md 
%}#common-cluster-resource-definitions). All other yamls do not need to be 
updated.
-
-<span class="label label-info">Note</span> The filesystem which corresponds to 
the scheme of your configured HA storage directory must be available to the 
runtime. Refer to [custom Flink image]({% link 
deployment/resource-providers/standalone/docker.zh.md %}#customize-flink-image) 
and [enable plugins]({% link 
deployment/resource-providers/standalone/docker.zh.md %}#using-plugins) for 
more information.
-
-{% highlight yaml %}
-apiVersion: v1
-kind: ConfigMap
-metadata:
-  name: flink-config
-  labels:
-    app: flink
-data:
-  flink-conf.yaml: |+
-  ...
-    kubernetes.cluster-id: <ClusterId>
-    high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
-    high-availability.storageDir: hdfs:///flink/recovery
-    restart-strategy: fixed-delay
-    restart-strategy.fixed-delay.attempts: 10
-  ...
-{% endhighlight %}
-
-#### Example: Highly Available Native Kubernetes Cluster
-Using the following command to start a native Flink application cluster on 
Kubernetes with high availability configured.
-{% highlight bash %}
-$ ./bin/flink run-application -p 8 -t kubernetes-application \
-  -Dkubernetes.cluster-id=<ClusterId> \
-  -Dtaskmanager.memory.process.size=4096m \
-  -Dkubernetes.taskmanager.cpu=2 \
-  -Dtaskmanager.numberOfTaskSlots=4 \
-  -Dkubernetes.container.image=<CustomImageName> \
-  
-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
 \
-  -Dhigh-availability.storageDir=s3://flink/flink-ha \
-  -Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=10 \
-  
-Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar
 \
-  
-Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar
 \
-  local:///opt/flink/examples/streaming/StateMachineExample.jar
-{% endhighlight %}
-
-### High Availability Data Clean Up
-Currently, when a Flink job reached the terminal state (`FAILED`, `CANCELED`, 
`FINISHED`), all the HA data, including metadata in Kubernetes ConfigMap and HA 
state on DFS, will be cleaned up.
-
-So the following command will only shut down the Flink session cluster and 
leave all the HA related ConfigMaps, state untouched.
-{% highlight bash %}
-$ echo 'stop' | ./bin/kubernetes-session.sh 
-Dkubernetes.cluster-id=<ClusterId> -Dexecution.attached=true
-{% endhighlight %}
-
-The following commands will cancel the job in application or session cluster 
and effectively remove all its HA data.
-{% highlight bash %}
-# Cancel a Flink job in the existing session
-$ ./bin/flink cancel -t kubernetes-session -Dkubernetes.cluster-id=<ClusterID> 
<JobID>
-# Cancel a Flink application
-$ ./bin/flink cancel -t kubernetes-application 
-Dkubernetes.cluster-id=<ClusterID> <JobID>
-{% endhighlight %}
-
-To keep HA data while restarting the Flink cluster, simply delete the 
deployment (via `kubectl delete deploy <ClusterID>`). 
-All the Flink cluster related resources will be deleted (e.g. JobManager 
Deployment, TaskManager pods, services, Flink conf ConfigMap). 
-HA related ConfigMaps will be retained because they do not set the owner 
reference. 
-When restarting the session / application using `kubernetes-session.sh` or 
`flink run-application`,  all previously running jobs will be recovered and 
restarted from the latest successful checkpoint.
-
-## Configuring for Zookeeper Security
-
-If ZooKeeper is running in secure mode with Kerberos, you can override the 
following configurations in `flink-conf.yaml` as necessary:
-
-<pre>
-zookeeper.sasl.service-name: zookeeper     # default is "zookeeper". If the 
ZooKeeper quorum is configured
-                                           # with a different service name 
then it can be supplied here.
-zookeeper.sasl.login-context-name: Client  # default is "Client". The value 
needs to match one of the values
-                                           # configured in 
"security.kerberos.login.contexts".
-</pre>
-
-For more information on Flink configuration for Kerberos security, please see 
[here]({% link deployment/config.zh.md %}).
-You can also find [here]({% link deployment/security/security-kerberos.zh.md 
%}) further details on how Flink internally setups Kerberos-based security.
-
-## Zookeeper Versions
-
-Flink ships with separate Zookeeper clients for 3.4 and 3.5, with 3.4 being in 
the `lib` directory of the distribution
-and thus used by default, whereas 3.5 is placed in the `opt` directory.
-
-The 3.5 client allows you to secure the Zookeeper connection via SSL, but 
_may_ not work with 3.4- Zookeeper installations.
+Flink's [high availability services](#high-availability-services) encapsulate the functionality required to make this work:
+* **Leader election**: Selecting a single leader out of a pool of `n` 
candidates
+* **Service discovery**: Retrieving the address of the current leader
+* **State persistence**: Persisting state which is required for the successor 
to resume the job execution (JobGraphs, user code jars, completed checkpoints)
 
-You can control which version is used by Flink by placing either jar in the 
`lib` directory.
+## High Availability Services
 
-## Bootstrap ZooKeeper
+Flink ships with two high availability service implementations:
 
-If you don't have a running ZooKeeper installation, you can use the helper 
scripts, which ship with Flink.
+* [ZooKeeper]({% link deployment/ha/zookeeper_ha.zh.md %}): 
+ZooKeeper HA services can be used with every Flink cluster deployment. 
+They require a running ZooKeeper quorum.  
 
-There is a ZooKeeper configuration template in `conf/zoo.cfg`. You can 
configure the hosts to run ZooKeeper on with the `server.X` entries, where X is 
a unique ID of each server:
+* [Kubernetes]({% link deployment/ha/kubernetes_ha.zh.md %}):
+Kubernetes HA services only work when running on Kubernetes.
 
-<pre>
-server.X=addressX:peerPort:leaderPort
-[...]
-server.Y=addressY:peerPort:leaderPort
-</pre>
+## High Availability data lifecycle
 
-The script `bin/start-zookeeper-quorum.sh` will start a ZooKeeper server on 
each of the configured hosts. The started processes start ZooKeeper servers via 
a Flink wrapper, which reads the configuration from `conf/zoo.cfg` and makes 
sure to set some required configuration values for convenience. In production 
setups, it is recommended to manage your own ZooKeeper installation.
+In order to recover submitted jobs, Flink persists metadata and the job 
artifacts.
+The HA data will be kept until the respective job either succeeds, is 
cancelled or fails terminally.
+Once this happens, all the HA data, including the metadata stored in the HA 
services, will be deleted.  
 
 {% top %}
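
For the standalone, ZooKeeper-based setup that the removed overview sections described, a hedged reminder of the startup and shutdown order (the commands are taken verbatim from the removed example; the ZooKeeper quorum has to be running before the cluster scripts are called):

{% highlight bash %}
# Start the ZooKeeper quorum first, then the HA cluster.
$ bin/start-zookeeper-quorum.sh
$ bin/start-cluster.sh

# Stop in the reverse order.
$ bin/stop-cluster.sh
$ bin/stop-zookeeper-quorum.sh
{% endhighlight %}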
diff --git a/docs/deployment/ha/kubernetes_ha.md 
b/docs/deployment/ha/kubernetes_ha.md
index f2226ce..c553a1a 100644
--- a/docs/deployment/ha/kubernetes_ha.md
+++ b/docs/deployment/ha/kubernetes_ha.md
@@ -23,77 +23,52 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-## Kubernetes Cluster High Availability
-Kubernetes high availability service could support both [standalone Flink on 
Kubernetes]({% link deployment/resource-providers/standalone/kubernetes.md %}) 
and [native Kubernetes integration]({% link 
deployment/resource-providers/native_kubernetes.md %}).
+Flink's Kubernetes HA services use [Kubernetes](https://kubernetes.io/) to provide high availability.
 
-When running Flink JobManager as a Kubernetes deployment, the replica count 
should be configured to 1 or greater.
-* The value `1` means that a new JobManager will be launched to take over 
leadership if the current one terminates exceptionally.
-* The value `N` (greater than 1) means that multiple JobManagers will be 
launched simultaneously while one is active and others are standby. Starting 
more than one JobManager will make the recovery faster.
+* Toc
+{:toc}
 
-### Configuration
-{% highlight yaml %}
-kubernetes.cluster-id: <ClusterId>
-high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
-high-availability.storageDir: hdfs:///flink/recovery
-{% endhighlight %}
+Kubernetes high availability services can only be used when deploying to Kubernetes.
+Consequently, they can be configured when using [standalone Flink on Kubernetes]({% link deployment/resource-providers/standalone/kubernetes.md %}) or the [native Kubernetes integration]({% link deployment/resource-providers/native_kubernetes.md %}).
 
-#### Example: Highly Available Standalone Flink Cluster on Kubernetes
-Both session and job/application clusters support using the Kubernetes high 
availability service. Users just need to add the following Flink config options 
to [flink-configuration-configmap.yaml]({% link 
deployment/resource-providers/standalone/kubernetes.md 
%}#common-cluster-resource-definitions). All other yamls do not need to be 
updated.
-
-<span class="label label-info">Note</span> The filesystem which corresponds to 
the scheme of your configured HA storage directory must be available to the 
runtime. Refer to [custom Flink image]({% link 
deployment/resource-providers/standalone/docker.md %}#customize-flink-image) 
and [enable plugins]({% link deployment/resource-providers/standalone/docker.md 
%}#using-plugins) for more information.
-
-{% highlight yaml %}
-apiVersion: v1
-kind: ConfigMap
-metadata:
-  name: flink-config
-  labels:
-    app: flink
-data:
-  flink-conf.yaml: |+
-  ...
-    kubernetes.cluster-id: <ClusterId>
-    high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
-    high-availability.storageDir: hdfs:///flink/recovery
-    restart-strategy: fixed-delay
-    restart-strategy.fixed-delay.attempts: 10
-  ...
-{% endhighlight %}
+## Configuration
 
-#### Example: Highly Available Native Kubernetes Cluster
-Using the following command to start a native Flink application cluster on 
Kubernetes with high availability configured.
-{% highlight bash %}
-$ ./bin/flink run-application -p 8 -t kubernetes-application \
-  -Dkubernetes.cluster-id=<ClusterId> \
-  -Dtaskmanager.memory.process.size=4096m \
-  -Dkubernetes.taskmanager.cpu=2 \
-  -Dtaskmanager.numberOfTaskSlots=4 \
-  -Dkubernetes.container.image=<CustomImageName> \
-  
-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
 \
-  -Dhigh-availability.storageDir=s3://flink/flink-ha \
-  -Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=10 \
-  
-Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar
 \
-  
-Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar
 \
-  local:///opt/flink/examples/streaming/StateMachineExample.jar
-{% endhighlight %}
+In order to start an HA-cluster, you have to set the following configuration keys:
 
-### High Availability Data Clean Up
-Currently, when a Flink job reached the terminal state (`FAILED`, `CANCELED`, 
`FINISHED`), all the HA data, including metadata in Kubernetes ConfigMap and HA 
state on DFS, will be cleaned up.
+- [high-availability]({% link deployment/config.md %}#high-availability-1) 
(required): 
+The `high-availability` option has to be set to the fully qualified name of the `KubernetesHaServicesFactory` class.
 
-So the following command will only shut down the Flink session cluster and 
leave all the HA related ConfigMaps, state untouched.
-{% highlight bash %}
-$ echo 'stop' | ./bin/kubernetes-session.sh 
-Dkubernetes.cluster-id=<ClusterId> -Dexecution.attached=true
-{% endhighlight %}
+  <pre>high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory</pre>
+
+- [high-availability.storageDir]({% link deployment/config.md 
%}#high-availability-storagedir) (required): 
+JobManager metadata is persisted in the file system path configured via `high-availability.storageDir`; only a pointer to this state is stored in Kubernetes.
+
+  <pre>high-availability.storageDir: s3:///flink/recovery</pre>
+
+  The `storageDir` stores all metadata needed to recover from a JobManager failure.
+  
+- [kubernetes.cluster-id]({% link deployment/config.md 
%}#kubernetes-cluster-id) (required):
+In order to identify the Flink cluster, you have to specify a 
`kubernetes.cluster-id`.
+
+  <pre>kubernetes.cluster-id: cluster1337</pre>
+
+### Example configuration
+
+Configure high availability mode in `conf/flink-conf.yaml`:
 
-The following commands will cancel the job in application or session cluster 
and effectively remove all its HA data.
 {% highlight bash %}
-# Cancel a Flink job in the existing session
-$ ./bin/flink cancel -t kubernetes-session -Dkubernetes.cluster-id=<ClusterID> 
<JobID>
-# Cancel a Flink application
-$ ./bin/flink cancel -t kubernetes-application 
-Dkubernetes.cluster-id=<ClusterID> <JobID>
+kubernetes.cluster-id: <cluster-id>
+high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+high-availability.storageDir: hdfs:///flink/recovery
 {% endhighlight %}
 
-To keep HA data while restarting the Flink cluster, simply delete the 
deployment (via `kubectl delete deploy <ClusterID>`). 
+{% top %}
+
+## High availability data cleanup
+
+To keep HA data while restarting the Flink cluster, simply delete the 
deployment (via `kubectl delete deploy <cluster-id>`). 
 All the Flink cluster related resources will be deleted (e.g. JobManager 
Deployment, TaskManager pods, services, Flink conf ConfigMap). 
 HA related ConfigMaps will be retained because they do not set the owner 
reference. 
-When restarting the session / application using `kubernetes-session.sh` or 
`flink run-application`,  all previously running jobs will be recovered and 
restarted from the latest successful checkpoint.
+When restarting the cluster, all previously running jobs will be recovered and 
restarted from the latest successful checkpoint.
+
+{% top %} 
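
As a hedged sketch of the retention behaviour described above (the `kubectl get configmaps` check and the session restart are illustrative, composed from commands appearing elsewhere in this patch):

{% highlight bash %}
# Delete only the deployment: JobManager Deployment, TaskManager pods, services
# and the Flink conf ConfigMap are removed, while the HA ConfigMaps are retained
# because they do not set the owner reference.
$ kubectl delete deploy <cluster-id>

# Illustrative check that the HA ConfigMaps are still present.
$ kubectl get configmaps | grep <cluster-id>

# Restarting the cluster (here: a session cluster) recovers all previously
# running jobs from the latest successful checkpoint.
$ ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=<cluster-id>
{% endhighlight %}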
diff --git a/docs/deployment/ha/kubernetes_ha.zh.md 
b/docs/deployment/ha/kubernetes_ha.zh.md
index 2618db6..4ff8356 100644
--- a/docs/deployment/ha/kubernetes_ha.zh.md
+++ b/docs/deployment/ha/kubernetes_ha.zh.md
@@ -23,77 +23,52 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-## Kubernetes Cluster High Availability
-Kubernetes high availability service could support both [standalone Flink on 
Kubernetes]({% link deployment/resource-providers/standalone/kubernetes.zh.md 
%}) and [native Kubernetes integration]({% link 
deployment/resource-providers/native_kubernetes.zh.md %}).
+Flink's Kubernetes HA services use [Kubernetes](https://kubernetes.io/) to provide high availability.
 
-When running Flink JobManager as a Kubernetes deployment, the replica count 
should be configured to 1 or greater.
-* The value `1` means that a new JobManager will be launched to take over 
leadership if the current one terminates exceptionally.
-* The value `N` (greater than 1) means that multiple JobManagers will be 
launched simultaneously while one is active and others are standby. Starting 
more than one JobManager will make the recovery faster.
+* Toc
+{:toc}
 
-### Configuration
-{% highlight yaml %}
-kubernetes.cluster-id: <ClusterId>
-high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
-high-availability.storageDir: hdfs:///flink/recovery
-{% endhighlight %}
+Kubernetes high availability services can only be used when deploying to Kubernetes.
+Consequently, they can be configured when using [standalone Flink on Kubernetes]({% link deployment/resource-providers/standalone/kubernetes.zh.md %}) or the [native Kubernetes integration]({% link deployment/resource-providers/native_kubernetes.zh.md %}).
 
-#### Example: Highly Available Standalone Flink Cluster on Kubernetes
-Both session and job/application clusters support using the Kubernetes high 
availability service. Users just need to add the following Flink config options 
to [flink-configuration-configmap.yaml]({% link 
deployment/resource-providers/standalone/kubernetes.zh.md 
%}#common-cluster-resource-definitions). All other yamls do not need to be 
updated.
-
-<span class="label label-info">Note</span> The filesystem which corresponds to 
the scheme of your configured HA storage directory must be available to the 
runtime. Refer to [custom Flink image]({% link 
deployment/resource-providers/standalone/docker.zh.md %}#customize-flink-image) 
and [enable plugins]({% link 
deployment/resource-providers/standalone/docker.zh.md %}#using-plugins) for 
more information.
-
-{% highlight yaml %}
-apiVersion: v1
-kind: ConfigMap
-metadata:
-  name: flink-config
-  labels:
-    app: flink
-data:
-  flink-conf.yaml: |+
-  ...
-    kubernetes.cluster-id: <ClusterId>
-    high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
-    high-availability.storageDir: hdfs:///flink/recovery
-    restart-strategy: fixed-delay
-    restart-strategy.fixed-delay.attempts: 10
-  ...
-{% endhighlight %}
+## Configuration
 
-#### Example: Highly Available Native Kubernetes Cluster
-Using the following command to start a native Flink application cluster on 
Kubernetes with high availability configured.
-{% highlight bash %}
-$ ./bin/flink run-application -p 8 -t kubernetes-application \
-  -Dkubernetes.cluster-id=<ClusterId> \
-  -Dtaskmanager.memory.process.size=4096m \
-  -Dkubernetes.taskmanager.cpu=2 \
-  -Dtaskmanager.numberOfTaskSlots=4 \
-  -Dkubernetes.container.image=<CustomImageName> \
-  
-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
 \
-  -Dhigh-availability.storageDir=s3://flink/flink-ha \
-  -Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=10 \
-  
-Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar
 \
-  
-Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar
 \
-  local:///opt/flink/examples/streaming/StateMachineExample.jar
-{% endhighlight %}
+In order to start an HA-cluster, you have to set the following configuration keys:
 
-### High Availability Data Clean Up
-Currently, when a Flink job reached the terminal state (`FAILED`, `CANCELED`, 
`FINISHED`), all the HA data, including metadata in Kubernetes ConfigMap and HA 
state on DFS, will be cleaned up.
+- [high-availability]({% link deployment/config.zh.md %}#high-availability-1) 
(required): 
+The `high-availability` option has to be set to the fully qualified name of the `KubernetesHaServicesFactory` class.
 
-So the following command will only shut down the Flink session cluster and 
leave all the HA related ConfigMaps, state untouched.
-{% highlight bash %}
-$ echo 'stop' | ./bin/kubernetes-session.sh 
-Dkubernetes.cluster-id=<ClusterId> -Dexecution.attached=true
-{% endhighlight %}
+  <pre>high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory</pre>
+
+- [high-availability.storageDir]({% link deployment/config.zh.md 
%}#high-availability-storagedir) (required): 
+JobManager metadata is persisted in the file system path configured via `high-availability.storageDir`; only a pointer to this state is stored in Kubernetes.
+
+  <pre>high-availability.storageDir: s3:///flink/recovery</pre>
+
+  The `storageDir` stores all metadata needed to recover from a JobManager failure.
+  
+- [kubernetes.cluster-id]({% link deployment/config.zh.md 
%}#kubernetes-cluster-id) (required):
+In order to identify the Flink cluster, you have to specify a 
`kubernetes.cluster-id`.
+
+  <pre>kubernetes.cluster-id: cluster1337</pre>
+
+### Example configuration
+
+Configure high availability mode in `conf/flink-conf.yaml`:
 
-The following commands will cancel the job in application or session cluster 
and effectively remove all its HA data.
 {% highlight bash %}
-# Cancel a Flink job in the existing session
-$ ./bin/flink cancel -t kubernetes-session -Dkubernetes.cluster-id=<ClusterID> 
<JobID>
-# Cancel a Flink application
-$ ./bin/flink cancel -t kubernetes-application 
-Dkubernetes.cluster-id=<ClusterID> <JobID>
+kubernetes.cluster-id: <cluster-id>
+high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+high-availability.storageDir: hdfs:///flink/recovery
 {% endhighlight %}
 
-To keep HA data while restarting the Flink cluster, simply delete the 
deployment (via `kubectl delete deploy <ClusterID>`). 
+{% top %}
+
+## High availability data cleanup
+
+To keep HA data while restarting the Flink cluster, simply delete the 
deployment (via `kubectl delete deploy <cluster-id>`). 
 All the Flink cluster related resources will be deleted (e.g. JobManager 
Deployment, TaskManager pods, services, Flink conf ConfigMap). 
 HA related ConfigMaps will be retained because they do not set the owner 
reference. 
-When restarting the session / application using `kubernetes-session.sh` or 
`flink run-application`,  all previously running jobs will be recovered and 
restarted from the latest successful checkpoint.
+When restarting the cluster, all previously running jobs will be recovered and 
restarted from the latest successful checkpoint.
+
+{% top %} 
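
Because the `high-availability.storageDir` examples above use an `s3:///` path, the filesystem matching the configured scheme must be available to the runtime. A hedged sketch, reusing the flags from the native-Kubernetes example removed from the overview page, of enabling the S3 plugin when submitting an application:

{% highlight bash %}
$ ./bin/flink run-application -t kubernetes-application \
  -Dkubernetes.cluster-id=<cluster-id> \
  -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
  -Dhigh-availability.storageDir=s3://flink/flink-ha \
  -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \
  -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \
  local:///opt/flink/examples/streaming/StateMachineExample.jar
{% endhighlight %}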
diff --git a/docs/deployment/ha/zookeeper_ha.md 
b/docs/deployment/ha/zookeeper_ha.md
index d1faf2d..5371d81 100644
--- a/docs/deployment/ha/zookeeper_ha.md
+++ b/docs/deployment/ha/zookeeper_ha.md
@@ -23,113 +23,110 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-## ZooKeeper HA Services
+Flink's ZooKeeper HA services use [ZooKeeper](http://zookeeper.apache.org) to provide high availability.
 
-One high availability services implementation uses ZooKeeper.
+* Toc
+{:toc}
 
-### Configuration
+Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for *distributed 
coordination* between all running JobManager instances. 
+ZooKeeper is a separate service from Flink, which provides highly reliable 
distributed coordination via leader election and light-weight consistent state 
storage. 
+Check out [ZooKeeper's Getting Started 
Guide](http://zookeeper.apache.org/doc/current/zookeeperStarted.html) for more 
information about ZooKeeper. 
+Flink includes scripts to [bootstrap a simple ZooKeeper](#bootstrap-zookeeper) 
installation.
 
-To enable JobManager High Availability you have to set the **high-availability 
mode** to *zookeeper*, configure a **ZooKeeper quorum** and set up a **masters 
file** with all JobManagers hosts and their web UI ports.
+## Configuration
 
-Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for *distributed 
coordination* between all running JobManager instances. ZooKeeper is a separate 
service from Flink, which provides highly reliable distributed coordination via 
leader election and light-weight consistent state storage. Check out 
[ZooKeeper's Getting Started 
Guide](http://zookeeper.apache.org/doc/current/zookeeperStarted.html) for more 
information about ZooKeeper. Flink includes scripts to [bootstrap a simple Zo 
[...]
+In order to start an HA cluster, you have to set the following configuration keys:
 
-#### Masters File (masters)
+- [high-availability]({% link deployment/config.md %}#high-availability-1) 
(required): 
+The `high-availability` option has to be set to `zookeeper`.
 
-In order to start an HA-cluster configure the *masters* file in `conf/masters`:
-
-- **masters file**: The *masters file* contains all hosts, on which 
JobManagers are started, and the ports to which the web user interface binds.
-
-  <pre>
-jobManagerAddress1:webUIPort1
-[...]
-jobManagerAddressX:webUIPortX
-  </pre>
-
-By default, the job manager will pick a *random port* for inter process 
communication. You can change this via the 
**`high-availability.jobmanager.port`** key. This key accepts single ports 
(e.g. `50010`), ranges (`50000-50025`), or a combination of both 
(`50010,50011,50020-50025,50050-50075`).
-
-#### Config File (flink-conf.yaml)
+  <pre>high-availability: zookeeper</pre>
 
-In order to start an HA-cluster add the following configuration keys to 
`conf/flink-conf.yaml`:
+- [high-availability.storageDir]({% link deployment/config.md %}#high-availability-storagedir) (required): 
+JobManager metadata is persisted in the file system configured via `high-availability.storageDir`, and only a pointer to this state is stored in ZooKeeper.
 
-- **high-availability mode** (required): The *high-availability mode* has to 
be set in `conf/flink-conf.yaml` to *zookeeper* in order to enable high 
availability mode.
-Alternatively this option can be set to FQN of factory class Flink should use 
to create HighAvailabilityServices instance. 
+  <pre>high-availability.storageDir: hdfs:///flink/recovery</pre>
 
-  <pre>high-availability: zookeeper</pre>
+  The `storageDir` stores all metadata needed to recover from a JobManager failure.
 
-- **ZooKeeper quorum** (required): A *ZooKeeper quorum* is a replicated group 
of ZooKeeper servers, which provide the distributed coordination service.
+- [high-availability.zookeeper.quorum]({% link deployment/config.md %}#high-availability-zookeeper-quorum) (required): 
+A *ZooKeeper quorum* is a replicated group of ZooKeeper servers, which provides the distributed coordination service.
 
   <pre>high-availability.zookeeper.quorum: 
address1:2181[,...],addressX:2181</pre>
 
-  Each *addressX:port* refers to a ZooKeeper server, which is reachable by 
Flink at the given address and port.
+  Each `addressX:port` refers to a ZooKeeper server, which is reachable by 
Flink at the given address and port.
 
-- **ZooKeeper root** (recommended): The *root ZooKeeper node*, under which all 
cluster nodes are placed.
+- [high-availability.zookeeper.path.root]({% link deployment/config.md 
%}#high-availability-zookeeper-path-root) (recommended): 
+The *root ZooKeeper node*, under which all cluster nodes are placed.
 
-  <pre>high-availability.zookeeper.path.root: /flink
+  <pre>high-availability.zookeeper.path.root: /flink</pre>
 
-- **ZooKeeper cluster-id** (recommended): The *cluster-id ZooKeeper node*, 
under which all required coordination data for a cluster is placed.
+- [high-availability.cluster-id]({% link deployment/config.md 
%}#high-availability-cluster-id) (recommended): 
+The *cluster-id ZooKeeper node*, under which all required coordination data 
for a cluster is placed.
 
   <pre>high-availability.cluster-id: /default_ns # important: customize per 
cluster</pre>
 
-  **Important**: You should not set this value manually when running a YARN
-  cluster, a per-job YARN session, or on another cluster manager. In those
-  cases a cluster-id is automatically being generated based on the application
-  id. Manually setting a cluster-id overrides this behaviour in YARN.
-  Specifying a cluster-id with the -z CLI option, in turn, overrides manual
-  configuration. If you are running multiple Flink HA clusters on bare metal,
-  you have to manually configure separate cluster-ids for each cluster.
+  **Important**: 
+  You should not set this value manually when running on YARN, native Kubernetes, or on another cluster manager. 
+  In those cases a cluster-id is generated automatically. 
+  If you are running multiple Flink HA clusters on bare metal, you have to 
manually configure separate cluster-ids for each cluster.
+
+### Example configuration
 
-- **Storage directory** (required): JobManager metadata is persisted in the 
file system *storageDir* and only a pointer to this state is stored in 
ZooKeeper.
+Configure high availability mode and ZooKeeper quorum in 
`conf/flink-conf.yaml`:
 
-    <pre>
+{% highlight bash %}
+high-availability: zookeeper
+high-availability.zookeeper.quorum: localhost:2181
+high-availability.zookeeper.path.root: /flink
+high-availability.cluster-id: /cluster_one # important: customize per cluster
 high-availability.storageDir: hdfs:///flink/recovery
-    </pre>
+{% endhighlight %}
 
-    The `storageDir` stores all metadata needed to recover a JobManager 
failure.
+{% top %}
 
-After configuring the masters and the ZooKeeper quorum, you can use the 
provided cluster startup scripts as usual. They will start an HA-cluster. Keep 
in mind that the **ZooKeeper quorum has to be running** when you call the 
scripts and make sure to **configure a separate ZooKeeper root path** for each 
HA cluster you are starting.
+## Configuring for ZooKeeper Security
 
-#### Example: Standalone Cluster with 2 JobManagers
+If ZooKeeper is running in secure mode with Kerberos, you can override the 
following configurations in `flink-conf.yaml` as necessary:
 
-1. **Configure high availability mode and ZooKeeper quorum** in 
`conf/flink-conf.yaml`:
+{% highlight bash %}
+zookeeper.sasl.service-name: zookeeper     # default is "zookeeper". If the 
ZooKeeper quorum is configured
+                                           # with a different service name 
then it can be supplied here.
+zookeeper.sasl.login-context-name: Client  # default is "Client". The value 
needs to match one of the values
+                                           # configured in 
"security.kerberos.login.contexts".
+{% endhighlight %}
 
-   <pre>
-high-availability: zookeeper
-high-availability.zookeeper.quorum: localhost:2181
-high-availability.zookeeper.path.root: /flink
-high-availability.cluster-id: /cluster_one # important: customize per cluster
-high-availability.storageDir: hdfs:///flink/recovery</pre>
+For more information on Flink configuration for Kerberos security, please 
refer to the [security section of the Flink configuration page]({% link 
deployment/config.md %}#security).
+You can also find further details on [how Flink sets up Kerberos-based 
security internally]({% link deployment/security/security-kerberos.md %}).
+
+{% top %}
 
-2. **Configure masters** in `conf/masters`:
+## ZooKeeper Versions
 
-   <pre>
-localhost:8081
-localhost:8082</pre>
+Flink ships with separate ZooKeeper clients for versions 3.4 and 3.5: the 3.4 client is in the `lib` directory of the distribution
+and is therefore used by default, whereas the 3.5 client is placed in the `opt` directory.
 
-3. **Configure ZooKeeper server** in `conf/zoo.cfg` (currently it's only 
possible to run a single ZooKeeper server per machine):
+The 3.5 client allows you to secure the ZooKeeper connection via SSL, but _may_ not work with ZooKeeper 3.4 (or older) installations.
 
-   <pre>server.0=localhost:2888:3888</pre>
+You can control which version Flink uses by placing either JAR in the `lib` directory.
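+
+As a sketch, switching to the 3.5 client could look as follows; the exact JAR names depend on the Flink distribution you use:
+
+{% highlight bash %}
+# Replace the default 3.4 client in lib with the 3.5 client shipped in opt
+$ rm lib/flink-shaded-zookeeper-3.4.*.jar
+$ cp opt/flink-shaded-zookeeper-3.5.*.jar lib/
+{% endhighlight %}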
 
-4. **Start ZooKeeper quorum**:
+{% top %}
 
-   <pre>
-$ bin/start-zookeeper-quorum.sh
-Starting zookeeper daemon on host localhost.</pre>
+## Bootstrap ZooKeeper
 
-5. **Start an HA-cluster**:
+If you don't have a running ZooKeeper installation, you can use the helper 
scripts, which ship with Flink.
 
-   <pre>
-$ bin/start-cluster.sh
-Starting HA cluster with 2 masters and 1 peers in ZooKeeper quorum.
-Starting standalonesession daemon on host localhost.
-Starting standalonesession daemon on host localhost.
-Starting taskexecutor daemon on host localhost.</pre>
+There is a ZooKeeper configuration template in `conf/zoo.cfg`. 
+You can configure the hosts to run ZooKeeper on with the `server.X` entries, where `X` is a unique ID for each server:
+
+{% highlight bash %}
+server.X=addressX:peerPort:leaderPort
+[...]
+server.Y=addressY:peerPort:leaderPort
+{% endhighlight %}
 
-6. **Stop ZooKeeper quorum and cluster**:
+The script `bin/start-zookeeper-quorum.sh` will start a ZooKeeper server on 
each of the configured hosts. 
+The started processes run ZooKeeper via a Flink wrapper, which reads the configuration from `conf/zoo.cfg` and sets some required configuration values for convenience. 
+In production setups, it is recommended to manage your own ZooKeeper 
installation.
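+
+For a local test setup with a single `server.X` entry pointing to `localhost`, starting the quorum would look like this:
+
+{% highlight bash %}
+$ bin/start-zookeeper-quorum.sh
+Starting zookeeper daemon on host localhost.
+{% endhighlight %}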
 
-   <pre>
-$ bin/stop-cluster.sh
-Stopping taskexecutor daemon (pid: 7647) on localhost.
-Stopping standalonesession daemon (pid: 7495) on host localhost.
-Stopping standalonesession daemon (pid: 7349) on host localhost.
-$ bin/stop-zookeeper-quorum.sh
-Stopping zookeeper daemon (pid: 7101) on host localhost.</pre>
+{% top %} 
diff --git a/docs/deployment/ha/zookeeper_ha.zh.md 
b/docs/deployment/ha/zookeeper_ha.zh.md
index d1faf2d..313634b 100644
--- a/docs/deployment/ha/zookeeper_ha.zh.md
+++ b/docs/deployment/ha/zookeeper_ha.zh.md
@@ -23,113 +23,110 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-## ZooKeeper HA Services
+Flink's ZooKeeper HA services use [ZooKeeper](http://zookeeper.apache.org) to provide high availability.
 
-One high availability services implementation uses ZooKeeper.
+* Toc
+{:toc}
 
-### Configuration
+Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for *distributed 
coordination* between all running JobManager instances. 
+ZooKeeper is a separate service from Flink, which provides highly reliable 
distributed coordination via leader election and light-weight consistent state 
storage. 
+Check out [ZooKeeper's Getting Started 
Guide](http://zookeeper.apache.org/doc/current/zookeeperStarted.html) for more 
information about ZooKeeper. 
+Flink includes scripts to [bootstrap a simple ZooKeeper](#bootstrap-zookeeper) 
installation.
 
-To enable JobManager High Availability you have to set the **high-availability 
mode** to *zookeeper*, configure a **ZooKeeper quorum** and set up a **masters 
file** with all JobManagers hosts and their web UI ports.
+## Configuration
 
-Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for *distributed 
coordination* between all running JobManager instances. ZooKeeper is a separate 
service from Flink, which provides highly reliable distributed coordination via 
leader election and light-weight consistent state storage. Check out 
[ZooKeeper's Getting Started 
Guide](http://zookeeper.apache.org/doc/current/zookeeperStarted.html) for more 
information about ZooKeeper. Flink includes scripts to [bootstrap a simple Zo 
[...]
+In order to start an HA cluster, you have to set the following configuration keys:
 
-#### Masters File (masters)
+- [high-availability]({% link deployment/config.zh.md %}#high-availability-1) 
(required): 
+The `high-availability` option has to be set to `zookeeper`.
 
-In order to start an HA-cluster configure the *masters* file in `conf/masters`:
-
-- **masters file**: The *masters file* contains all hosts, on which 
JobManagers are started, and the ports to which the web user interface binds.
-
-  <pre>
-jobManagerAddress1:webUIPort1
-[...]
-jobManagerAddressX:webUIPortX
-  </pre>
-
-By default, the job manager will pick a *random port* for inter process 
communication. You can change this via the 
**`high-availability.jobmanager.port`** key. This key accepts single ports 
(e.g. `50010`), ranges (`50000-50025`), or a combination of both 
(`50010,50011,50020-50025,50050-50075`).
-
-#### Config File (flink-conf.yaml)
+  <pre>high-availability: zookeeper</pre>
 
-In order to start an HA-cluster add the following configuration keys to 
`conf/flink-conf.yaml`:
+- [high-availability.storageDir]({% link deployment/config.zh.md %}#high-availability-storagedir) (required): 
+JobManager metadata is persisted in the file system configured via `high-availability.storageDir`, and only a pointer to this state is stored in ZooKeeper.
 
-- **high-availability mode** (required): The *high-availability mode* has to 
be set in `conf/flink-conf.yaml` to *zookeeper* in order to enable high 
availability mode.
-Alternatively this option can be set to FQN of factory class Flink should use 
to create HighAvailabilityServices instance. 
+  <pre>high-availability.storageDir: hdfs:///flink/recovery</pre>
 
-  <pre>high-availability: zookeeper</pre>
+  The `storageDir` stores all metadata needed to recover from a JobManager failure.
 
-- **ZooKeeper quorum** (required): A *ZooKeeper quorum* is a replicated group 
of ZooKeeper servers, which provide the distributed coordination service.
+- [high-availability.zookeeper.quorum]({% link deployment/config.zh.md %}#high-availability-zookeeper-quorum) (required): 
+A *ZooKeeper quorum* is a replicated group of ZooKeeper servers, which provides the distributed coordination service.
 
   <pre>high-availability.zookeeper.quorum: 
address1:2181[,...],addressX:2181</pre>
 
-  Each *addressX:port* refers to a ZooKeeper server, which is reachable by 
Flink at the given address and port.
+  Each `addressX:port` refers to a ZooKeeper server, which is reachable by 
Flink at the given address and port.
 
-- **ZooKeeper root** (recommended): The *root ZooKeeper node*, under which all 
cluster nodes are placed.
+- [high-availability.zookeeper.path.root]({% link deployment/config.zh.md 
%}#high-availability-zookeeper-path-root) (recommended): 
+The *root ZooKeeper node*, under which all cluster nodes are placed.
 
-  <pre>high-availability.zookeeper.path.root: /flink
+  <pre>high-availability.zookeeper.path.root: /flink</pre>
 
-- **ZooKeeper cluster-id** (recommended): The *cluster-id ZooKeeper node*, 
under which all required coordination data for a cluster is placed.
+- [high-availability.cluster-id]({% link deployment/config.zh.md 
%}#high-availability-cluster-id) (recommended): 
+The *cluster-id ZooKeeper node*, under which all required coordination data 
for a cluster is placed.
 
   <pre>high-availability.cluster-id: /default_ns # important: customize per 
cluster</pre>
 
-  **Important**: You should not set this value manually when running a YARN
-  cluster, a per-job YARN session, or on another cluster manager. In those
-  cases a cluster-id is automatically being generated based on the application
-  id. Manually setting a cluster-id overrides this behaviour in YARN.
-  Specifying a cluster-id with the -z CLI option, in turn, overrides manual
-  configuration. If you are running multiple Flink HA clusters on bare metal,
-  you have to manually configure separate cluster-ids for each cluster.
+  **Important**: 
+  You should not set this value manually when running on YARN, native Kubernetes, or on another cluster manager. 
+  In those cases a cluster-id is generated automatically. 
+  If you are running multiple Flink HA clusters on bare metal, you have to 
manually configure separate cluster-ids for each cluster.
+
+### Example configuration
 
-- **Storage directory** (required): JobManager metadata is persisted in the 
file system *storageDir* and only a pointer to this state is stored in 
ZooKeeper.
+Configure high availability mode and ZooKeeper quorum in 
`conf/flink-conf.yaml`:
 
-    <pre>
+{% highlight bash %}
+high-availability: zookeeper
+high-availability.zookeeper.quorum: localhost:2181
+high-availability.zookeeper.path.root: /flink
+high-availability.cluster-id: /cluster_one # important: customize per cluster
 high-availability.storageDir: hdfs:///flink/recovery
-    </pre>
+{% endhighlight %}
 
-    The `storageDir` stores all metadata needed to recover a JobManager 
failure.
+{% top %}
 
-After configuring the masters and the ZooKeeper quorum, you can use the 
provided cluster startup scripts as usual. They will start an HA-cluster. Keep 
in mind that the **ZooKeeper quorum has to be running** when you call the 
scripts and make sure to **configure a separate ZooKeeper root path** for each 
HA cluster you are starting.
+## Configuring for ZooKeeper Security
 
-#### Example: Standalone Cluster with 2 JobManagers
+If ZooKeeper is running in secure mode with Kerberos, you can override the 
following configurations in `flink-conf.yaml` as necessary:
 
-1. **Configure high availability mode and ZooKeeper quorum** in 
`conf/flink-conf.yaml`:
+{% highlight bash %}
+zookeeper.sasl.service-name: zookeeper     # default is "zookeeper". If the 
ZooKeeper quorum is configured
+                                           # with a different service name 
then it can be supplied here.
+zookeeper.sasl.login-context-name: Client  # default is "Client". The value 
needs to match one of the values
+                                           # configured in 
"security.kerberos.login.contexts".
+{% endhighlight %}
 
-   <pre>
-high-availability: zookeeper
-high-availability.zookeeper.quorum: localhost:2181
-high-availability.zookeeper.path.root: /flink
-high-availability.cluster-id: /cluster_one # important: customize per cluster
-high-availability.storageDir: hdfs:///flink/recovery</pre>
+For more information on Flink configuration for Kerberos security, please 
refer to the [security section of the Flink configuration page]({% link 
deployment/config.md %}#security).
+You can also find further details on [how Flink sets up Kerberos-based 
security internally]({% link deployment/security/security-kerberos.md %}).
+
+{% top %}
 
-2. **Configure masters** in `conf/masters`:
+## ZooKeeper Versions
 
-   <pre>
-localhost:8081
-localhost:8082</pre>
+Flink ships with separate ZooKeeper clients for versions 3.4 and 3.5: the 3.4 client is in the `lib` directory of the distribution
+and is therefore used by default, whereas the 3.5 client is placed in the `opt` directory.
 
-3. **Configure ZooKeeper server** in `conf/zoo.cfg` (currently it's only 
possible to run a single ZooKeeper server per machine):
+The 3.5 client allows you to secure the ZooKeeper connection via SSL, but _may_ not work with ZooKeeper 3.4 (or older) installations.
 
-   <pre>server.0=localhost:2888:3888</pre>
+You can control which version Flink uses by placing either JAR in the `lib` directory.
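+
+As a sketch, switching to the 3.5 client could look as follows; the exact JAR names depend on the Flink distribution you use:
+
+{% highlight bash %}
+# Replace the default 3.4 client in lib with the 3.5 client shipped in opt
+$ rm lib/flink-shaded-zookeeper-3.4.*.jar
+$ cp opt/flink-shaded-zookeeper-3.5.*.jar lib/
+{% endhighlight %}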
 
-4. **Start ZooKeeper quorum**:
+{% top %}
 
-   <pre>
-$ bin/start-zookeeper-quorum.sh
-Starting zookeeper daemon on host localhost.</pre>
+## Bootstrap ZooKeeper
 
-5. **Start an HA-cluster**:
+If you don't have a running ZooKeeper installation, you can use the helper 
scripts, which ship with Flink.
 
-   <pre>
-$ bin/start-cluster.sh
-Starting HA cluster with 2 masters and 1 peers in ZooKeeper quorum.
-Starting standalonesession daemon on host localhost.
-Starting standalonesession daemon on host localhost.
-Starting taskexecutor daemon on host localhost.</pre>
+There is a ZooKeeper configuration template in `conf/zoo.cfg`. 
+You can configure the hosts to run ZooKeeper on with the `server.X` entries, where `X` is a unique ID for each server:
+
+{% highlight bash %}
+server.X=addressX:peerPort:leaderPort
+[...]
+server.Y=addressY:peerPort:leaderPort
+{% endhighlight %}
 
-6. **Stop ZooKeeper quorum and cluster**:
+The script `bin/start-zookeeper-quorum.sh` will start a ZooKeeper server on 
each of the configured hosts. 
+The started processes run ZooKeeper via a Flink wrapper, which reads the configuration from `conf/zoo.cfg` and sets some required configuration values for convenience. 
+In production setups, it is recommended to manage your own ZooKeeper 
installation.
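+
+For a local test setup with a single `server.X` entry pointing to `localhost`, starting the quorum would look like this:
+
+{% highlight bash %}
+$ bin/start-zookeeper-quorum.sh
+Starting zookeeper daemon on host localhost.
+{% endhighlight %}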
 
-   <pre>
-$ bin/stop-cluster.sh
-Stopping taskexecutor daemon (pid: 7647) on localhost.
-Stopping standalonesession daemon (pid: 7495) on host localhost.
-Stopping standalonesession daemon (pid: 7349) on host localhost.
-$ bin/stop-zookeeper-quorum.sh
-Stopping zookeeper daemon (pid: 7101) on host localhost.</pre>
+{% top %} 
diff --git a/docs/deployment/resource-providers/native_kubernetes.md 
b/docs/deployment/resource-providers/native_kubernetes.md
index 1d0d7c8..5d28000 100644
--- a/docs/deployment/resource-providers/native_kubernetes.md
+++ b/docs/deployment/resource-providers/native_kubernetes.md
@@ -375,6 +375,28 @@ $ ./bin/kubernetes-session.sh \
 
 For more details see the [official Kubernetes 
documentation](https://kubernetes.io/docs/concepts/configuration/secret/#using-secrets-as-environment-variables).
 
+## High-Availability with Native Kubernetes
+
+For high availability on Kubernetes, you can use the [existing high 
availability services]({% link deployment/ha/index.md %}).
+
+### How to configure Kubernetes HA Services
+
+Use the following command to start a native Flink application cluster on Kubernetes with high availability configured:
+{% highlight bash %}
+$ ./bin/flink run-application -p 8 -t kubernetes-application \
+  -Dkubernetes.cluster-id=<ClusterId> \
+  -Dtaskmanager.memory.process.size=4096m \
+  -Dkubernetes.taskmanager.cpu=2 \
+  -Dtaskmanager.numberOfTaskSlots=4 \
+  -Dkubernetes.container.image=<CustomImageName> \
+  -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
+  -Dhigh-availability.storageDir=s3://flink/flink-ha \
+  -Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=10 \
+  -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \
+  -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \
+  local:///opt/flink/examples/streaming/StateMachineExample.jar
+{% endhighlight %}
+
 ## Kubernetes concepts
 
 ### Namespaces
diff --git a/docs/deployment/resource-providers/native_kubernetes.zh.md 
b/docs/deployment/resource-providers/native_kubernetes.zh.md
index 309ac5c..d28b4c9 100644
--- a/docs/deployment/resource-providers/native_kubernetes.zh.md
+++ b/docs/deployment/resource-providers/native_kubernetes.zh.md
@@ -374,6 +374,28 @@ $ ./bin/kubernetes-session.sh \
 
 For more details see the [official Kubernetes 
documentation](https://kubernetes.io/docs/concepts/configuration/secret/#using-secrets-as-environment-variables).
 
+## High-Availability with Native Kubernetes
+
+For high availability on Kubernetes, you can use the [existing high 
availability services]({% link deployment/ha/index.zh.md %}).
+
+### How to configure Kubernetes HA Services
+
+Use the following command to start a native Flink application cluster on Kubernetes with high availability configured:
+{% highlight bash %}
+$ ./bin/flink run-application -p 8 -t kubernetes-application \
+  -Dkubernetes.cluster-id=<ClusterId> \
+  -Dtaskmanager.memory.process.size=4096m \
+  -Dkubernetes.taskmanager.cpu=2 \
+  -Dtaskmanager.numberOfTaskSlots=4 \
+  -Dkubernetes.container.image=<CustomImageName> \
+  -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
+  -Dhigh-availability.storageDir=s3://flink/flink-ha \
+  -Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=10 \
+  -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \
+  -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \
+  local:///opt/flink/examples/streaming/StateMachineExample.jar
+{% endhighlight %}
+
 ## Kubernetes Concepts
 
 ### Namespaces
diff --git a/docs/deployment/resource-providers/standalone/index.md 
b/docs/deployment/resource-providers/standalone/index.md
index 4fa253c..ba09d31 100644
--- a/docs/deployment/resource-providers/standalone/index.md
+++ b/docs/deployment/resource-providers/standalone/index.md
@@ -150,4 +150,71 @@ bin/taskmanager.sh start|start-foreground|stop|stop-all
 
 Make sure to call these scripts on the hosts on which you want to start/stop 
the respective instance.
 
+## High-Availability with Standalone
+
+In order to enable HA for a standalone cluster, you have to use the [ZooKeeper 
HA services]({% link deployment/ha/zookeeper_ha.md %}).
+
+Additionally, you have to configure your cluster to start multiple JobManagers.
+
+### Masters File (masters)
+
+In order to start an HA cluster, configure the *masters* file in `conf/masters`:
+
+- **masters file**: The *masters file* contains all hosts on which JobManagers are started, and the ports to which the web user interface binds.
+
+  <pre>
+jobManagerAddress1:webUIPort1
+[...]
+jobManagerAddressX:webUIPortX
+  </pre>
+
+By default, the JobManager will pick a *random port* for inter-process communication. You can change this via the [high-availability.jobmanager.port]({% link deployment/config.md %}#high-availability-jobmanager-port) key. This key accepts single ports (e.g. `50010`), ranges (`50000-50025`), or a combination of both (`50010,50011,50020-50025,50050-50075`).
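+
+For instance, to restrict the JobManager to a fixed port range (the range below is illustrative):
+
+<pre>high-availability.jobmanager.port: 50000-50025</pre>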
+
+### Example: Standalone Cluster with 2 JobManagers
+
+1. **Configure high availability mode and ZooKeeper quorum** in 
`conf/flink-conf.yaml`:
+
+   <pre>
+high-availability: zookeeper
+high-availability.zookeeper.quorum: localhost:2181
+high-availability.zookeeper.path.root: /flink
+high-availability.cluster-id: /cluster_one # important: customize per cluster
+high-availability.storageDir: hdfs:///flink/recovery</pre>
+
+2. **Configure masters** in `conf/masters`:
+
+   <pre>
+localhost:8081
+localhost:8082</pre>
+
+3. **Configure ZooKeeper server** in `conf/zoo.cfg` (currently it's only 
possible to run a single ZooKeeper server per machine):
+
+   <pre>server.0=localhost:2888:3888</pre>
+
+4. **Start ZooKeeper quorum**:
+
+   <pre>
+$ bin/start-zookeeper-quorum.sh
+Starting zookeeper daemon on host localhost.</pre>
+
+5. **Start an HA-cluster**:
+
+   <pre>
+$ bin/start-cluster.sh
+Starting HA cluster with 2 masters and 1 peers in ZooKeeper quorum.
+Starting standalonesession daemon on host localhost.
+Starting standalonesession daemon on host localhost.
+Starting taskexecutor daemon on host localhost.</pre>
+
+6. **Stop ZooKeeper quorum and cluster**:
+
+   <pre>
+$ bin/stop-cluster.sh
+Stopping taskexecutor daemon (pid: 7647) on localhost.
+Stopping standalonesession daemon (pid: 7495) on host localhost.
+Stopping standalonesession daemon (pid: 7349) on host localhost.
+$ bin/stop-zookeeper-quorum.sh
+Stopping zookeeper daemon (pid: 7101) on host localhost.</pre>
+
+
 {% top %}
diff --git a/docs/deployment/resource-providers/standalone/index.zh.md 
b/docs/deployment/resource-providers/standalone/index.zh.md
index 743c738..02f3718 100644
--- a/docs/deployment/resource-providers/standalone/index.zh.md
+++ b/docs/deployment/resource-providers/standalone/index.zh.md
@@ -163,4 +163,72 @@ bin/taskmanager.sh start|start-foreground|stop|stop-all
 
 Make sure to call these scripts on the hosts on which you want to start/stop the respective instance.
 
+## High-Availability with Standalone
+
+In order to enable HA for a standalone cluster, you have to use the [ZooKeeper 
HA services]({% link deployment/ha/zookeeper_ha.zh.md %}).
+
+Additionally, you have to configure your cluster to start multiple JobManagers.
+
+### Masters File (masters)
+
+In order to start an HA cluster, configure the *masters* file in `conf/masters`:
+
+- **masters file**: The *masters file* contains all hosts on which JobManagers are started, and the ports to which the web user interface binds.
+
+  <pre>
+jobManagerAddress1:webUIPort1
+[...]
+jobManagerAddressX:webUIPortX
+  </pre>
+
+By default, the JobManager will pick a *random port* for inter-process communication. You can change this via the [high-availability.jobmanager.port]({% link deployment/config.zh.md %}#high-availability-jobmanager-port) key. This key accepts single ports (e.g. `50010`), ranges (`50000-50025`), or a combination of both (`50010,50011,50020-50025,50050-50075`).
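+
+For instance, to restrict the JobManager to a fixed port range (the range below is illustrative):
+
+<pre>high-availability.jobmanager.port: 50000-50025</pre>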
+
+### Example: Standalone Cluster with 2 JobManagers
+
+1. **Configure high availability mode and ZooKeeper quorum** in 
`conf/flink-conf.yaml`:
+
+   <pre>
+high-availability: zookeeper
+high-availability.zookeeper.quorum: localhost:2181
+high-availability.zookeeper.path.root: /flink
+high-availability.cluster-id: /cluster_one # important: customize per cluster
+high-availability.storageDir: hdfs:///flink/recovery</pre>
+
+2. **Configure masters** in `conf/masters`:
+
+   <pre>
+localhost:8081
+localhost:8082</pre>
+
+3. **Configure ZooKeeper server** in `conf/zoo.cfg` (currently it's only 
possible to run a single ZooKeeper server per machine):
+
+   <pre>server.0=localhost:2888:3888</pre>
+
+4. **Start ZooKeeper quorum**:
+
+   <pre>
+$ bin/start-zookeeper-quorum.sh
+Starting zookeeper daemon on host localhost.</pre>
+
+5. **Start an HA-cluster**:
+
+   <pre>
+$ bin/start-cluster.sh
+Starting HA cluster with 2 masters and 1 peers in ZooKeeper quorum.
+Starting standalonesession daemon on host localhost.
+Starting standalonesession daemon on host localhost.
+Starting taskexecutor daemon on host localhost.</pre>
+
+6. **Stop ZooKeeper quorum and cluster**:
+
+   <pre>
+$ bin/stop-cluster.sh
+Stopping taskexecutor daemon (pid: 7647) on localhost.
+Stopping standalonesession daemon (pid: 7495) on host localhost.
+Stopping standalonesession daemon (pid: 7349) on host localhost.
+$ bin/stop-zookeeper-quorum.sh
+Stopping zookeeper daemon (pid: 7101) on host localhost.</pre>
+
+
+
 {% top %}
diff --git a/docs/deployment/resource-providers/standalone/kubernetes.md 
b/docs/deployment/resource-providers/standalone/kubernetes.md
index bd1b9a3..22bb1f2 100644
--- a/docs/deployment/resource-providers/standalone/kubernetes.md
+++ b/docs/deployment/resource-providers/standalone/kubernetes.md
@@ -162,6 +162,34 @@ with the `kubectl` command:
     kubectl delete -f jobmanager-job.yaml
 ```
 
+## High-Availability with Standalone Kubernetes
+
+For high availability on Kubernetes, you can use the [existing high 
availability services]({% link deployment/ha/index.md %}).
+
+### How to configure Kubernetes HA Services
+
+Session Mode, Per-Job Mode, and Application Mode clusters support using the Kubernetes high availability service. Users just need to add the following Flink configuration options to [flink-configuration-configmap.yaml](#common-cluster-resource-definitions). No other YAML files need to be updated.
+
+<span class="label label-info">Note</span> The filesystem which corresponds to 
the scheme of your configured HA storage directory must be available to the 
runtime. Refer to [custom Flink image]({% link 
deployment/resource-providers/standalone/docker.md %}#customize-flink-image) 
and [enable plugins]({% link deployment/resource-providers/standalone/docker.md 
%}#using-plugins) for more information.
+
+{% highlight yaml %}
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: flink-config
+  labels:
+    app: flink
+data:
+  flink-conf.yaml: |+
+  ...
+    kubernetes.cluster-id: <cluster-id>
+    high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+    high-availability.storageDir: hdfs:///flink/recovery
+    restart-strategy: fixed-delay
+    restart-strategy.fixed-delay.attempts: 10
+  ...
+{% endhighlight %}
+
 ## Appendix
 
 ### Common cluster resource definitions
diff --git a/docs/deployment/resource-providers/standalone/kubernetes.zh.md 
b/docs/deployment/resource-providers/standalone/kubernetes.zh.md
index 70d4219..7706b1d 100644
--- a/docs/deployment/resource-providers/standalone/kubernetes.zh.md
+++ b/docs/deployment/resource-providers/standalone/kubernetes.zh.md
@@ -162,6 +162,35 @@ with the `kubectl` command:
     kubectl delete -f jobmanager-job.yaml
 ```
 
+## High-Availability with Standalone Kubernetes
+
+For high availability on Kubernetes, you can use the [existing high 
availability services]({% link deployment/ha/index.zh.md %}).
+
+### How to configure Kubernetes HA Services
+
+Session Mode, Per-Job Mode, and Application Mode clusters support using the Kubernetes high availability service. Users just need to add the following Flink configuration options to [flink-configuration-configmap.yaml](#common-cluster-resource-definitions). No other YAML files need to be updated.
+
+<span class="label label-info">Note</span> The filesystem which corresponds to 
the scheme of your configured HA storage directory must be available to the 
runtime. Refer to [custom Flink image]({% link 
deployment/resource-providers/standalone/docker.zh.md %}#customize-flink-image) 
and [enable plugins]({% link 
deployment/resource-providers/standalone/docker.zh.md %}#using-plugins) for 
more information.
+
+{% highlight yaml %}
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: flink-config
+  labels:
+    app: flink
+data:
+  flink-conf.yaml: |+
+  ...
+    kubernetes.cluster-id: <cluster-id>
+    high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+    high-availability.storageDir: hdfs:///flink/recovery
+    restart-strategy: fixed-delay
+    restart-strategy.fixed-delay.attempts: 10
+  ...
+{% endhighlight %}
+
+
 ## Appendix
 
 ### Common cluster resource definitions
