XComp commented on a change in pull request #14254:
URL: https://github.com/apache/flink/pull/14254#discussion_r532482467
##########
File path: docs/deployment/ha/index.md
##########
@@ -24,308 +25,63 @@ specific language governing permissions and limitations
under the License.
-->
+JobManager High Availability (HA) hardens a Flink cluster against JobManager
failures.
+This feature ensures that a Flink cluster will always continue executing your
submitted jobs.
+
* Toc
{:toc}
-## Overview
+## JobManager High Availability
-The JobManager coordinates every Flink deployment. It is responsible for both
*scheduling* and *resource management*.
+The JobManager coordinates every Flink deployment.
+It is responsible for both *scheduling* and *resource management*.
-By default, there is a single JobManager instance per Flink cluster. This
creates a *single point of failure* (SPOF): if the JobManager crashes, no new
programs can be submitted and running programs fail.
+By default, there is a single JobManager instance per Flink cluster.
+This creates a *single point of failure* (SPOF): if the JobManager crashes, no
new programs can be submitted and running programs fail.
-With JobManager High Availability, you can recover from JobManager failures
and thereby eliminate the *SPOF*. You can configure high availability for both
**standalone**, **YARN clusters** and **Kubernetes clusters**.
+With JobManager High Availability, you can recover from JobManager failures
and thereby eliminate the *SPOF*.
+You can configure high availability for every cluster deployment.
+See [how to configure HA](#configuration) for more information.
-See more HA implementation details in [JobManager High
Availability](https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability)
in the Flink wiki.
+### How to make a cluster highly available
-## Standalone Cluster High Availability
-
-The general idea of JobManager high availability for standalone clusters is
that there is a **single leading JobManager** at any time and **multiple
standby JobManagers** to take over leadership in case the leader fails. This
guarantees that there is **no single point of failure** and programs can make
progress as soon as a standby JobManager has taken leadership. There is no
explicit distinction between standby and master JobManager instances. Each
JobManager can take the role of master or standby.
+The general idea of JobManager high availability is that there is a *single
leading JobManager* at any time and *multiple standby JobManagers* to take over
leadership in case the leader fails.
Review comment:
```suggestion
The general idea of JobManager High Availability is that there is a *single
leading JobManager* at any time and *multiple standby JobManagers* to take over
leadership in case the leader fails.
```
Just to be consistent with the subsection above.
##########
File path: docs/deployment/ha/zookeeper_ha.md
##########
@@ -23,113 +23,104 @@ specific language governing permissions and limitations
under the License.
-->
-## ZooKeeper HA Services
+Flink's ZooKeeper HA services use [ZooKeeper](http://zookeeper.apache.org) for
high availability services.
-One high availability services implementation uses ZooKeeper.
+* Toc
+{:toc}
-### Configuration
+Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for *distributed
coordination* between all running JobManager instances.
+ZooKeeper is a separate service from Flink, which provides highly reliable
distributed coordination via leader election and light-weight consistent state
storage.
+Check out [ZooKeeper's Getting Started
Guide](http://zookeeper.apache.org/doc/current/zookeeperStarted.html) for more
information about ZooKeeper.
+Flink includes scripts to [bootstrap a simple ZooKeeper](#bootstrap-zookeeper)
installation.
-To enable JobManager High Availability you have to set the **high-availability
mode** to *zookeeper*, configure a **ZooKeeper quorum** and set up a **masters
file** with all JobManagers hosts and their web UI ports.
+## Configuration
-Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for *distributed
coordination* between all running JobManager instances. ZooKeeper is a separate
service from Flink, which provides highly reliable distributed coordination via
leader election and light-weight consistent state storage. Check out
[ZooKeeper's Getting Started
Guide](http://zookeeper.apache.org/doc/current/zookeeperStarted.html) for more
information about ZooKeeper. Flink includes scripts to [bootstrap a simple
ZooKeeper](#bootstrap-zookeeper) installation.
+In order to start an HA-cluster you have to configure the following
configuration keys:
-#### Masters File (masters)
-
-In order to start an HA-cluster configure the *masters* file in `conf/masters`:
-
-- **masters file**: The *masters file* contains all hosts, on which
JobManagers are started, and the ports to which the web user interface binds.
-
- <pre>
-jobManagerAddress1:webUIPort1
-[...]
-jobManagerAddressX:webUIPortX
- </pre>
-
-By default, the job manager will pick a *random port* for inter process
communication. You can change this via the
**`high-availability.jobmanager.port`** key. This key accepts single ports
(e.g. `50010`), ranges (`50000-50025`), or a combination of both
(`50010,50011,50020-50025,50050-50075`).
-
-#### Config File (flink-conf.yaml)
-
-In order to start an HA-cluster add the following configuration keys to
`conf/flink-conf.yaml`:
-
-- **high-availability mode** (required): The *high-availability mode* has to
be set in `conf/flink-conf.yaml` to *zookeeper* in order to enable high
availability mode.
-Alternatively this option can be set to FQN of factory class Flink should use
to create HighAvailabilityServices instance.
+- **high-availability mode** (required):
+The `high-availability` option has to be set to `zookeeper`.
<pre>high-availability: zookeeper</pre>
-- **ZooKeeper quorum** (required): A *ZooKeeper quorum* is a replicated group
of ZooKeeper servers, which provide the distributed coordination service.
+- **ZooKeeper quorum** (required):
+A *ZooKeeper quorum* is a replicated group of ZooKeeper servers, which provide
the distributed coordination service.
<pre>high-availability.zookeeper.quorum:
address1:2181[,...],addressX:2181</pre>
- Each *addressX:port* refers to a ZooKeeper server, which is reachable by
Flink at the given address and port.
+ Each `addressX:port` refers to a ZooKeeper server, which is reachable by
Flink at the given address and port.
-- **ZooKeeper root** (recommended): The *root ZooKeeper node*, under which all
cluster nodes are placed.
+- **ZooKeeper root** (recommended):
+The *root ZooKeeper node*, under which all cluster nodes are placed.
- <pre>high-availability.zookeeper.path.root: /flink
+ <pre>high-availability.zookeeper.path.root: /flink</pre>
-- **ZooKeeper cluster-id** (recommended): The *cluster-id ZooKeeper node*,
under which all required coordination data for a cluster is placed.
+- **ZooKeeper cluster-id** (recommended):
+The *cluster-id ZooKeeper node*, under which all required coordination data
for a cluster is placed.
<pre>high-availability.cluster-id: /default_ns # important: customize per
cluster</pre>
- **Important**: You should not set this value manually when running a YARN
- cluster, a per-job YARN session, or on another cluster manager. In those
- cases a cluster-id is automatically being generated based on the application
- id. Manually setting a cluster-id overrides this behaviour in YARN.
- Specifying a cluster-id with the -z CLI option, in turn, overrides manual
- configuration. If you are running multiple Flink HA clusters on bare metal,
- you have to manually configure separate cluster-ids for each cluster.
+ **Important**:
+ You should not set this value manually when running on YARN, native
Kubernetes or on another cluster manager.
+ In those cases a cluster-id is being automatically generated.
+ If you are running multiple Flink HA clusters on bare metal, you have to
manually configure separate cluster-ids for each cluster.
-- **Storage directory** (required): JobManager metadata is persisted in the
file system *storageDir* and only a pointer to this state is stored in
ZooKeeper.
+- **Storage directory** (required):
+JobManager metadata is persisted in the file system
`high-availability.storageDir` and only a pointer to this state is stored in
ZooKeeper.
- <pre>
-high-availability.storageDir: hdfs:///flink/recovery
- </pre>
-
- The `storageDir` stores all metadata needed to recover a JobManager
failure.
+ <pre>high-availability.storageDir: hdfs:///flink/recovery</pre>
-After configuring the masters and the ZooKeeper quorum, you can use the
provided cluster startup scripts as usual. They will start an HA-cluster. Keep
in mind that the **ZooKeeper quorum has to be running** when you call the
scripts and make sure to **configure a separate ZooKeeper root path** for each
HA cluster you are starting.
+ The `storageDir` stores all metadata needed to recover a JobManager failure.
Review comment:
I would move the storage dir before `ZooKeeper cluster-id` and
`ZooKeeper root`, considering that it's a required option as well. This reduces
the risk of users skipping over the recommended entries and missing the
`Storage directory` entry.
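Roughly, the list would then read as follows (just a sketch of the ordering;
the existing wording stays as-is):
```
- **high-availability mode** (required): ...
- **ZooKeeper quorum** (required): ...
- **Storage directory** (required): ...
- **ZooKeeper root** (recommended): ...
- **ZooKeeper cluster-id** (recommended): ...
```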
##########
File path: docs/deployment/ha/zookeeper_ha.md
##########
@@ -23,113 +23,104 @@ specific language governing permissions and limitations
under the License.
-->
-## ZooKeeper HA Services
+Flink's ZooKeeper HA services use [ZooKeeper](http://zookeeper.apache.org) for
high availability services.
-One high availability services implementation uses ZooKeeper.
+* Toc
+{:toc}
-### Configuration
+Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for *distributed
coordination* between all running JobManager instances.
+ZooKeeper is a separate service from Flink, which provides highly reliable
distributed coordination via leader election and light-weight consistent state
storage.
+Check out [ZooKeeper's Getting Started
Guide](http://zookeeper.apache.org/doc/current/zookeeperStarted.html) for more
information about ZooKeeper.
+Flink includes scripts to [bootstrap a simple ZooKeeper](#bootstrap-zookeeper)
installation.
-To enable JobManager High Availability you have to set the **high-availability
mode** to *zookeeper*, configure a **ZooKeeper quorum** and set up a **masters
file** with all JobManagers hosts and their web UI ports.
+## Configuration
-Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for *distributed
coordination* between all running JobManager instances. ZooKeeper is a separate
service from Flink, which provides highly reliable distributed coordination via
leader election and light-weight consistent state storage. Check out
[ZooKeeper's Getting Started
Guide](http://zookeeper.apache.org/doc/current/zookeeperStarted.html) for more
information about ZooKeeper. Flink includes scripts to [bootstrap a simple
ZooKeeper](#bootstrap-zookeeper) installation.
+In order to start an HA-cluster you have to configure the following
configuration keys:
-#### Masters File (masters)
-
-In order to start an HA-cluster configure the *masters* file in `conf/masters`:
-
-- **masters file**: The *masters file* contains all hosts, on which
JobManagers are started, and the ports to which the web user interface binds.
-
- <pre>
-jobManagerAddress1:webUIPort1
-[...]
-jobManagerAddressX:webUIPortX
- </pre>
-
-By default, the job manager will pick a *random port* for inter process
communication. You can change this via the
**`high-availability.jobmanager.port`** key. This key accepts single ports
(e.g. `50010`), ranges (`50000-50025`), or a combination of both
(`50010,50011,50020-50025,50050-50075`).
-
-#### Config File (flink-conf.yaml)
-
-In order to start an HA-cluster add the following configuration keys to
`conf/flink-conf.yaml`:
-
-- **high-availability mode** (required): The *high-availability mode* has to
be set in `conf/flink-conf.yaml` to *zookeeper* in order to enable high
availability mode.
-Alternatively this option can be set to FQN of factory class Flink should use
to create HighAvailabilityServices instance.
+- **high-availability mode** (required):
Review comment:
What about linking the respective entries on the Configuration page instead of
coming up with a bold custom key for each item here? It would highlight the
actual item as well (through the link formatting) and let us provide additional
information on the Configuration page that we might otherwise miss here.
This comment applies to the other configuration items below as well.
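As a rough sketch of what I have in mind (assuming the anchor on the
Configuration page matches the one already used on the Kubernetes HA page), the
first item could look like this:
```
- [high-availability]({% link deployment/config.md %}#high-availability-1) (required):
  This option has to be set to `zookeeper`.

  <pre>high-availability: zookeeper</pre>
```
The same pattern would work for the quorum, storage directory and the other
keys, provided matching anchors exist on the Configuration page.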
##########
File path: docs/deployment/ha/zookeeper_ha.md
##########
@@ -23,113 +23,104 @@ specific language governing permissions and limitations
under the License.
-->
-## ZooKeeper HA Services
+Flink's ZooKeeper HA services use [ZooKeeper](http://zookeeper.apache.org) for
high availability services.
-One high availability services implementation uses ZooKeeper.
+* Toc
+{:toc}
-### Configuration
+Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for *distributed
coordination* between all running JobManager instances.
+ZooKeeper is a separate service from Flink, which provides highly reliable
distributed coordination via leader election and light-weight consistent state
storage.
+Check out [ZooKeeper's Getting Started
Guide](http://zookeeper.apache.org/doc/current/zookeeperStarted.html) for more
information about ZooKeeper.
+Flink includes scripts to [bootstrap a simple ZooKeeper](#bootstrap-zookeeper)
installation.
-To enable JobManager High Availability you have to set the **high-availability
mode** to *zookeeper*, configure a **ZooKeeper quorum** and set up a **masters
file** with all JobManagers hosts and their web UI ports.
+## Configuration
-Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for *distributed
coordination* between all running JobManager instances. ZooKeeper is a separate
service from Flink, which provides highly reliable distributed coordination via
leader election and light-weight consistent state storage. Check out
[ZooKeeper's Getting Started
Guide](http://zookeeper.apache.org/doc/current/zookeeperStarted.html) for more
information about ZooKeeper. Flink includes scripts to [bootstrap a simple
ZooKeeper](#bootstrap-zookeeper) installation.
+In order to start an HA-cluster you have to configure the following
configuration keys:
-#### Masters File (masters)
-
-In order to start an HA-cluster configure the *masters* file in `conf/masters`:
-
-- **masters file**: The *masters file* contains all hosts, on which
JobManagers are started, and the ports to which the web user interface binds.
-
- <pre>
-jobManagerAddress1:webUIPort1
-[...]
-jobManagerAddressX:webUIPortX
- </pre>
-
-By default, the job manager will pick a *random port* for inter process
communication. You can change this via the
**`high-availability.jobmanager.port`** key. This key accepts single ports
(e.g. `50010`), ranges (`50000-50025`), or a combination of both
(`50010,50011,50020-50025,50050-50075`).
-
-#### Config File (flink-conf.yaml)
-
-In order to start an HA-cluster add the following configuration keys to
`conf/flink-conf.yaml`:
-
-- **high-availability mode** (required): The *high-availability mode* has to
be set in `conf/flink-conf.yaml` to *zookeeper* in order to enable high
availability mode.
-Alternatively this option can be set to FQN of factory class Flink should use
to create HighAvailabilityServices instance.
+- **high-availability mode** (required):
+The `high-availability` option has to be set to `zookeeper`.
<pre>high-availability: zookeeper</pre>
-- **ZooKeeper quorum** (required): A *ZooKeeper quorum* is a replicated group
of ZooKeeper servers, which provide the distributed coordination service.
+- **ZooKeeper quorum** (required):
+A *ZooKeeper quorum* is a replicated group of ZooKeeper servers, which provide
the distributed coordination service.
<pre>high-availability.zookeeper.quorum:
address1:2181[,...],addressX:2181</pre>
- Each *addressX:port* refers to a ZooKeeper server, which is reachable by
Flink at the given address and port.
+ Each `addressX:port` refers to a ZooKeeper server, which is reachable by
Flink at the given address and port.
-- **ZooKeeper root** (recommended): The *root ZooKeeper node*, under which all
cluster nodes are placed.
+- **ZooKeeper root** (recommended):
+The *root ZooKeeper node*, under which all cluster nodes are placed.
- <pre>high-availability.zookeeper.path.root: /flink
+ <pre>high-availability.zookeeper.path.root: /flink</pre>
-- **ZooKeeper cluster-id** (recommended): The *cluster-id ZooKeeper node*,
under which all required coordination data for a cluster is placed.
+- **ZooKeeper cluster-id** (recommended):
+The *cluster-id ZooKeeper node*, under which all required coordination data
for a cluster is placed.
<pre>high-availability.cluster-id: /default_ns # important: customize per
cluster</pre>
- **Important**: You should not set this value manually when running a YARN
- cluster, a per-job YARN session, or on another cluster manager. In those
- cases a cluster-id is automatically being generated based on the application
- id. Manually setting a cluster-id overrides this behaviour in YARN.
- Specifying a cluster-id with the -z CLI option, in turn, overrides manual
- configuration. If you are running multiple Flink HA clusters on bare metal,
- you have to manually configure separate cluster-ids for each cluster.
+ **Important**:
+ You should not set this value manually when running on YARN, native
Kubernetes or on another cluster manager.
+ In those cases a cluster-id is being automatically generated.
+ If you are running multiple Flink HA clusters on bare metal, you have to
manually configure separate cluster-ids for each cluster.
-- **Storage directory** (required): JobManager metadata is persisted in the
file system *storageDir* and only a pointer to this state is stored in
ZooKeeper.
+- **Storage directory** (required):
+JobManager metadata is persisted in the file system
`high-availability.storageDir` and only a pointer to this state is stored in
ZooKeeper.
- <pre>
-high-availability.storageDir: hdfs:///flink/recovery
- </pre>
-
- The `storageDir` stores all metadata needed to recover a JobManager
failure.
+ <pre>high-availability.storageDir: hdfs:///flink/recovery</pre>
-After configuring the masters and the ZooKeeper quorum, you can use the
provided cluster startup scripts as usual. They will start an HA-cluster. Keep
in mind that the **ZooKeeper quorum has to be running** when you call the
scripts and make sure to **configure a separate ZooKeeper root path** for each
HA cluster you are starting.
+ The `storageDir` stores all metadata needed to recover a JobManager failure.
-#### Example: Standalone Cluster with 2 JobManagers
+### Example configuration
-1. **Configure high availability mode and ZooKeeper quorum** in
`conf/flink-conf.yaml`:
+Configure high availability mode and ZooKeeper quorum in
`conf/flink-conf.yaml`:
- <pre>
+{% highlight bash %}
high-availability: zookeeper
high-availability.zookeeper.quorum: localhost:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /cluster_one # important: customize per cluster
-high-availability.storageDir: hdfs:///flink/recovery</pre>
+high-availability.storageDir: hdfs:///flink/recovery
+{% endhighlight %}
+
+## Configuring for Zookeeper Security
-2. **Configure masters** in `conf/masters`:
+If ZooKeeper is running in secure mode with Kerberos, you can override the
following configurations in `flink-conf.yaml` as necessary:
- <pre>
-localhost:8081
-localhost:8082</pre>
+{% highlight bash %}
+zookeeper.sasl.service-name: zookeeper # default is "zookeeper". If the
ZooKeeper quorum is configured
+ # with a different service name
then it can be supplied here.
+zookeeper.sasl.login-context-name: Client # default is "Client". The value
needs to match one of the values
+ # configured in
"security.kerberos.login.contexts".
+{% endhighlight %}
-3. **Configure ZooKeeper server** in `conf/zoo.cfg` (currently it's only
possible to run a single ZooKeeper server per machine):
+For more information on Flink configuration for Kerberos security, please
refer to the [security section of the Flink configuration page]({% link
deployment/config.md %}#security).
+You can also find further details on [how Flink sets up Kerberos-based
security internally]({% link deployment/security/security-kerberos.md %}).
- <pre>server.0=localhost:2888:3888</pre>
+## Zookeeper Versions
Review comment:
```suggestion
## ZooKeeper Versions
```
##########
File path: docs/deployment/ha/kubernetes_ha.md
##########
@@ -23,77 +23,50 @@ specific language governing permissions and limitations
under the License.
-->
-## Kubernetes Cluster High Availability
-Kubernetes high availability service could support both [standalone Flink on
Kubernetes]({% link deployment/resource-providers/standalone/kubernetes.md %})
and [native Kubernetes integration]({% link
deployment/resource-providers/native_kubernetes.md %}).
+Flink's Kubernetes HA services use [Kubernetes](https://kubernetes.io/) for
high availability services.
-When running Flink JobManager as a Kubernetes deployment, the replica count
should be configured to 1 or greater.
-* The value `1` means that a new JobManager will be launched to take over
leadership if the current one terminates exceptionally.
-* The value `N` (greater than 1) means that multiple JobManagers will be
launched simultaneously while one is active and others are standby. Starting
more than one JobManager will make the recovery faster.
+* Toc
+{:toc}
-### Configuration
-{% highlight yaml %}
-kubernetes.cluster-id: <ClusterId>
-high-availability:
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
-high-availability.storageDir: hdfs:///flink/recovery
-{% endhighlight %}
+Kubernetes high availability services can only be used when deploying to
Kubernetes.
+Consequently, they can be configured when using [standalone Flink on
Kubernetes]({% link deployment/resource-providers/standalone/kubernetes.md %})
or the [native Kubernetes integration]({% link
deployment/resource-providers/native_kubernetes.md %})
-#### Example: Highly Available Standalone Flink Cluster on Kubernetes
-Both session and job/application clusters support using the Kubernetes high
availability service. Users just need to add the following Flink config options
to [flink-configuration-configmap.yaml]({% link
deployment/resource-providers/standalone/kubernetes.md
%}#common-cluster-resource-definitions). All other yamls do not need to be
updated.
-
-<span class="label label-info">Note</span> The filesystem which corresponds to
the scheme of your configured HA storage directory must be available to the
runtime. Refer to [custom Flink image]({% link
deployment/resource-providers/standalone/docker.md %}#customize-flink-image)
and [enable plugins]({% link deployment/resource-providers/standalone/docker.md
%}#using-plugins) for more information.
-
-{% highlight yaml %}
-apiVersion: v1
-kind: ConfigMap
-metadata:
- name: flink-config
- labels:
- app: flink
-data:
- flink-conf.yaml: |+
- ...
- kubernetes.cluster-id: <ClusterId>
- high-availability:
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
- high-availability.storageDir: hdfs:///flink/recovery
- restart-strategy: fixed-delay
- restart-strategy.fixed-delay.attempts: 10
- ...
-{% endhighlight %}
+## Configuration
-#### Example: Highly Available Native Kubernetes Cluster
-Using the following command to start a native Flink application cluster on
Kubernetes with high availability configured.
-{% highlight bash %}
-$ ./bin/flink run-application -p 8 -t kubernetes-application \
- -Dkubernetes.cluster-id=<ClusterId> \
- -Dtaskmanager.memory.process.size=4096m \
- -Dkubernetes.taskmanager.cpu=2 \
- -Dtaskmanager.numberOfTaskSlots=4 \
- -Dkubernetes.container.image=<CustomImageName> \
-
-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
\
- -Dhigh-availability.storageDir=s3://flink/flink-ha \
- -Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=10 \
-
-Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar
\
-
-Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar
\
- local:///opt/flink/examples/streaming/StateMachineExample.jar
-{% endhighlight %}
+In order to start an HA-cluster you have to configure the following
configuration keys:
-### High Availability Data Clean Up
-Currently, when a Flink job reached the terminal state (`FAILED`, `CANCELED`,
`FINISHED`), all the HA data, including metadata in Kubernetes ConfigMap and HA
state on DFS, will be cleaned up.
+- **high-availability mode** (required):
+The [`high-availability`]({% link deployment/config.md %}#high-availability-1)
option has to be set to `KubernetesHaServicesFactory`.
-So the following command will only shut down the Flink session cluster and
leave all the HA related ConfigMaps, state untouched.
-{% highlight bash %}
-$ echo 'stop' | ./bin/kubernetes-session.sh
-Dkubernetes.cluster-id=<ClusterId> -Dexecution.attached=true
-{% endhighlight %}
+ <pre>high-availability:
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory</pre>
+
+- **Storage directory** (required):
+JobManager metadata is persisted in the file system
[`high-availability.storageDir`]({% link deployment/config.md
%}#high-availability-storagedir) and only a pointer to this state is stored in
Kubernetes.
Review comment:
```suggestion
JobManager metadata is persisted in the file system
[high-availability.storageDir]({% link deployment/config.md
%}#high-availability-storagedir) and only a pointer to this state is stored in
Kubernetes.
```
I didn't realize that the parameters were actually links because of the
formatting. We shouldn't override the link styling like that.
##########
File path: docs/deployment/ha/kubernetes_ha.md
##########
@@ -23,77 +23,50 @@ specific language governing permissions and limitations
under the License.
-->
-## Kubernetes Cluster High Availability
-Kubernetes high availability service could support both [standalone Flink on
Kubernetes]({% link deployment/resource-providers/standalone/kubernetes.md %})
and [native Kubernetes integration]({% link
deployment/resource-providers/native_kubernetes.md %}).
+Flink's Kubernetes HA services use [Kubernetes](https://kubernetes.io/) for
high availability services.
-When running Flink JobManager as a Kubernetes deployment, the replica count
should be configured to 1 or greater.
-* The value `1` means that a new JobManager will be launched to take over
leadership if the current one terminates exceptionally.
-* The value `N` (greater than 1) means that multiple JobManagers will be
launched simultaneously while one is active and others are standby. Starting
more than one JobManager will make the recovery faster.
+* Toc
+{:toc}
-### Configuration
-{% highlight yaml %}
-kubernetes.cluster-id: <ClusterId>
-high-availability:
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
-high-availability.storageDir: hdfs:///flink/recovery
-{% endhighlight %}
+Kubernetes high availability services can only be used when deploying to
Kubernetes.
+Consequently, they can be configured when using [standalone Flink on
Kubernetes]({% link deployment/resource-providers/standalone/kubernetes.md %})
or the [native Kubernetes integration]({% link
deployment/resource-providers/native_kubernetes.md %})
-#### Example: Highly Available Standalone Flink Cluster on Kubernetes
-Both session and job/application clusters support using the Kubernetes high
availability service. Users just need to add the following Flink config options
to [flink-configuration-configmap.yaml]({% link
deployment/resource-providers/standalone/kubernetes.md
%}#common-cluster-resource-definitions). All other yamls do not need to be
updated.
-
-<span class="label label-info">Note</span> The filesystem which corresponds to
the scheme of your configured HA storage directory must be available to the
runtime. Refer to [custom Flink image]({% link
deployment/resource-providers/standalone/docker.md %}#customize-flink-image)
and [enable plugins]({% link deployment/resource-providers/standalone/docker.md
%}#using-plugins) for more information.
-
-{% highlight yaml %}
-apiVersion: v1
-kind: ConfigMap
-metadata:
- name: flink-config
- labels:
- app: flink
-data:
- flink-conf.yaml: |+
- ...
- kubernetes.cluster-id: <ClusterId>
- high-availability:
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
- high-availability.storageDir: hdfs:///flink/recovery
- restart-strategy: fixed-delay
- restart-strategy.fixed-delay.attempts: 10
- ...
-{% endhighlight %}
+## Configuration
-#### Example: Highly Available Native Kubernetes Cluster
-Using the following command to start a native Flink application cluster on
Kubernetes with high availability configured.
-{% highlight bash %}
-$ ./bin/flink run-application -p 8 -t kubernetes-application \
- -Dkubernetes.cluster-id=<ClusterId> \
- -Dtaskmanager.memory.process.size=4096m \
- -Dkubernetes.taskmanager.cpu=2 \
- -Dtaskmanager.numberOfTaskSlots=4 \
- -Dkubernetes.container.image=<CustomImageName> \
-
-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
\
- -Dhigh-availability.storageDir=s3://flink/flink-ha \
- -Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=10 \
-
-Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar
\
-
-Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar
\
- local:///opt/flink/examples/streaming/StateMachineExample.jar
-{% endhighlight %}
+In order to start an HA-cluster you have to configure the following
configuration keys:
-### High Availability Data Clean Up
-Currently, when a Flink job reached the terminal state (`FAILED`, `CANCELED`,
`FINISHED`), all the HA data, including metadata in Kubernetes ConfigMap and HA
state on DFS, will be cleaned up.
+- **high-availability mode** (required):
Review comment:
Here, again: what about using the parameter names as the key for each item in
this list (analogous to my proposal on the ZooKeeper HA page) and linking the
respective entry on the Configuration page instead of coming up with a custom
bold item intro? Highlighting would work through the link formatting.
This applies to all items in this list.
##########
File path: docs/deployment/ha/kubernetes_ha.md
##########
@@ -23,77 +23,50 @@ specific language governing permissions and limitations
under the License.
-->
-## Kubernetes Cluster High Availability
-Kubernetes high availability service could support both [standalone Flink on
Kubernetes]({% link deployment/resource-providers/standalone/kubernetes.md %})
and [native Kubernetes integration]({% link
deployment/resource-providers/native_kubernetes.md %}).
+Flink's Kubernetes HA services use [Kubernetes](https://kubernetes.io/) for
high availability services.
-When running Flink JobManager as a Kubernetes deployment, the replica count
should be configured to 1 or greater.
-* The value `1` means that a new JobManager will be launched to take over
leadership if the current one terminates exceptionally.
-* The value `N` (greater than 1) means that multiple JobManagers will be
launched simultaneously while one is active and others are standby. Starting
more than one JobManager will make the recovery faster.
+* Toc
+{:toc}
-### Configuration
-{% highlight yaml %}
-kubernetes.cluster-id: <ClusterId>
-high-availability:
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
-high-availability.storageDir: hdfs:///flink/recovery
-{% endhighlight %}
+Kubernetes high availability services can only be used when deploying to
Kubernetes.
+Consequently, they can be configured when using [standalone Flink on
Kubernetes]({% link deployment/resource-providers/standalone/kubernetes.md %})
or the [native Kubernetes integration]({% link
deployment/resource-providers/native_kubernetes.md %})
-#### Example: Highly Available Standalone Flink Cluster on Kubernetes
-Both session and job/application clusters support using the Kubernetes high
availability service. Users just need to add the following Flink config options
to [flink-configuration-configmap.yaml]({% link
deployment/resource-providers/standalone/kubernetes.md
%}#common-cluster-resource-definitions). All other yamls do not need to be
updated.
-
-<span class="label label-info">Note</span> The filesystem which corresponds to
the scheme of your configured HA storage directory must be available to the
runtime. Refer to [custom Flink image]({% link
deployment/resource-providers/standalone/docker.md %}#customize-flink-image)
and [enable plugins]({% link deployment/resource-providers/standalone/docker.md
%}#using-plugins) for more information.
-
-{% highlight yaml %}
-apiVersion: v1
-kind: ConfigMap
-metadata:
- name: flink-config
- labels:
- app: flink
-data:
- flink-conf.yaml: |+
- ...
- kubernetes.cluster-id: <ClusterId>
- high-availability:
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
- high-availability.storageDir: hdfs:///flink/recovery
- restart-strategy: fixed-delay
- restart-strategy.fixed-delay.attempts: 10
- ...
-{% endhighlight %}
+## Configuration
-#### Example: Highly Available Native Kubernetes Cluster
-Using the following command to start a native Flink application cluster on
Kubernetes with high availability configured.
-{% highlight bash %}
-$ ./bin/flink run-application -p 8 -t kubernetes-application \
- -Dkubernetes.cluster-id=<ClusterId> \
- -Dtaskmanager.memory.process.size=4096m \
- -Dkubernetes.taskmanager.cpu=2 \
- -Dtaskmanager.numberOfTaskSlots=4 \
- -Dkubernetes.container.image=<CustomImageName> \
-
-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
\
- -Dhigh-availability.storageDir=s3://flink/flink-ha \
- -Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=10 \
-
-Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar
\
-
-Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar
\
- local:///opt/flink/examples/streaming/StateMachineExample.jar
-{% endhighlight %}
+In order to start an HA-cluster you have to configure the following
configuration keys:
-### High Availability Data Clean Up
-Currently, when a Flink job reached the terminal state (`FAILED`, `CANCELED`,
`FINISHED`), all the HA data, including metadata in Kubernetes ConfigMap and HA
state on DFS, will be cleaned up.
+- **high-availability mode** (required):
+The [`high-availability`]({% link deployment/config.md %}#high-availability-1)
option has to be set to `KubernetesHaServicesFactory`.
-So the following command will only shut down the Flink session cluster and
leave all the HA related ConfigMaps, state untouched.
-{% highlight bash %}
-$ echo 'stop' | ./bin/kubernetes-session.sh
-Dkubernetes.cluster-id=<ClusterId> -Dexecution.attached=true
-{% endhighlight %}
+ <pre>high-availability:
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory</pre>
+
+- **Storage directory** (required):
+JobManager metadata is persisted in the file system
[`high-availability.storageDir`]({% link deployment/config.md
%}#high-availability-storagedir) and only a pointer to this state is stored in
Kubernetes.
+
+ <pre>high-availability.storageDir: s3:///flink/recovery</pre>
+
+ The `storageDir` stores all metadata needed to recover a JobManager failure.
+
+- **Cluster id** (required):
+In order to identify the Flink cluster, you have to specify a
[`kubernetes.cluster-id`]({% link deployment/config.md
%}#kubernetes-cluster-id).
Review comment:
```suggestion
In order to identify the Flink cluster, you have to specify a
[kubernetes.cluster-id]({% link deployment/config.md %}#kubernetes-cluster-id).
```
I didn't realize that the parameters were actually links because of the
formatting. We shouldn't override the link styling like that.
##########
File path: docs/deployment/ha/kubernetes_ha.md
##########
@@ -23,77 +23,50 @@ specific language governing permissions and limitations
under the License.
-->
-## Kubernetes Cluster High Availability
-Kubernetes high availability service could support both [standalone Flink on
Kubernetes]({% link deployment/resource-providers/standalone/kubernetes.md %})
and [native Kubernetes integration]({% link
deployment/resource-providers/native_kubernetes.md %}).
+Flink's Kubernetes HA services use [Kubernetes](https://kubernetes.io/) for
high availability services.
-When running Flink JobManager as a Kubernetes deployment, the replica count
should be configured to 1 or greater.
-* The value `1` means that a new JobManager will be launched to take over
leadership if the current one terminates exceptionally.
-* The value `N` (greater than 1) means that multiple JobManagers will be
launched simultaneously while one is active and others are standby. Starting
more than one JobManager will make the recovery faster.
+* Toc
+{:toc}
-### Configuration
-{% highlight yaml %}
-kubernetes.cluster-id: <ClusterId>
-high-availability:
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
-high-availability.storageDir: hdfs:///flink/recovery
-{% endhighlight %}
+Kubernetes high availability services can only be used when deploying to
Kubernetes.
+Consequently, they can be configured when using [standalone Flink on
Kubernetes]({% link deployment/resource-providers/standalone/kubernetes.md %})
or the [native Kubernetes integration]({% link
deployment/resource-providers/native_kubernetes.md %})
-#### Example: Highly Available Standalone Flink Cluster on Kubernetes
-Both session and job/application clusters support using the Kubernetes high
availability service. Users just need to add the following Flink config options
to [flink-configuration-configmap.yaml]({% link
deployment/resource-providers/standalone/kubernetes.md
%}#common-cluster-resource-definitions). All other yamls do not need to be
updated.
-
-<span class="label label-info">Note</span> The filesystem which corresponds to
the scheme of your configured HA storage directory must be available to the
runtime. Refer to [custom Flink image]({% link
deployment/resource-providers/standalone/docker.md %}#customize-flink-image)
and [enable plugins]({% link deployment/resource-providers/standalone/docker.md
%}#using-plugins) for more information.
-
-{% highlight yaml %}
-apiVersion: v1
-kind: ConfigMap
-metadata:
- name: flink-config
- labels:
- app: flink
-data:
- flink-conf.yaml: |+
- ...
- kubernetes.cluster-id: <ClusterId>
- high-availability:
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
- high-availability.storageDir: hdfs:///flink/recovery
- restart-strategy: fixed-delay
- restart-strategy.fixed-delay.attempts: 10
- ...
-{% endhighlight %}
+## Configuration
-#### Example: Highly Available Native Kubernetes Cluster
-Using the following command to start a native Flink application cluster on
Kubernetes with high availability configured.
-{% highlight bash %}
-$ ./bin/flink run-application -p 8 -t kubernetes-application \
- -Dkubernetes.cluster-id=<ClusterId> \
- -Dtaskmanager.memory.process.size=4096m \
- -Dkubernetes.taskmanager.cpu=2 \
- -Dtaskmanager.numberOfTaskSlots=4 \
- -Dkubernetes.container.image=<CustomImageName> \
-
-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
\
- -Dhigh-availability.storageDir=s3://flink/flink-ha \
- -Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=10 \
-
-Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar
\
-
-Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar
\
- local:///opt/flink/examples/streaming/StateMachineExample.jar
-{% endhighlight %}
+In order to start an HA-cluster you have to configure the following
configuration keys:
-### High Availability Data Clean Up
-Currently, when a Flink job reached the terminal state (`FAILED`, `CANCELED`,
`FINISHED`), all the HA data, including metadata in Kubernetes ConfigMap and HA
state on DFS, will be cleaned up.
+- **high-availability mode** (required):
+The [`high-availability`]({% link deployment/config.md %}#high-availability-1)
option has to be set to `KubernetesHaServicesFactory`.
-So the following command will only shut down the Flink session cluster and
leave all the HA related ConfigMaps, state untouched.
-{% highlight bash %}
-$ echo 'stop' | ./bin/kubernetes-session.sh
-Dkubernetes.cluster-id=<ClusterId> -Dexecution.attached=true
-{% endhighlight %}
+ <pre>high-availability:
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory</pre>
+
+- **Storage directory** (required):
+JobManager metadata is persisted in the file system
[`high-availability.storageDir`]({% link deployment/config.md
%}#high-availability-storagedir) and only a pointer to this state is stored in
Kubernetes.
+
+ <pre>high-availability.storageDir: s3:///flink/recovery</pre>
+
+ The `storageDir` stores all metadata needed to recover a JobManager failure.
+
+- **Cluster id** (required):
+In order to identify the Flink cluster, you have to specify a
[`kubernetes.cluster-id`]({% link deployment/config.md
%}#kubernetes-cluster-id).
+
+ <pre>kubernetes.cluster-id: cluster1337</pre>
+
+### Example configuration
+
+Configure high availability mode in `conf/flink-conf.yaml`:
-The following commands will cancel the job in application or session cluster
and effectively remove all its HA data.
{% highlight bash %}
-# Cancel a Flink job in the existing session
-$ ./bin/flink cancel -t kubernetes-session -Dkubernetes.cluster-id=<ClusterID>
<JobID>
-# Cancel a Flink application
-$ ./bin/flink cancel -t kubernetes-application
-Dkubernetes.cluster-id=<ClusterID> <JobID>
+kubernetes.cluster-id: <ClusterId>
+high-availability:
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+high-availability.storageDir: hdfs:///flink/recovery
{% endhighlight %}
+## High availability data clean up
+
To keep HA data while restarting the Flink cluster, simply delete the
deployment (via `kubectl delete deploy <ClusterID>`).
Review comment:
```suggestion
To keep HA data while restarting the Flink cluster, simply delete the
deployment (via `kubectl delete deploy <cluster-id>`).
```
The `ClusterId` reformatting would apply here as well if we want to apply the
change consistently.
##########
File path: docs/deployment/resource-providers/standalone/index.md
##########
@@ -150,4 +150,71 @@ bin/taskmanager.sh start|start-foreground|stop|stop-all
Make sure to call these scripts on the hosts on which you want to start/stop
the respective instance.
+## High-Availability with Standalone
+
+In order to enable HA for a standalone cluster, you have to use the [ZooKeeper
HA services]({% link deployment/ha/zookeeper_ha.md %}).
+
+Additionally, you have to configure your cluster to start multiple JobManagers.
+
+### Masters File (masters)
+
+In order to start an HA-cluster configure the *masters* file in `conf/masters`:
+
+- **masters file**: The *masters file* contains all hosts, on which
JobManagers are started, and the ports to which the web user interface binds.
+
+ <pre>
+jobManagerAddress1:webUIPort1
+[...]
+jobManagerAddressX:webUIPortX
+ </pre>
+
+By default, the job manager will pick a *random port* for inter process
communication. You can change this via the
**`high-availability.jobmanager.port`** key. This key accepts single ports
(e.g. `50010`), ranges (`50000-50025`), or a combination of both
(`50010,50011,50020-50025,50050-50075`).
+
+### Example: Standalone Cluster with 2 JobManagers
+
+1. **Configure high availability mode and ZooKeeper quorum** in
`conf/flink-conf.yaml`:
Review comment:
```suggestion
1. **Configure high availability mode and ZooKeeper quorum** in
`${FLINK_HOME}/conf/flink-conf.yaml`:
```
What about the proposal of adding a `${*_HOME}` variable to paths to give a
clearer pointer to the actual file/script?
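Applied consistently to the files referenced in this example, the paths would
then read roughly like this (assuming we settle on `${FLINK_HOME}`):
```
${FLINK_HOME}/conf/flink-conf.yaml
${FLINK_HOME}/conf/masters
${FLINK_HOME}/conf/zoo.cfg
```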
##########
File path: docs/deployment/resource-providers/standalone/kubernetes.md
##########
@@ -162,6 +162,34 @@ with the `kubectl` command:
kubectl delete -f jobmanager-job.yaml
```
+## High-Availability with Standalone Kubernetes
+
+For high availability on Kubernetes, you can use the [existing high
availability services]({% link deployment/ha/index.md %}).
+
+### How to configure Kubernetes HA Services
+
+Both session and job/application clusters support using the Kubernetes high
availability service. Users just need to add the following Flink config options
to [flink-configuration-configmap.yaml](#common-cluster-resource-definitions).
All other yamls do not need to be updated.
Review comment:
```suggestion
Session Mode, Per-Job Mode, and Application Mode clusters support using the
Kubernetes high availability service. Users just need to add the following
Flink config options to
[flink-configuration-configmap.yaml](#common-cluster-resource-definitions). All
other yamls do not need to be updated.
```
##########
File path: docs/deployment/resource-providers/standalone/index.md
##########
@@ -150,4 +150,71 @@ bin/taskmanager.sh start|start-foreground|stop|stop-all
Make sure to call these scripts on the hosts on which you want to start/stop
the respective instance.
+## High-Availability with Standalone
+
+In order to enable HA for a standalone cluster, you have to use the [ZooKeeper
HA services]({% link deployment/ha/zookeeper_ha.md %}).
+
+Additionally, you have to configure your cluster to start multiple JobManagers.
+
+### Masters File (masters)
+
+In order to start an HA-cluster configure the *masters* file in `conf/masters`:
+
+- **masters file**: The *masters file* contains all hosts, on which
JobManagers are started, and the ports to which the web user interface binds.
+
+ <pre>
+jobManagerAddress1:webUIPort1
+[...]
+jobManagerAddressX:webUIPortX
+ </pre>
+
+By default, the job manager will pick a *random port* for inter process
communication. You can change this via the
**`high-availability.jobmanager.port`** key. This key accepts single ports
(e.g. `50010`), ranges (`50000-50025`), or a combination of both
(`50010,50011,50020-50025,50050-50075`).
+
+### Example: Standalone Cluster with 2 JobManagers
+
+1. **Configure high availability mode and ZooKeeper quorum** in
`conf/flink-conf.yaml`:
+
+ <pre>
+high-availability: zookeeper
+high-availability.zookeeper.quorum: localhost:2181
+high-availability.zookeeper.path.root: /flink
+high-availability.cluster-id: /cluster_one # important: customize per cluster
+high-availability.storageDir: hdfs:///flink/recovery</pre>
+
+2. **Configure masters** in `conf/masters`:
+
+ <pre>
+localhost:8081
+localhost:8082</pre>
+
+3. **Configure ZooKeeper server** in `conf/zoo.cfg` (currently it's only
possible to run a single ZooKeeper server per machine):
+
+ <pre>server.0=localhost:2888:3888</pre>
+
+4. **Start ZooKeeper quorum**:
+
+ <pre>
+$ bin/start-zookeeper-quorum.sh
+Starting zookeeper daemon on host localhost.</pre>
+
+5. **Start an HA-cluster**:
+
+ <pre>
+$ bin/start-cluster.sh
+Starting HA cluster with 2 masters and 1 peers in ZooKeeper quorum.
+Starting standalonesession daemon on host localhost.
+Starting standalonesession daemon on host localhost.
+Starting taskexecutor daemon on host localhost.</pre>
+
+6. **Stop ZooKeeper quorum and cluster**:
+
+ <pre>
+$ bin/stop-cluster.sh
+Stopping taskexecutor daemon (pid: 7647) on localhost.
+Stopping standalonesession daemon (pid: 7495) on host localhost.
+Stopping standalonesession daemon (pid: 7349) on host localhost.
+$ bin/stop-zookeeper-quorum.sh
+Stopping zookeeper daemon (pid: 7101) on host localhost.</pre>
Review comment:
There's also
```
{% highlight bash %}
# ...
{% endhighlight %}
```
which might be the more appropriate syntax to create code blocks.
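For example, step 5 could be rewritten like this (same content as above, just
using the Liquid syntax instead of `<pre>`):
```
{% highlight bash %}
$ bin/start-cluster.sh
Starting HA cluster with 2 masters and 1 peers in ZooKeeper quorum.
Starting standalonesession daemon on host localhost.
Starting standalonesession daemon on host localhost.
Starting taskexecutor daemon on host localhost.
{% endhighlight %}
```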
##########
File path: docs/deployment/resource-providers/standalone/kubernetes.md
##########
@@ -162,6 +162,34 @@ with the `kubectl` command:
kubectl delete -f jobmanager-job.yaml
```
+## High-Availability with Standalone Kubernetes
+
+For high availability on Kubernetes, you can use the [existing high
availability services]({% link deployment/ha/index.md %}).
+
+### How to configure Kubernetes HA Services
+
+Both session and job/application clusters support using the Kubernetes high
availability service. Users just need to add the following Flink config options
to [flink-configuration-configmap.yaml](#common-cluster-resource-definitions).
All other yamls do not need to be updated.
+
+<span class="label label-info">Note</span> The filesystem which corresponds to
the scheme of your configured HA storage directory must be available to the
runtime. Refer to [custom Flink image]({% link
deployment/resource-providers/standalone/docker.md %}#customize-flink-image)
and [enable plugins]({% link deployment/resource-providers/standalone/docker.md
%}#using-plugins) for more information.
+
+{% highlight yaml %}
+apiVersion: v1
+kind: ConfigMap
+metadata:
+ name: flink-config
+ labels:
+ app: flink
+data:
+ flink-conf.yaml: |+
+ ...
+ kubernetes.cluster-id: <ClusterId>
Review comment:
```suggestion
kubernetes.cluster-id: <cluster-id>
```
`<ClusterId>` needs to be updated if we want to change the formatting
consistently.
##########
File path: docs/deployment/ha/zookeeper_ha.md
##########
@@ -23,113 +23,104 @@ specific language governing permissions and limitations
under the License.
-->
-## ZooKeeper HA Services
+Flink's ZooKeeper HA services use [ZooKeeper](http://zookeeper.apache.org) for
high availability services.
-One high availability services implementation uses ZooKeeper.
+* Toc
+{:toc}
-### Configuration
+Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for *distributed
coordination* between all running JobManager instances.
+ZooKeeper is a separate service from Flink, which provides highly reliable
distributed coordination via leader election and light-weight consistent state
storage.
+Check out [ZooKeeper's Getting Started
Guide](http://zookeeper.apache.org/doc/current/zookeeperStarted.html) for more
information about ZooKeeper.
+Flink includes scripts to [bootstrap a simple ZooKeeper](#bootstrap-zookeeper)
installation.
-To enable JobManager High Availability you have to set the **high-availability
mode** to *zookeeper*, configure a **ZooKeeper quorum** and set up a **masters
file** with all JobManagers hosts and their web UI ports.
+## Configuration
-Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for *distributed
coordination* between all running JobManager instances. ZooKeeper is a separate
service from Flink, which provides highly reliable distributed coordination via
leader election and light-weight consistent state storage. Check out
[ZooKeeper's Getting Started
Guide](http://zookeeper.apache.org/doc/current/zookeeperStarted.html) for more
information about ZooKeeper. Flink includes scripts to [bootstrap a simple
ZooKeeper](#bootstrap-zookeeper) installation.
+In order to start an HA-cluster you have to configure the following
configuration keys:
-#### Masters File (masters)
-
-In order to start an HA-cluster configure the *masters* file in `conf/masters`:
-
-- **masters file**: The *masters file* contains all hosts, on which
JobManagers are started, and the ports to which the web user interface binds.
-
- <pre>
-jobManagerAddress1:webUIPort1
-[...]
-jobManagerAddressX:webUIPortX
- </pre>
-
-By default, the job manager will pick a *random port* for inter process
communication. You can change this via the
**`high-availability.jobmanager.port`** key. This key accepts single ports
(e.g. `50010`), ranges (`50000-50025`), or a combination of both
(`50010,50011,50020-50025,50050-50075`).
-
-#### Config File (flink-conf.yaml)
-
-In order to start an HA-cluster add the following configuration keys to
`conf/flink-conf.yaml`:
-
-- **high-availability mode** (required): The *high-availability mode* has to
be set in `conf/flink-conf.yaml` to *zookeeper* in order to enable high
availability mode.
-Alternatively this option can be set to FQN of factory class Flink should use
to create HighAvailabilityServices instance.
+- **high-availability mode** (required):
+The `high-availability` option has to be set to `zookeeper`.
<pre>high-availability: zookeeper</pre>
-- **ZooKeeper quorum** (required): A *ZooKeeper quorum* is a replicated group
of ZooKeeper servers, which provide the distributed coordination service.
+- **ZooKeeper quorum** (required):
+A *ZooKeeper quorum* is a replicated group of ZooKeeper servers, which provide
the distributed coordination service.
<pre>high-availability.zookeeper.quorum:
address1:2181[,...],addressX:2181</pre>
- Each *addressX:port* refers to a ZooKeeper server, which is reachable by
Flink at the given address and port.
+ Each `addressX:port` refers to a ZooKeeper server, which is reachable by
Flink at the given address and port.
-- **ZooKeeper root** (recommended): The *root ZooKeeper node*, under which all
cluster nodes are placed.
+- **ZooKeeper root** (recommended):
+The *root ZooKeeper node*, under which all cluster nodes are placed.
- <pre>high-availability.zookeeper.path.root: /flink
+ <pre>high-availability.zookeeper.path.root: /flink</pre>
-- **ZooKeeper cluster-id** (recommended): The *cluster-id ZooKeeper node*,
under which all required coordination data for a cluster is placed.
+- **ZooKeeper cluster-id** (recommended):
+The *cluster-id ZooKeeper node*, under which all required coordination data
for a cluster is placed.
<pre>high-availability.cluster-id: /default_ns # important: customize per
cluster</pre>
- **Important**: You should not set this value manually when running a YARN
- cluster, a per-job YARN session, or on another cluster manager. In those
- cases a cluster-id is automatically being generated based on the application
- id. Manually setting a cluster-id overrides this behaviour in YARN.
- Specifying a cluster-id with the -z CLI option, in turn, overrides manual
- configuration. If you are running multiple Flink HA clusters on bare metal,
- you have to manually configure separate cluster-ids for each cluster.
+ **Important**:
+ You should not set this value manually when running on YARN, native
Kubernetes or on another cluster manager.
+ In those cases a cluster-id is being automatically generated.
+ If you are running multiple Flink HA clusters on bare metal, you have to
manually configure separate cluster-ids for each cluster.
-- **Storage directory** (required): JobManager metadata is persisted in the
file system *storageDir* and only a pointer to this state is stored in
ZooKeeper.
+- **Storage directory** (required):
+JobManager metadata is persisted in the file system
`high-availability.storageDir` and only a pointer to this state is stored in
ZooKeeper.
- <pre>
-high-availability.storageDir: hdfs:///flink/recovery
- </pre>
-
- The `storageDir` stores all metadata needed to recover a JobManager
failure.
+ <pre>high-availability.storageDir: hdfs:///flink/recovery</pre>
-After configuring the masters and the ZooKeeper quorum, you can use the
provided cluster startup scripts as usual. They will start an HA-cluster. Keep
in mind that the **ZooKeeper quorum has to be running** when you call the
scripts and make sure to **configure a separate ZooKeeper root path** for each
HA cluster you are starting.
+ The `storageDir` stores all metadata needed to recover a JobManager failure.
-#### Example: Standalone Cluster with 2 JobManagers
+### Example configuration
-1. **Configure high availability mode and ZooKeeper quorum** in
`conf/flink-conf.yaml`:
+Configure high availability mode and ZooKeeper quorum in
`conf/flink-conf.yaml`:
- <pre>
+{% highlight bash %}
high-availability: zookeeper
high-availability.zookeeper.quorum: localhost:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /cluster_one # important: customize per cluster
-high-availability.storageDir: hdfs:///flink/recovery</pre>
+high-availability.storageDir: hdfs:///flink/recovery
+{% endhighlight %}
+
+## Configuring for Zookeeper Security
Review comment:
```suggestion
## Configuring for ZooKeeper Security
```
##########
File path: docs/deployment/ha/zookeeper_ha.md
##########
@@ -23,113 +23,104 @@ specific language governing permissions and limitations
under the License.
-->
-## ZooKeeper HA Services
+Flink's ZooKeeper HA services use [ZooKeeper](http://zookeeper.apache.org) for
high availability services.
-One high availability services implementation uses ZooKeeper.
+* Toc
+{:toc}
-### Configuration
+Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for *distributed
coordination* between all running JobManager instances.
+ZooKeeper is a separate service from Flink, which provides highly reliable
distributed coordination via leader election and light-weight consistent state
storage.
+Check out [ZooKeeper's Getting Started
Guide](http://zookeeper.apache.org/doc/current/zookeeperStarted.html) for more
information about ZooKeeper.
+Flink includes scripts to [bootstrap a simple ZooKeeper](#bootstrap-zookeeper)
installation.
-To enable JobManager High Availability you have to set the **high-availability
mode** to *zookeeper*, configure a **ZooKeeper quorum** and set up a **masters
file** with all JobManagers hosts and their web UI ports.
+## Configuration
-Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for *distributed
coordination* between all running JobManager instances. ZooKeeper is a separate
service from Flink, which provides highly reliable distributed coordination via
leader election and light-weight consistent state storage. Check out
[ZooKeeper's Getting Started
Guide](http://zookeeper.apache.org/doc/current/zookeeperStarted.html) for more
information about ZooKeeper. Flink includes scripts to [bootstrap a simple
ZooKeeper](#bootstrap-zookeeper) installation.
+In order to start an HA-cluster you have to configure the following
configuration keys:
-#### Masters File (masters)
-
-In order to start an HA-cluster configure the *masters* file in `conf/masters`:
-
-- **masters file**: The *masters file* contains all hosts, on which
JobManagers are started, and the ports to which the web user interface binds.
-
- <pre>
-jobManagerAddress1:webUIPort1
-[...]
-jobManagerAddressX:webUIPortX
- </pre>
-
-By default, the job manager will pick a *random port* for inter process
communication. You can change this via the
**`high-availability.jobmanager.port`** key. This key accepts single ports
(e.g. `50010`), ranges (`50000-50025`), or a combination of both
(`50010,50011,50020-50025,50050-50075`).
-
-#### Config File (flink-conf.yaml)
-
-In order to start an HA-cluster add the following configuration keys to
`conf/flink-conf.yaml`:
-
-- **high-availability mode** (required): The *high-availability mode* has to
be set in `conf/flink-conf.yaml` to *zookeeper* in order to enable high
availability mode.
-Alternatively this option can be set to FQN of factory class Flink should use
to create HighAvailabilityServices instance.
+- **high-availability mode** (required):
+The `high-availability` option has to be set to `zookeeper`.
<pre>high-availability: zookeeper</pre>
-- **ZooKeeper quorum** (required): A *ZooKeeper quorum* is a replicated group
of ZooKeeper servers, which provide the distributed coordination service.
+- **ZooKeeper quorum** (required):
+A *ZooKeeper quorum* is a replicated group of ZooKeeper servers, which provide
the distributed coordination service.
<pre>high-availability.zookeeper.quorum:
address1:2181[,...],addressX:2181</pre>
- Each *addressX:port* refers to a ZooKeeper server, which is reachable by
Flink at the given address and port.
+ Each `addressX:port` refers to a ZooKeeper server, which is reachable by
Flink at the given address and port.
-- **ZooKeeper root** (recommended): The *root ZooKeeper node*, under which all
cluster nodes are placed.
+- **ZooKeeper root** (recommended):
+The *root ZooKeeper node*, under which all cluster nodes are placed.
- <pre>high-availability.zookeeper.path.root: /flink
+ <pre>high-availability.zookeeper.path.root: /flink</pre>
-- **ZooKeeper cluster-id** (recommended): The *cluster-id ZooKeeper node*,
under which all required coordination data for a cluster is placed.
+- **ZooKeeper cluster-id** (recommended):
+The *cluster-id ZooKeeper node*, under which all required coordination data
for a cluster is placed.
<pre>high-availability.cluster-id: /default_ns # important: customize per
cluster</pre>
- **Important**: You should not set this value manually when running a YARN
- cluster, a per-job YARN session, or on another cluster manager. In those
- cases a cluster-id is automatically being generated based on the application
- id. Manually setting a cluster-id overrides this behaviour in YARN.
- Specifying a cluster-id with the -z CLI option, in turn, overrides manual
- configuration. If you are running multiple Flink HA clusters on bare metal,
- you have to manually configure separate cluster-ids for each cluster.
+ **Important**:
+ You should not set this value manually when running on YARN, native
Kubernetes or on another cluster manager.
+ In those cases a cluster-id is being automatically generated.
+ If you are running multiple Flink HA clusters on bare metal, you have to
manually configure separate cluster-ids for each cluster.
-- **Storage directory** (required): JobManager metadata is persisted in the
file system *storageDir* and only a pointer to this state is stored in
ZooKeeper.
+- **Storage directory** (required):
+JobManager metadata is persisted in the file system
`high-availability.storageDir` and only a pointer to this state is stored in
ZooKeeper.
- <pre>
-high-availability.storageDir: hdfs:///flink/recovery
- </pre>
-
- The `storageDir` stores all metadata needed to recover a JobManager
failure.
+ <pre>high-availability.storageDir: hdfs:///flink/recovery</pre>
-After configuring the masters and the ZooKeeper quorum, you can use the
provided cluster startup scripts as usual. They will start an HA-cluster. Keep
in mind that the **ZooKeeper quorum has to be running** when you call the
scripts and make sure to **configure a separate ZooKeeper root path** for each
HA cluster you are starting.
+ The `storageDir` stores all metadata needed to recover a JobManager failure.
-#### Example: Standalone Cluster with 2 JobManagers
+### Example configuration
-1. **Configure high availability mode and ZooKeeper quorum** in
`conf/flink-conf.yaml`:
+Configure high availability mode and ZooKeeper quorum in
`conf/flink-conf.yaml`:
- <pre>
+{% highlight bash %}
high-availability: zookeeper
high-availability.zookeeper.quorum: localhost:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /cluster_one # important: customize per cluster
-high-availability.storageDir: hdfs:///flink/recovery</pre>
+high-availability.storageDir: hdfs:///flink/recovery
+{% endhighlight %}
+
+## Configuring for Zookeeper Security
-2. **Configure masters** in `conf/masters`:
+If ZooKeeper is running in secure mode with Kerberos, you can override the
following configurations in `flink-conf.yaml` as necessary:
- <pre>
-localhost:8081
-localhost:8082</pre>
+{% highlight bash %}
+zookeeper.sasl.service-name: zookeeper # default is "zookeeper". If the ZooKeeper quorum is configured
+                                          # with a different service name then it can be supplied here.
+zookeeper.sasl.login-context-name: Client # default is "Client". The value needs to match one of the values
+                                          # configured in "security.kerberos.login.contexts".
+{% endhighlight %}
-3. **Configure ZooKeeper server** in `conf/zoo.cfg` (currently it's only
possible to run a single ZooKeeper server per machine):
+For more information on Flink configuration for Kerberos security, please
refer to the [security section of the Flink configuration page]({% link
deployment/config.md %}#security).
+You can also find further details on [how Flink sets up Kerberos-based
security internally]({% link deployment/security/security-kerberos.md %}).
- <pre>server.0=localhost:2888:3888</pre>
+## Zookeeper Versions
-4. **Start ZooKeeper quorum**:
+Flink ships with separate Zookeeper clients for 3.4 and 3.5, with 3.4 being in
the `lib` directory of the distribution
+and thus used by default, whereas 3.5 is placed in the `opt` directory.
- <pre>
-$ bin/start-zookeeper-quorum.sh
-Starting zookeeper daemon on host localhost.</pre>
+The 3.5 client allows you to secure the Zookeeper connection via SSL, but
_may_ not work with 3.4- Zookeeper installations.
Review comment:
```suggestion
The 3.5 client allows you to secure the ZooKeeper connection via SSL, but
_may_ not work with 3.4-ZooKeeper installations.
```
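
Since the 3.5 client is only shipped in `opt`, enabling it amounts to swapping jars. A hedged sketch follows; the exact shaded jar names are an assumption and differ between releases, so check `lib/` and `opt/` in your distribution first.

```bash
# Sketch: activate the bundled ZooKeeper 3.5 client instead of the default 3.4 one.
# Jar names are globbed because the shipped versions vary per Flink release (assumption).
cd "$FLINK_HOME"
mkdir -p lib/zk-3.4-backup
mv lib/flink-shaded-zookeeper-3.4*.jar lib/zk-3.4-backup/   # disable the default 3.4 client
cp opt/flink-shaded-zookeeper-3.5*.jar lib/                 # enable the 3.5 client
```
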
##########
File path: docs/deployment/ha/zookeeper_ha.md
##########
@@ -23,113 +23,104 @@ specific language governing permissions and limitations
under the License.
-->
-## ZooKeeper HA Services
+Flink's ZooKeeper HA services use [ZooKeeper](http://zookeeper.apache.org) for
high availability services.
-One high availability services implementation uses ZooKeeper.
+* Toc
+{:toc}
-### Configuration
+Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for *distributed
coordination* between all running JobManager instances.
+ZooKeeper is a separate service from Flink, which provides highly reliable
distributed coordination via leader election and light-weight consistent state
storage.
+Check out [ZooKeeper's Getting Started
Guide](http://zookeeper.apache.org/doc/current/zookeeperStarted.html) for more
information about ZooKeeper.
+Flink includes scripts to [bootstrap a simple ZooKeeper](#bootstrap-zookeeper)
installation.
-To enable JobManager High Availability you have to set the **high-availability
mode** to *zookeeper*, configure a **ZooKeeper quorum** and set up a **masters
file** with all JobManagers hosts and their web UI ports.
+## Configuration
-Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for *distributed
coordination* between all running JobManager instances. ZooKeeper is a separate
service from Flink, which provides highly reliable distributed coordination via
leader election and light-weight consistent state storage. Check out
[ZooKeeper's Getting Started
Guide](http://zookeeper.apache.org/doc/current/zookeeperStarted.html) for more
information about ZooKeeper. Flink includes scripts to [bootstrap a simple
ZooKeeper](#bootstrap-zookeeper) installation.
+In order to start an HA-cluster you have to configure the following
configuration keys:
-#### Masters File (masters)
-
-In order to start an HA-cluster configure the *masters* file in `conf/masters`:
-
-- **masters file**: The *masters file* contains all hosts, on which
JobManagers are started, and the ports to which the web user interface binds.
-
- <pre>
-jobManagerAddress1:webUIPort1
-[...]
-jobManagerAddressX:webUIPortX
- </pre>
-
-By default, the job manager will pick a *random port* for inter process
communication. You can change this via the
**`high-availability.jobmanager.port`** key. This key accepts single ports
(e.g. `50010`), ranges (`50000-50025`), or a combination of both
(`50010,50011,50020-50025,50050-50075`).
-
-#### Config File (flink-conf.yaml)
-
-In order to start an HA-cluster add the following configuration keys to
`conf/flink-conf.yaml`:
-
-- **high-availability mode** (required): The *high-availability mode* has to
be set in `conf/flink-conf.yaml` to *zookeeper* in order to enable high
availability mode.
-Alternatively this option can be set to FQN of factory class Flink should use
to create HighAvailabilityServices instance.
+- **high-availability mode** (required):
+The `high-availability` option has to be set to `zookeeper`.
<pre>high-availability: zookeeper</pre>
-- **ZooKeeper quorum** (required): A *ZooKeeper quorum* is a replicated group
of ZooKeeper servers, which provide the distributed coordination service.
+- **ZooKeeper quorum** (required):
+A *ZooKeeper quorum* is a replicated group of ZooKeeper servers, which provide
the distributed coordination service.
<pre>high-availability.zookeeper.quorum:
address1:2181[,...],addressX:2181</pre>
- Each *addressX:port* refers to a ZooKeeper server, which is reachable by
Flink at the given address and port.
+ Each `addressX:port` refers to a ZooKeeper server, which is reachable by
Flink at the given address and port.
-- **ZooKeeper root** (recommended): The *root ZooKeeper node*, under which all
cluster nodes are placed.
+- **ZooKeeper root** (recommended):
+The *root ZooKeeper node*, under which all cluster nodes are placed.
- <pre>high-availability.zookeeper.path.root: /flink
+ <pre>high-availability.zookeeper.path.root: /flink</pre>
-- **ZooKeeper cluster-id** (recommended): The *cluster-id ZooKeeper node*,
under which all required coordination data for a cluster is placed.
+- **ZooKeeper cluster-id** (recommended):
+The *cluster-id ZooKeeper node*, under which all required coordination data
for a cluster is placed.
<pre>high-availability.cluster-id: /default_ns # important: customize per
cluster</pre>
- **Important**: You should not set this value manually when running a YARN
- cluster, a per-job YARN session, or on another cluster manager. In those
- cases a cluster-id is automatically being generated based on the application
- id. Manually setting a cluster-id overrides this behaviour in YARN.
- Specifying a cluster-id with the -z CLI option, in turn, overrides manual
- configuration. If you are running multiple Flink HA clusters on bare metal,
- you have to manually configure separate cluster-ids for each cluster.
+ **Important**:
+ You should not set this value manually when running on YARN, native
Kubernetes or on another cluster manager.
+ In those cases a cluster-id is being automatically generated.
+ If you are running multiple Flink HA clusters on bare metal, you have to
manually configure separate cluster-ids for each cluster.
-- **Storage directory** (required): JobManager metadata is persisted in the
file system *storageDir* and only a pointer to this state is stored in
ZooKeeper.
+- **Storage directory** (required):
+JobManager metadata is persisted in the file system
`high-availability.storageDir` and only a pointer to this state is stored in
ZooKeeper.
- <pre>
-high-availability.storageDir: hdfs:///flink/recovery
- </pre>
-
- The `storageDir` stores all metadata needed to recover a JobManager
failure.
+ <pre>high-availability.storageDir: hdfs:///flink/recovery</pre>
-After configuring the masters and the ZooKeeper quorum, you can use the
provided cluster startup scripts as usual. They will start an HA-cluster. Keep
in mind that the **ZooKeeper quorum has to be running** when you call the
scripts and make sure to **configure a separate ZooKeeper root path** for each
HA cluster you are starting.
+ The `storageDir` stores all metadata needed to recover a JobManager failure.
-#### Example: Standalone Cluster with 2 JobManagers
+### Example configuration
-1. **Configure high availability mode and ZooKeeper quorum** in
`conf/flink-conf.yaml`:
+Configure high availability mode and ZooKeeper quorum in
`conf/flink-conf.yaml`:
- <pre>
+{% highlight bash %}
high-availability: zookeeper
high-availability.zookeeper.quorum: localhost:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /cluster_one # important: customize per cluster
-high-availability.storageDir: hdfs:///flink/recovery</pre>
+high-availability.storageDir: hdfs:///flink/recovery
+{% endhighlight %}
+
+## Configuring for Zookeeper Security
-2. **Configure masters** in `conf/masters`:
+If ZooKeeper is running in secure mode with Kerberos, you can override the
following configurations in `flink-conf.yaml` as necessary:
- <pre>
-localhost:8081
-localhost:8082</pre>
+{% highlight bash %}
+zookeeper.sasl.service-name: zookeeper # default is "zookeeper". If the ZooKeeper quorum is configured
+                                          # with a different service name then it can be supplied here.
+zookeeper.sasl.login-context-name: Client # default is "Client". The value needs to match one of the values
+                                          # configured in "security.kerberos.login.contexts".
+{% endhighlight %}
-3. **Configure ZooKeeper server** in `conf/zoo.cfg` (currently it's only
possible to run a single ZooKeeper server per machine):
+For more information on Flink configuration for Kerberos security, please
refer to the [security section of the Flink configuration page]({% link
deployment/config.md %}#security).
+You can also find further details on [how Flink sets up Kerberos-based
security internally]({% link deployment/security/security-kerberos.md %}).
- <pre>server.0=localhost:2888:3888</pre>
+## Zookeeper Versions
-4. **Start ZooKeeper quorum**:
+Flink ships with separate Zookeeper clients for 3.4 and 3.5, with 3.4 being in
the `lib` directory of the distribution
Review comment:
```suggestion
Flink ships with separate ZooKeeper clients for 3.4 and 3.5, with 3.4 being
in the `lib` directory of the distribution
```
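
The "Configuring for ZooKeeper Security" section quoted in this hunk pairs with Flink's Kerberos login options. A hedged sketch is shown below; the keytab path, principal and realm are placeholders, not values from the PR.

```bash
# Sketch: Kerberos-related overrides for a SASL-secured ZooKeeper.
# Keytab, principal and realm are placeholders (assumptions), not values from the PR.
cat >> conf/flink-conf.yaml <<'EOF'
security.kerberos.login.keytab: /path/to/flink.keytab
security.kerberos.login.principal: flink@EXAMPLE.COM
security.kerberos.login.contexts: Client
zookeeper.sasl.service-name: zookeeper
zookeeper.sasl.login-context-name: Client
EOF
```
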
##########
File path: docs/deployment/ha/kubernetes_ha.md
##########
@@ -23,77 +23,50 @@ specific language governing permissions and limitations
under the License.
-->
-## Kubernetes Cluster High Availability
-Kubernetes high availability service could support both [standalone Flink on
Kubernetes]({% link deployment/resource-providers/standalone/kubernetes.md %})
and [native Kubernetes integration]({% link
deployment/resource-providers/native_kubernetes.md %}).
+Flink's Kubernetes HA services use [Kubernetes](https://kubernetes.io/) for
high availability services.
-When running Flink JobManager as a Kubernetes deployment, the replica count
should be configured to 1 or greater.
-* The value `1` means that a new JobManager will be launched to take over
leadership if the current one terminates exceptionally.
-* The value `N` (greater than 1) means that multiple JobManagers will be
launched simultaneously while one is active and others are standby. Starting
more than one JobManager will make the recovery faster.
+* Toc
+{:toc}
-### Configuration
-{% highlight yaml %}
-kubernetes.cluster-id: <ClusterId>
-high-availability:
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
-high-availability.storageDir: hdfs:///flink/recovery
-{% endhighlight %}
+Kubernetes high availability services can only be used when deploying to
Kubernetes.
+Consequently, they can be configured when using [standalone Flink on
Kubernetes]({% link deployment/resource-providers/standalone/kubernetes.md %})
or the [native Kubernetes integration]({% link
deployment/resource-providers/native_kubernetes.md %})
-#### Example: Highly Available Standalone Flink Cluster on Kubernetes
-Both session and job/application clusters support using the Kubernetes high
availability service. Users just need to add the following Flink config options
to [flink-configuration-configmap.yaml]({% link
deployment/resource-providers/standalone/kubernetes.md
%}#common-cluster-resource-definitions). All other yamls do not need to be
updated.
-
-<span class="label label-info">Note</span> The filesystem which corresponds to
the scheme of your configured HA storage directory must be available to the
runtime. Refer to [custom Flink image]({% link
deployment/resource-providers/standalone/docker.md %}#customize-flink-image)
and [enable plugins]({% link deployment/resource-providers/standalone/docker.md
%}#using-plugins) for more information.
-
-{% highlight yaml %}
-apiVersion: v1
-kind: ConfigMap
-metadata:
- name: flink-config
- labels:
- app: flink
-data:
- flink-conf.yaml: |+
- ...
- kubernetes.cluster-id: <ClusterId>
- high-availability:
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
- high-availability.storageDir: hdfs:///flink/recovery
- restart-strategy: fixed-delay
- restart-strategy.fixed-delay.attempts: 10
- ...
-{% endhighlight %}
+## Configuration
-#### Example: Highly Available Native Kubernetes Cluster
-Using the following command to start a native Flink application cluster on
Kubernetes with high availability configured.
-{% highlight bash %}
-$ ./bin/flink run-application -p 8 -t kubernetes-application \
- -Dkubernetes.cluster-id=<ClusterId> \
- -Dtaskmanager.memory.process.size=4096m \
- -Dkubernetes.taskmanager.cpu=2 \
- -Dtaskmanager.numberOfTaskSlots=4 \
- -Dkubernetes.container.image=<CustomImageName> \
-
- -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
- -Dhigh-availability.storageDir=s3://flink/flink-ha \
- -Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=10 \
- -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \
- -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \
-{% endhighlight %}
+In order to start an HA-cluster you have to configure the following
configuration keys:
-### High Availability Data Clean Up
-Currently, when a Flink job reached the terminal state (`FAILED`, `CANCELED`,
`FINISHED`), all the HA data, including metadata in Kubernetes ConfigMap and HA
state on DFS, will be cleaned up.
+- **high-availability mode** (required):
+The [`high-availability`]({% link deployment/config.md %}#high-availability-1)
option has to be set to `KubernetesHaServicesFactory`.
-So the following command will only shut down the Flink session cluster and
leave all the HA related ConfigMaps, state untouched.
-{% highlight bash %}
-$ echo 'stop' | ./bin/kubernetes-session.sh
-Dkubernetes.cluster-id=<ClusterId> -Dexecution.attached=true
-{% endhighlight %}
+ <pre>high-availability:
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory</pre>
+
+- **Storage directory** (required):
+JobManager metadata is persisted in the file system
[`high-availability.storageDir`]({% link deployment/config.md
%}#high-availability-storagedir) and only a pointer to this state is stored in
Kubernetes.
+
+ <pre>high-availability.storageDir: s3:///flink/recovery</pre>
+
+ The `storageDir` stores all metadata needed to recover a JobManager failure.
+
+- **Cluster id** (required):
+In order to identify the Flink cluster, you have to specify a
[`kubernetes.cluster-id`]({% link deployment/config.md
%}#kubernetes-cluster-id).
+
+ <pre>kubernetes.cluster-id: cluster1337</pre>
+
+### Example configuration
+
+Configure high availability mode in `conf/flink-conf.yaml`:
-The following commands will cancel the job in application or session cluster
and effectively remove all its HA data.
{% highlight bash %}
-# Cancel a Flink job in the existing session
-$ ./bin/flink cancel -t kubernetes-session -Dkubernetes.cluster-id=<ClusterID>
<JobID>
-# Cancel a Flink application
-$ ./bin/flink cancel -t kubernetes-application
-Dkubernetes.cluster-id=<ClusterID> <JobID>
+kubernetes.cluster-id: <ClusterId>
Review comment:
```suggestion
kubernetes.cluster-id: <cluster-id>
```
nit: What about changing the formatting of `ClusterId` here to avoid
misguiding the user, considering that only lowercase strings are allowed?
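
To make the naming constraint concrete, a session started with a valid lowercase cluster-id could look roughly like this; the id itself is made up.

```bash
# Sketch: native Kubernetes session with HA enabled and a cluster-id that
# satisfies the lowercase naming constraint (the value here is made up).
./bin/kubernetes-session.sh \
  -Dkubernetes.cluster-id=my-ha-cluster \
  -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
  -Dhigh-availability.storageDir=s3://flink/flink-ha
```
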
##########
File path: docs/deployment/ha/kubernetes_ha.md
##########
@@ -23,77 +23,50 @@ specific language governing permissions and limitations
under the License.
-->
-## Kubernetes Cluster High Availability
-Kubernetes high availability service could support both [standalone Flink on
Kubernetes]({% link deployment/resource-providers/standalone/kubernetes.md %})
and [native Kubernetes integration]({% link
deployment/resource-providers/native_kubernetes.md %}).
+Flink's Kubernetes HA services use [Kubernetes](https://kubernetes.io/) for
high availability services.
-When running Flink JobManager as a Kubernetes deployment, the replica count
should be configured to 1 or greater.
-* The value `1` means that a new JobManager will be launched to take over
leadership if the current one terminates exceptionally.
-* The value `N` (greater than 1) means that multiple JobManagers will be
launched simultaneously while one is active and others are standby. Starting
more than one JobManager will make the recovery faster.
+* Toc
+{:toc}
-### Configuration
-{% highlight yaml %}
-kubernetes.cluster-id: <ClusterId>
-high-availability:
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
-high-availability.storageDir: hdfs:///flink/recovery
-{% endhighlight %}
+Kubernetes high availability services can only be used when deploying to
Kubernetes.
+Consequently, they can be configured when using [standalone Flink on
Kubernetes]({% link deployment/resource-providers/standalone/kubernetes.md %})
or the [native Kubernetes integration]({% link
deployment/resource-providers/native_kubernetes.md %})
-#### Example: Highly Available Standalone Flink Cluster on Kubernetes
-Both session and job/application clusters support using the Kubernetes high
availability service. Users just need to add the following Flink config options
to [flink-configuration-configmap.yaml]({% link
deployment/resource-providers/standalone/kubernetes.md
%}#common-cluster-resource-definitions). All other yamls do not need to be
updated.
-
-<span class="label label-info">Note</span> The filesystem which corresponds to
the scheme of your configured HA storage directory must be available to the
runtime. Refer to [custom Flink image]({% link
deployment/resource-providers/standalone/docker.md %}#customize-flink-image)
and [enable plugins]({% link deployment/resource-providers/standalone/docker.md
%}#using-plugins) for more information.
-
-{% highlight yaml %}
-apiVersion: v1
-kind: ConfigMap
-metadata:
- name: flink-config
- labels:
- app: flink
-data:
- flink-conf.yaml: |+
- ...
- kubernetes.cluster-id: <ClusterId>
- high-availability:
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
- high-availability.storageDir: hdfs:///flink/recovery
- restart-strategy: fixed-delay
- restart-strategy.fixed-delay.attempts: 10
- ...
-{% endhighlight %}
+## Configuration
-#### Example: Highly Available Native Kubernetes Cluster
-Using the following command to start a native Flink application cluster on
Kubernetes with high availability configured.
-{% highlight bash %}
-$ ./bin/flink run-application -p 8 -t kubernetes-application \
- -Dkubernetes.cluster-id=<ClusterId> \
- -Dtaskmanager.memory.process.size=4096m \
- -Dkubernetes.taskmanager.cpu=2 \
- -Dtaskmanager.numberOfTaskSlots=4 \
- -Dkubernetes.container.image=<CustomImageName> \
-
- -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
- -Dhigh-availability.storageDir=s3://flink/flink-ha \
- -Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=10 \
- -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \
- -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \
-{% endhighlight %}
+In order to start an HA-cluster you have to configure the following
configuration keys:
-### High Availability Data Clean Up
-Currently, when a Flink job reached the terminal state (`FAILED`, `CANCELED`,
`FINISHED`), all the HA data, including metadata in Kubernetes ConfigMap and HA
state on DFS, will be cleaned up.
+- **high-availability mode** (required):
+The [`high-availability`]({% link deployment/config.md %}#high-availability-1)
option has to be set to `KubernetesHaServicesFactory`.
Review comment:
```suggestion
The [high-availability]({% link deployment/config.md %}#high-availability-1)
option has to be set to `KubernetesHaServicesFactory`.
```
I didn't realize that the parameters were actually links because of the code
formatting. We shouldn't override the link formatting like that.
##########
File path: docs/deployment/resource-providers/native_kubernetes.md
##########
@@ -375,6 +375,28 @@ $ ./bin/kubernetes-session.sh \
For more details see the [official Kubernetes
documentation](https://kubernetes.io/docs/concepts/configuration/secret/#using-secrets-as-environment-variables).
+## High-Availability with Native Kubernetes
+
+For high availability on Kubernetes, you can use the [existing high
availability services]({% link deployment/ha/index.md %}).
Review comment:
```suggestion
For high availability on Kubernetes, you can use the [existing high
availability services]({% link deployment/ha/kubernetes_ha.md %}).
```
Wouldn't it make sense to directly link to the actual HA page related to
Kubernetes here?
##########
File path: docs/deployment/resource-providers/native_kubernetes.md
##########
@@ -375,6 +375,28 @@ $ ./bin/kubernetes-session.sh \
For more details see the [official Kubernetes
documentation](https://kubernetes.io/docs/concepts/configuration/secret/#using-secrets-as-environment-variables).
+## High-Availability with Native Kubernetes
+
+For high availability on Kubernetes, you can use the [existing high
availability services]({% link deployment/ha/index.md %}).
+
+### How to configure Kubernetes HA Services
+
+Using the following command to start a native Flink application cluster on
Kubernetes with high availability configured.
+{% highlight bash %}
+$ ./bin/flink run-application -p 8 -t kubernetes-application \
+ -Dkubernetes.cluster-id=<ClusterId> \
Review comment:
```suggestion
-Dkubernetes.cluster-id=<cluster-id> \
```
Here, my proposal to reformat the cluster-id "variable" applies again.
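
Carrying the same renaming into the application-mode command quoted earlier in this review gives roughly the following sketch; the container image and storage bucket are placeholders.

```bash
# Sketch: HA-enabled application-mode submission with a lowercase cluster-id;
# the container image and the S3 bucket are placeholders.
./bin/flink run-application -p 8 -t kubernetes-application \
    -Dkubernetes.cluster-id=my-ha-cluster \
    -Dkubernetes.container.image=<custom-image-name> \
    -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
    -Dhigh-availability.storageDir=s3://flink/flink-ha \
    -Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=10 \
    local:///opt/flink/examples/streaming/StateMachineExample.jar
```
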
##########
File path: docs/deployment/resource-providers/standalone/index.md
##########
@@ -150,4 +150,71 @@ bin/taskmanager.sh start|start-foreground|stop|stop-all
Make sure to call these scripts on the hosts on which you want to start/stop
the respective instance.
+## High-Availability with Standalone
+
+In order to enable HA for a standalone cluster, you have to use the [ZooKeeper
HA services]({% link deployment/ha/zookeeper_ha.md %}).
+
+Additionally, you have to configure your cluster to start multiple JobManagers.
+
+### Masters File (masters)
+
+In order to start an HA-cluster configure the *masters* file in `conf/masters`:
+
+- **masters file**: The *masters file* contains all hosts, on which
JobManagers are started, and the ports to which the web user interface binds.
+
+ <pre>
+jobManagerAddress1:webUIPort1
+[...]
+jobManagerAddressX:webUIPortX
+ </pre>
+
+By default, the job manager will pick a *random port* for inter process
communication. You can change this via the
**`high-availability.jobmanager.port`** key. This key accepts single ports
(e.g. `50010`), ranges (`50000-50025`), or a combination of both
(`50010,50011,50020-50025,50050-50075`).
Review comment:
```suggestion
By default, the job manager will pick a *random port* for inter process
communication. You can change this via the
[high-availability.jobmanager.port]({% link deployment/config.md
%}#high-availability-jobmanager-port) key. This key accepts single ports (e.g.
`50010`), ranges (`50000-50025`), or a combination of both
(`50010,50011,50020-50025,50050-50075`).
```
Removed formatting and added link.
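
A short sketch tying the masters file and the fixed port range together; hosts and ports are examples only.

```bash
# Sketch: two JobManagers on one machine plus a fixed port range for HA RPC.
# Hostnames and ports are examples only.
cat > conf/masters <<'EOF'
localhost:8081
localhost:8082
EOF

cat >> conf/flink-conf.yaml <<'EOF'
high-availability.jobmanager.port: 50010,50011,50020-50025
EOF
```
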
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]