This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 93bffc063180002b4cba06d1ffc0aaa8045298e0
Author: Till Rohrmann <[email protected]>
AuthorDate: Fri Nov 27 18:49:18 2020 +0100

    [FLINK-20357][docs] Split HA documentation up into a general overview and the specific implementations
    
    This commit splits the HA documentation up into a general overview and the specific implementations:
    
    * ZooKeeper HA services
    * Kubernetes HA services
    
    Moreover, this commit moves resource-provider specific documentation to the respective resource-provider documentation.
    
    This closes #14254.
---
 docs/deployment/ha/index.md                        | 320 +++------------------
 docs/deployment/ha/index.zh.md                     | 320 +++------------------
 docs/deployment/ha/kubernetes_ha.md                |  99 +++----
 docs/deployment/ha/kubernetes_ha.zh.md             |  99 +++----
 docs/deployment/ha/zookeeper_ha.md                 | 143 +++++----
 docs/deployment/ha/zookeeper_ha.zh.md              | 143 +++++----
 .../resource-providers/native_kubernetes.md        |  22 ++
 .../resource-providers/native_kubernetes.zh.md     |  22 ++
 .../resource-providers/standalone/index.md         |  67 +++++
 .../resource-providers/standalone/index.zh.md      |  68 +++++
 .../resource-providers/standalone/kubernetes.md    |  28 ++
 .../resource-providers/standalone/kubernetes.zh.md |  29 ++
 12 files changed, 518 insertions(+), 842 deletions(-)

diff --git a/docs/deployment/ha/index.md b/docs/deployment/ha/index.md
index 8e417ce..1f0e321b 100644
--- a/docs/deployment/ha/index.md
+++ b/docs/deployment/ha/index.md
@@ -1,9 +1,10 @@
 ---
-title: "High Availability (HA)"
-nav-id: ha
+title: "JobManager High Availability (HA)"
+nav-title: High Availability (HA)
 nav-parent_id: deployment
-nav-pos: 6
+nav-id: ha
 nav-show_overview: true
+nav-pos: 6
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
@@ -24,308 +25,57 @@ specific language governing permissions and limitations
 under the License.
 -->

+JobManager High Availability (HA) hardens a Flink cluster against JobManager failures.
+This feature ensures that a Flink cluster will always continue executing your submitted jobs.
+
 * Toc
 {:toc}

-## Overview
-
-The JobManager coordinates every Flink deployment. It is responsible for both *scheduling* and *resource management*.
+## JobManager High Availability

-By default, there is a single JobManager instance per Flink cluster. This creates a *single point of failure* (SPOF): if the JobManager crashes, no new programs can be submitted and running programs fail.
+The JobManager coordinates every Flink deployment.
+It is responsible for both *scheduling* and *resource management*.

-With JobManager High Availability, you can recover from JobManager failures and thereby eliminate the *SPOF*. You can configure high availability for both **standalone**, **YARN clusters** and **Kubernetes clusters**.
+By default, there is a single JobManager instance per Flink cluster.
+This creates a *single point of failure* (SPOF): if the JobManager crashes, no new programs can be submitted and running programs fail.

-See more HA implementation details in [JobManager High Availability](https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability) in the Flink wiki.
+With JobManager High Availability, you can recover from JobManager failures and thereby eliminate the *SPOF*.
+You can configure high availability for every cluster deployment.
+See the [list of available high availability services](#high-availability-services) for more information.
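As a quick illustration of what the overview above describes, the following is a minimal sketch of a ZooKeeper-backed HA setup in `conf/flink-conf.yaml`, built only from the configuration keys discussed throughout this patch; the quorum addresses, cluster id, and storage path are placeholders and would have to be adapted to a concrete environment.

<pre>
# conf/flink-conf.yaml -- illustrative sketch, values are placeholders
high-availability: zookeeper
high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /my-cluster        # customize per cluster on bare metal
high-availability.storageDir: hdfs:///flink/recovery
</pre>

With these keys set, the regular cluster startup scripts bring up an HA cluster, provided the ZooKeeper quorum is already running.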
-## Standalone Cluster High Availability +### How to make a cluster highly available -The general idea of JobManager high availability for standalone clusters is that there is a **single leading JobManager** at any time and **multiple standby JobManagers** to take over leadership in case the leader fails. This guarantees that there is **no single point of failure** and programs can make progress as soon as a standby JobManager has taken leadership. There is no explicit distinction between standby and master JobManager instances. Each JobManager can take the role of master [...] +The general idea of JobManager High Availability is that there is a *single leading JobManager* at any time and *multiple standby JobManagers* to take over leadership in case the leader fails. +This guarantees that there is *no single point of failure* and programs can make progress as soon as a standby JobManager has taken leadership. As an example, consider the following setup with three JobManager instances: <img src="{% link /fig/jobmanager_ha_overview.png %}" class="center" /> -### Configuration - -To enable JobManager High Availability you have to set the **high-availability mode** to *zookeeper*, configure a **ZooKeeper quorum** and set up a **masters file** with all JobManagers hosts and their web UI ports. - -Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for *distributed coordination* between all running JobManager instances. ZooKeeper is a separate service from Flink, which provides highly reliable distributed coordination via leader election and light-weight consistent state storage. Check out [ZooKeeper's Getting Started Guide](http://zookeeper.apache.org/doc/current/zookeeperStarted.html) for more information about ZooKeeper. Flink includes scripts to [bootstrap a simple Zo [...] - -#### Masters File (masters) - -In order to start an HA-cluster configure the *masters* file in `conf/masters`: - -- **masters file**: The *masters file* contains all hosts, on which JobManagers are started, and the ports to which the web user interface binds. - - <pre> -jobManagerAddress1:webUIPort1 -[...] -jobManagerAddressX:webUIPortX - </pre> - -By default, the job manager will pick a *random port* for inter process communication. You can change this via the **`high-availability.jobmanager.port`** key. This key accepts single ports (e.g. `50010`), ranges (`50000-50025`), or a combination of both (`50010,50011,50020-50025,50050-50075`). - -#### Config File (flink-conf.yaml) - -In order to start an HA-cluster add the following configuration keys to `conf/flink-conf.yaml`: - -- **high-availability mode** (required): The *high-availability mode* has to be set in `conf/flink-conf.yaml` to *zookeeper* in order to enable high availability mode. -Alternatively this option can be set to FQN of factory class Flink should use to create HighAvailabilityServices instance. - - <pre>high-availability: zookeeper</pre> - -- **ZooKeeper quorum** (required): A *ZooKeeper quorum* is a replicated group of ZooKeeper servers, which provide the distributed coordination service. - - <pre>high-availability.zookeeper.quorum: address1:2181[,...],addressX:2181</pre> - - Each *addressX:port* refers to a ZooKeeper server, which is reachable by Flink at the given address and port. - -- **ZooKeeper root** (recommended): The *root ZooKeeper node*, under which all cluster nodes are placed. 
- - <pre>high-availability.zookeeper.path.root: /flink - -- **ZooKeeper cluster-id** (recommended): The *cluster-id ZooKeeper node*, under which all required coordination data for a cluster is placed. - - <pre>high-availability.cluster-id: /default_ns # important: customize per cluster</pre> - - **Important**: You should not set this value manually when running a YARN - cluster, a per-job YARN session, or on another cluster manager. In those - cases a cluster-id is automatically being generated based on the application - id. Manually setting a cluster-id overrides this behaviour in YARN. - Specifying a cluster-id with the -z CLI option, in turn, overrides manual - configuration. If you are running multiple Flink HA clusters on bare metal, - you have to manually configure separate cluster-ids for each cluster. - -- **Storage directory** (required): JobManager metadata is persisted in the file system *storageDir* and only a pointer to this state is stored in ZooKeeper. - - <pre> -high-availability.storageDir: hdfs:///flink/recovery - </pre> - - The `storageDir` stores all metadata needed to recover a JobManager failure. - -After configuring the masters and the ZooKeeper quorum, you can use the provided cluster startup scripts as usual. They will start an HA-cluster. Keep in mind that the **ZooKeeper quorum has to be running** when you call the scripts and make sure to **configure a separate ZooKeeper root path** for each HA cluster you are starting. - -#### Example: Standalone Cluster with 2 JobManagers - -1. **Configure high availability mode and ZooKeeper quorum** in `conf/flink-conf.yaml`: - - <pre> -high-availability: zookeeper -high-availability.zookeeper.quorum: localhost:2181 -high-availability.zookeeper.path.root: /flink -high-availability.cluster-id: /cluster_one # important: customize per cluster -high-availability.storageDir: hdfs:///flink/recovery</pre> - -2. **Configure masters** in `conf/masters`: - - <pre> -localhost:8081 -localhost:8082</pre> - -3. **Configure ZooKeeper server** in `conf/zoo.cfg` (currently it's only possible to run a single ZooKeeper server per machine): - - <pre>server.0=localhost:2888:3888</pre> - -4. **Start ZooKeeper quorum**: - - <pre> -$ bin/start-zookeeper-quorum.sh -Starting zookeeper daemon on host localhost.</pre> - -5. **Start an HA-cluster**: - - <pre> -$ bin/start-cluster.sh -Starting HA cluster with 2 masters and 1 peers in ZooKeeper quorum. -Starting standalonesession daemon on host localhost. -Starting standalonesession daemon on host localhost. -Starting taskexecutor daemon on host localhost.</pre> - -6. **Stop ZooKeeper quorum and cluster**: - - <pre> -$ bin/stop-cluster.sh -Stopping taskexecutor daemon (pid: 7647) on localhost. -Stopping standalonesession daemon (pid: 7495) on host localhost. -Stopping standalonesession daemon (pid: 7349) on host localhost. 
-$ bin/stop-zookeeper-quorum.sh -Stopping zookeeper daemon (pid: 7101) on host localhost.</pre> +Flink's [high availability services](#high-availability-services) encapsulate the required services to make everything work: +* **Leader election**: Selecting a single leader out of a pool of `n` candidates +* **Service discovery**: Retrieving the address of the current leader +* **State persistence**: Persisting state which is required for the successor to resume the job execution (JobGraphs, user code jars, completed checkpoints) -## YARN Cluster High Availability - -When running a highly available YARN cluster, **we don't run multiple JobManager (ApplicationMaster) instances**, but only one, which is restarted by YARN on failures. The exact behaviour depends on on the specific YARN version you are using. - -### Configuration - -#### Maximum Application Master Attempts (yarn-site.xml) - -You have to configure the maximum number of attempts for the application masters for **your** YARN setup in `yarn-site.xml`: - -{% highlight xml %} -<property> - <name>yarn.resourcemanager.am.max-attempts</name> - <value>4</value> - <description> - The maximum number of application master execution attempts. - </description> -</property> -{% endhighlight %} - -The default for current YARN versions is 2 (meaning a single JobManager failure is tolerated). - -#### Application Attempts (flink-conf.yaml) - -In addition to the HA configuration ([see above](#configuration)), you have to configure the maximum attempts in `conf/flink-conf.yaml`: - -<pre>yarn.application-attempts: 10</pre> - -This means that the application can be restarted 9 times for failed attempts before YARN fails the application (9 retries + 1 initial attempt). Additional restarts can be performed by YARN if required by YARN operations: Preemption, node hardware failures or reboots, or NodeManager resyncs. These restarts are not counted against `yarn.application-attempts`, see <a href="http://johnjianfang.blogspot.de/2015/04/the-number-of-maximum-attempts-of-yarn.html">Jian Fang's blog post</a>. It's im [...] - -#### Container Shutdown Behaviour - -- **YARN 2.3.0 < version < 2.4.0**. All containers are restarted if the application master fails. -- **YARN 2.4.0 < version < 2.6.0**. TaskManager containers are kept alive across application master failures. This has the advantage that the startup time is faster and that the user does not have to wait for obtaining the container resources again. -- **YARN 2.6.0 <= version**: Sets the attempt failure validity interval to the Flinks' Akka timeout value. The attempt failure validity interval says that an application is only killed after the system has seen the maximum number of application attempts during one interval. This avoids that a long lasting job will deplete it's application attempts. - -<p style="border-radius: 5px; padding: 5px" class="bg-danger"><b>Note</b>: Hadoop YARN 2.4.0 has a major bug (fixed in 2.5.0) preventing container restarts from a restarted Application Master/Job Manager container. See <a href="https://issues.apache.org/jira/browse/FLINK-4142">FLINK-4142</a> for details. We recommend using at least Hadoop 2.5.0 for high availability setups on YARN.</p> - -#### Example: Highly Available YARN Session - -1. 
**Configure HA mode and ZooKeeper quorum** in `conf/flink-conf.yaml`: - - <pre> -high-availability: zookeeper -high-availability.zookeeper.quorum: localhost:2181 -high-availability.storageDir: hdfs:///flink/recovery -high-availability.zookeeper.path.root: /flink -yarn.application-attempts: 10</pre> - -3. **Configure ZooKeeper server** in `conf/zoo.cfg` (currently it's only possible to run a single ZooKeeper server per machine): - - <pre>server.0=localhost:2888:3888</pre> - -4. **Start ZooKeeper quorum**: - - <pre> -$ bin/start-zookeeper-quorum.sh -Starting zookeeper daemon on host localhost.</pre> - -5. **Start an HA-cluster**: - - <pre> -$ bin/yarn-session.sh -n 2</pre> - -## Kubernetes Cluster High Availability -Kubernetes high availability service could support both [standalone Flink on Kubernetes]({% link deployment/resource-providers/standalone/kubernetes.md %}) and [native Kubernetes integration]({% link deployment/resource-providers/native_kubernetes.md %}). - -When running Flink JobManager as a Kubernetes deployment, the replica count should be configured to 1 or greater. -* The value `1` means that a new JobManager will be launched to take over leadership if the current one terminates exceptionally. -* The value `N` (greater than 1) means that multiple JobManagers will be launched simultaneously while one is active and others are standby. Starting more than one JobManager will make the recovery faster. - -### Configuration -{% highlight yaml %} -kubernetes.cluster-id: <ClusterId> -high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory -high-availability.storageDir: hdfs:///flink/recovery -{% endhighlight %} - -#### Example: Highly Available Standalone Flink Cluster on Kubernetes -Both session and job/application clusters support using the Kubernetes high availability service. Users just need to add the following Flink config options to [flink-configuration-configmap.yaml]({% link deployment/resource-providers/standalone/kubernetes.md %}#common-cluster-resource-definitions). All other yamls do not need to be updated. - -<span class="label label-info">Note</span> The filesystem which corresponds to the scheme of your configured HA storage directory must be available to the runtime. Refer to [custom Flink image]({% link deployment/resource-providers/standalone/docker.md %}#customize-flink-image) and [enable plugins]({% link deployment/resource-providers/standalone/docker.md %}#using-plugins) for more information. - -{% highlight yaml %} -apiVersion: v1 -kind: ConfigMap -metadata: - name: flink-config - labels: - app: flink -data: - flink-conf.yaml: |+ - ... - kubernetes.cluster-id: <ClusterId> - high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory - high-availability.storageDir: hdfs:///flink/recovery - restart-strategy: fixed-delay - restart-strategy.fixed-delay.attempts: 10 - ... -{% endhighlight %} - -#### Example: Highly Available Native Kubernetes Cluster -Using the following command to start a native Flink application cluster on Kubernetes with high availability configured. 
-{% highlight bash %} -$ ./bin/flink run-application -p 8 -t kubernetes-application \ - -Dkubernetes.cluster-id=<ClusterId> \ - -Dtaskmanager.memory.process.size=4096m \ - -Dkubernetes.taskmanager.cpu=2 \ - -Dtaskmanager.numberOfTaskSlots=4 \ - -Dkubernetes.container.image=<CustomImageName> \ - -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \ - -Dhigh-availability.storageDir=s3://flink/flink-ha \ - -Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=10 \ - -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \ - -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \ - local:///opt/flink/examples/streaming/StateMachineExample.jar -{% endhighlight %} - -### High Availability Data Clean Up -Currently, when a Flink job reached the terminal state (`FAILED`, `CANCELED`, `FINISHED`), all the HA data, including metadata in Kubernetes ConfigMap and HA state on DFS, will be cleaned up. - -So the following command will only shut down the Flink session cluster and leave all the HA related ConfigMaps, state untouched. -{% highlight bash %} -$ echo 'stop' | ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=<ClusterId> -Dexecution.attached=true -{% endhighlight %} - -The following commands will cancel the job in application or session cluster and effectively remove all its HA data. -{% highlight bash %} -# Cancel a Flink job in the existing session -$ ./bin/flink cancel -t kubernetes-session -Dkubernetes.cluster-id=<ClusterID> <JobID> -# Cancel a Flink application -$ ./bin/flink cancel -t kubernetes-application -Dkubernetes.cluster-id=<ClusterID> <JobID> -{% endhighlight %} - -To keep HA data while restarting the Flink cluster, simply delete the deployment (via `kubectl delete deploy <ClusterID>`). -All the Flink cluster related resources will be deleted (e.g. JobManager Deployment, TaskManager pods, services, Flink conf ConfigMap). -HA related ConfigMaps will be retained because they do not set the owner reference. -When restarting the session / application using `kubernetes-session.sh` or `flink run-application`, all previously running jobs will be recovered and restarted from the latest successful checkpoint. - -## Configuring for Zookeeper Security - -If ZooKeeper is running in secure mode with Kerberos, you can override the following configurations in `flink-conf.yaml` as necessary: - -<pre> -zookeeper.sasl.service-name: zookeeper # default is "zookeeper". If the ZooKeeper quorum is configured - # with a different service name then it can be supplied here. -zookeeper.sasl.login-context-name: Client # default is "Client". The value needs to match one of the values - # configured in "security.kerberos.login.contexts". -</pre> - -For more information on Flink configuration for Kerberos security, please see [here]({% link deployment/config.md %}). -You can also find [here]({% link deployment/security/security-kerberos.md %}) further details on how Flink internally setups Kerberos-based security. - -## Zookeeper Versions - -Flink ships with separate Zookeeper clients for 3.4 and 3.5, with 3.4 being in the `lib` directory of the distribution -and thus used by default, whereas 3.5 is placed in the `opt` directory. +{% top %} -The 3.5 client allows you to secure the Zookeeper connection via SSL, but _may_ not work with 3.4- Zookeeper installations. 
+## High Availability Services -You can control which version is used by Flink by placing either jar in the `lib` directory. +Flink ships with two high availability service implementations: -## Bootstrap ZooKeeper +* [ZooKeeper]({% link deployment/ha/zookeeper_ha.md %}): +ZooKeeper HA services can be used with every Flink cluster deployment. +They require a running ZooKeeper quorum. -If you don't have a running ZooKeeper installation, you can use the helper scripts, which ship with Flink. +* [Kubernetes]({% link deployment/ha/kubernetes_ha.md %}): +Kubernetes HA services only work when running on Kubernetes. -There is a ZooKeeper configuration template in `conf/zoo.cfg`. You can configure the hosts to run ZooKeeper on with the `server.X` entries, where X is a unique ID of each server: +{% top %} -<pre> -server.X=addressX:peerPort:leaderPort -[...] -server.Y=addressY:peerPort:leaderPort -</pre> +## High Availability data lifecycle -The script `bin/start-zookeeper-quorum.sh` will start a ZooKeeper server on each of the configured hosts. The started processes start ZooKeeper servers via a Flink wrapper, which reads the configuration from `conf/zoo.cfg` and makes sure to set some required configuration values for convenience. In production setups, it is recommended to manage your own ZooKeeper installation. +In order to recover submitted jobs, Flink persists metadata and the job artifacts. +The HA data will be kept until the respective job either succeeds, is cancelled or fails terminally. +Once this happens, all the HA data, including the metadata stored in the HA services, will be deleted. {% top %} diff --git a/docs/deployment/ha/index.zh.md b/docs/deployment/ha/index.zh.md index 908ef84..5133012 100644 --- a/docs/deployment/ha/index.zh.md +++ b/docs/deployment/ha/index.zh.md @@ -1,9 +1,10 @@ --- -title: "High Availability (HA)" -nav-id: ha +title: "JobManager High Availability (HA)" +nav-title: High Availability (HA) nav-parent_id: deployment -nav-pos: 6 +nav-id: ha nav-show_overview: true +nav-pos: 6 --- <!-- Licensed to the Apache Software Foundation (ASF) under one @@ -24,308 +25,53 @@ specific language governing permissions and limitations under the License. --> +JobManager High Availability (HA) hardens a Flink cluster against JobManager failures. +This feature ensures that a Flink cluster will always continue executing your submitted jobs. + * Toc {:toc} -## Overview - -The JobManager coordinates every Flink deployment. It is responsible for both *scheduling* and *resource management*. +## JobManager High Availability -By default, there is a single JobManager instance per Flink cluster. This creates a *single point of failure* (SPOF): if the JobManager crashes, no new programs can be submitted and running programs fail. +The JobManager coordinates every Flink deployment. +It is responsible for both *scheduling* and *resource management*. -With JobManager High Availability, you can recover from JobManager failures and thereby eliminate the *SPOF*. You can configure high availability for both **standalone**, **YARN clusters** and **Kubernetes clusters**. +By default, there is a single JobManager instance per Flink cluster. +This creates a *single point of failure* (SPOF): if the JobManager crashes, no new programs can be submitted and running programs fail. -See more HA implementation details in [JobManager High Availability](https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability) in the Flink wiki. 
+With JobManager High Availability, you can recover from JobManager failures and thereby eliminate the *SPOF*. +You can configure high availability for every cluster deployment. +See the [list of available high availability services](#high-availability-services) for more information. -## Standalone Cluster High Availability +### How to make a cluster highly available -The general idea of JobManager high availability for standalone clusters is that there is a **single leading JobManager** at any time and **multiple standby JobManagers** to take over leadership in case the leader fails. This guarantees that there is **no single point of failure** and programs can make progress as soon as a standby JobManager has taken leadership. There is no explicit distinction between standby and master JobManager instances. Each JobManager can take the role of master [...] +The general idea of JobManager High Availability is that there is a *single leading JobManager* at any time and *multiple standby JobManagers* to take over leadership in case the leader fails. +This guarantees that there is *no single point of failure* and programs can make progress as soon as a standby JobManager has taken leadership. As an example, consider the following setup with three JobManager instances: <img src="{% link /fig/jobmanager_ha_overview.png %}" class="center" /> -### Configuration - -To enable JobManager High Availability you have to set the **high-availability mode** to *zookeeper*, configure a **ZooKeeper quorum** and set up a **masters file** with all JobManagers hosts and their web UI ports. - -Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for *distributed coordination* between all running JobManager instances. ZooKeeper is a separate service from Flink, which provides highly reliable distributed coordination via leader election and light-weight consistent state storage. Check out [ZooKeeper's Getting Started Guide](http://zookeeper.apache.org/doc/current/zookeeperStarted.html) for more information about ZooKeeper. Flink includes scripts to [bootstrap a simple Zo [...] - -#### Masters File (masters) - -In order to start an HA-cluster configure the *masters* file in `conf/masters`: - -- **masters file**: The *masters file* contains all hosts, on which JobManagers are started, and the ports to which the web user interface binds. - - <pre> -jobManagerAddress1:webUIPort1 -[...] -jobManagerAddressX:webUIPortX - </pre> - -By default, the job manager will pick a *random port* for inter process communication. You can change this via the **`high-availability.jobmanager.port`** key. This key accepts single ports (e.g. `50010`), ranges (`50000-50025`), or a combination of both (`50010,50011,50020-50025,50050-50075`). - -#### Config File (flink-conf.yaml) - -In order to start an HA-cluster add the following configuration keys to `conf/flink-conf.yaml`: - -- **high-availability mode** (required): The *high-availability mode* has to be set in `conf/flink-conf.yaml` to *zookeeper* in order to enable high availability mode. -Alternatively this option can be set to FQN of factory class Flink should use to create HighAvailabilityServices instance. - - <pre>high-availability: zookeeper</pre> - -- **ZooKeeper quorum** (required): A *ZooKeeper quorum* is a replicated group of ZooKeeper servers, which provide the distributed coordination service. 
- - <pre>high-availability.zookeeper.quorum: address1:2181[,...],addressX:2181</pre> - - Each *addressX:port* refers to a ZooKeeper server, which is reachable by Flink at the given address and port. - -- **ZooKeeper root** (recommended): The *root ZooKeeper node*, under which all cluster nodes are placed. - - <pre>high-availability.zookeeper.path.root: /flink - -- **ZooKeeper cluster-id** (recommended): The *cluster-id ZooKeeper node*, under which all required coordination data for a cluster is placed. - - <pre>high-availability.cluster-id: /default_ns # important: customize per cluster</pre> - - **Important**: You should not set this value manually when running a YARN - cluster, a per-job YARN session, or on another cluster manager. In those - cases a cluster-id is automatically being generated based on the application - id. Manually setting a cluster-id overrides this behaviour in YARN. - Specifying a cluster-id with the -z CLI option, in turn, overrides manual - configuration. If you are running multiple Flink HA clusters on bare metal, - you have to manually configure separate cluster-ids for each cluster. - -- **Storage directory** (required): JobManager metadata is persisted in the file system *storageDir* and only a pointer to this state is stored in ZooKeeper. - - <pre> -high-availability.storageDir: hdfs:///flink/recovery - </pre> - - The `storageDir` stores all metadata needed to recover a JobManager failure. - -After configuring the masters and the ZooKeeper quorum, you can use the provided cluster startup scripts as usual. They will start an HA-cluster. Keep in mind that the **ZooKeeper quorum has to be running** when you call the scripts and make sure to **configure a separate ZooKeeper root path** for each HA cluster you are starting. - -#### Example: Standalone Cluster with 2 JobManagers - -1. **Configure high availability mode and ZooKeeper quorum** in `conf/flink-conf.yaml`: - - <pre> -high-availability: zookeeper -high-availability.zookeeper.quorum: localhost:2181 -high-availability.zookeeper.path.root: /flink -high-availability.cluster-id: /cluster_one # important: customize per cluster -high-availability.storageDir: hdfs:///flink/recovery</pre> - -2. **Configure masters** in `conf/masters`: - - <pre> -localhost:8081 -localhost:8082</pre> - -3. **Configure ZooKeeper server** in `conf/zoo.cfg` (currently it's only possible to run a single ZooKeeper server per machine): - - <pre>server.0=localhost:2888:3888</pre> - -4. **Start ZooKeeper quorum**: - - <pre> -$ bin/start-zookeeper-quorum.sh -Starting zookeeper daemon on host localhost.</pre> - -5. **Start an HA-cluster**: - - <pre> -$ bin/start-cluster.sh -Starting HA cluster with 2 masters and 1 peers in ZooKeeper quorum. -Starting standalonesession daemon on host localhost. -Starting standalonesession daemon on host localhost. -Starting taskexecutor daemon on host localhost.</pre> - -6. **Stop ZooKeeper quorum and cluster**: - - <pre> -$ bin/stop-cluster.sh -Stopping taskexecutor daemon (pid: 7647) on localhost. -Stopping standalonesession daemon (pid: 7495) on host localhost. -Stopping standalonesession daemon (pid: 7349) on host localhost. -$ bin/stop-zookeeper-quorum.sh -Stopping zookeeper daemon (pid: 7101) on host localhost.</pre> - -## YARN Cluster High Availability - -When running a highly available YARN cluster, **we don't run multiple JobManager (ApplicationMaster) instances**, but only one, which is restarted by YARN on failures. The exact behaviour depends on on the specific YARN version you are using. 
- -### Configuration - -#### Maximum Application Master Attempts (yarn-site.xml) - -You have to configure the maximum number of attempts for the application masters for **your** YARN setup in `yarn-site.xml`: - -{% highlight xml %} -<property> - <name>yarn.resourcemanager.am.max-attempts</name> - <value>4</value> - <description> - The maximum number of application master execution attempts. - </description> -</property> -{% endhighlight %} - -The default for current YARN versions is 2 (meaning a single JobManager failure is tolerated). - -#### Application Attempts (flink-conf.yaml) - -In addition to the HA configuration ([see above](#configuration)), you have to configure the maximum attempts in `conf/flink-conf.yaml`: - -<pre>yarn.application-attempts: 10</pre> - -This means that the application can be restarted 9 times for failed attempts before YARN fails the application (9 retries + 1 initial attempt). Additional restarts can be performed by YARN if required by YARN operations: Preemption, node hardware failures or reboots, or NodeManager resyncs. These restarts are not counted against `yarn.application-attempts`, see <a href="http://johnjianfang.blogspot.de/2015/04/the-number-of-maximum-attempts-of-yarn.html">Jian Fang's blog post</a>. It's im [...] - -#### Container Shutdown Behaviour - -- **YARN 2.3.0 < version < 2.4.0**. All containers are restarted if the application master fails. -- **YARN 2.4.0 < version < 2.6.0**. TaskManager containers are kept alive across application master failures. This has the advantage that the startup time is faster and that the user does not have to wait for obtaining the container resources again. -- **YARN 2.6.0 <= version**: Sets the attempt failure validity interval to the Flinks' Akka timeout value. The attempt failure validity interval says that an application is only killed after the system has seen the maximum number of application attempts during one interval. This avoids that a long lasting job will deplete it's application attempts. - -<p style="border-radius: 5px; padding: 5px" class="bg-danger"><b>Note</b>: Hadoop YARN 2.4.0 has a major bug (fixed in 2.5.0) preventing container restarts from a restarted Application Master/Job Manager container. See <a href="https://issues.apache.org/jira/browse/FLINK-4142">FLINK-4142</a> for details. We recommend using at least Hadoop 2.5.0 for high availability setups on YARN.</p> - -#### Example: Highly Available YARN Session - -1. **Configure HA mode and ZooKeeper quorum** in `conf/flink-conf.yaml`: - - <pre> -high-availability: zookeeper -high-availability.zookeeper.quorum: localhost:2181 -high-availability.storageDir: hdfs:///flink/recovery -high-availability.zookeeper.path.root: /flink -yarn.application-attempts: 10</pre> - -3. **Configure ZooKeeper server** in `conf/zoo.cfg` (currently it's only possible to run a single ZooKeeper server per machine): - - <pre>server.0=localhost:2888:3888</pre> - -4. **Start ZooKeeper quorum**: - - <pre> -$ bin/start-zookeeper-quorum.sh -Starting zookeeper daemon on host localhost.</pre> - -5. **Start an HA-cluster**: - - <pre> -$ bin/yarn-session.sh -n 2</pre> - -## Kubernetes Cluster High Availability -Kubernetes high availability service could support both [standalone Flink on Kubernetes]({%link deployment/resource-providers/standalone/kubernetes.zh.md %}) and [native Kubernetes integration]({% link deployment/resource-providers/native_kubernetes.zh.md %}). 
- -When running Flink JobManager as a Kubernetes deployment, the replica count should be configured to 1 or greater. -* The value `1` means that a new JobManager will be launched to take over leadership if the current one terminates exceptionally. -* The value `N` (greater than 1) means that multiple JobManagers will be launched simultaneously while one is active and others are standby. Starting more than one JobManager will make the recovery faster. - -### Configuration -{% highlight yaml %} -kubernetes.cluster-id: <ClusterId> -high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory -high-availability.storageDir: hdfs:///flink/recovery -{% endhighlight %} - -#### Example: Highly Available Standalone Flink Cluster on Kubernetes -Both session and job/application clusters support using the Kubernetes high availability service. Users just need to add the following Flink config options to [flink-configuration-configmap.yaml]({% link deployment/resource-providers/standalone/kubernetes.zh.md %}#common-cluster-resource-definitions). All other yamls do not need to be updated. - -<span class="label label-info">Note</span> The filesystem which corresponds to the scheme of your configured HA storage directory must be available to the runtime. Refer to [custom Flink image]({% link deployment/resource-providers/standalone/docker.zh.md %}#customize-flink-image) and [enable plugins]({% link deployment/resource-providers/standalone/docker.zh.md %}#using-plugins) for more information. - -{% highlight yaml %} -apiVersion: v1 -kind: ConfigMap -metadata: - name: flink-config - labels: - app: flink -data: - flink-conf.yaml: |+ - ... - kubernetes.cluster-id: <ClusterId> - high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory - high-availability.storageDir: hdfs:///flink/recovery - restart-strategy: fixed-delay - restart-strategy.fixed-delay.attempts: 10 - ... -{% endhighlight %} - -#### Example: Highly Available Native Kubernetes Cluster -Using the following command to start a native Flink application cluster on Kubernetes with high availability configured. -{% highlight bash %} -$ ./bin/flink run-application -p 8 -t kubernetes-application \ - -Dkubernetes.cluster-id=<ClusterId> \ - -Dtaskmanager.memory.process.size=4096m \ - -Dkubernetes.taskmanager.cpu=2 \ - -Dtaskmanager.numberOfTaskSlots=4 \ - -Dkubernetes.container.image=<CustomImageName> \ - -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \ - -Dhigh-availability.storageDir=s3://flink/flink-ha \ - -Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=10 \ - -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \ - -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \ - local:///opt/flink/examples/streaming/StateMachineExample.jar -{% endhighlight %} - -### High Availability Data Clean Up -Currently, when a Flink job reached the terminal state (`FAILED`, `CANCELED`, `FINISHED`), all the HA data, including metadata in Kubernetes ConfigMap and HA state on DFS, will be cleaned up. - -So the following command will only shut down the Flink session cluster and leave all the HA related ConfigMaps, state untouched. 
-{% highlight bash %} -$ echo 'stop' | ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=<ClusterId> -Dexecution.attached=true -{% endhighlight %} - -The following commands will cancel the job in application or session cluster and effectively remove all its HA data. -{% highlight bash %} -# Cancel a Flink job in the existing session -$ ./bin/flink cancel -t kubernetes-session -Dkubernetes.cluster-id=<ClusterID> <JobID> -# Cancel a Flink application -$ ./bin/flink cancel -t kubernetes-application -Dkubernetes.cluster-id=<ClusterID> <JobID> -{% endhighlight %} - -To keep HA data while restarting the Flink cluster, simply delete the deployment (via `kubectl delete deploy <ClusterID>`). -All the Flink cluster related resources will be deleted (e.g. JobManager Deployment, TaskManager pods, services, Flink conf ConfigMap). -HA related ConfigMaps will be retained because they do not set the owner reference. -When restarting the session / application using `kubernetes-session.sh` or `flink run-application`, all previously running jobs will be recovered and restarted from the latest successful checkpoint. - -## Configuring for Zookeeper Security - -If ZooKeeper is running in secure mode with Kerberos, you can override the following configurations in `flink-conf.yaml` as necessary: - -<pre> -zookeeper.sasl.service-name: zookeeper # default is "zookeeper". If the ZooKeeper quorum is configured - # with a different service name then it can be supplied here. -zookeeper.sasl.login-context-name: Client # default is "Client". The value needs to match one of the values - # configured in "security.kerberos.login.contexts". -</pre> - -For more information on Flink configuration for Kerberos security, please see [here]({% link deployment/config.zh.md %}). -You can also find [here]({% link deployment/security/security-kerberos.zh.md %}) further details on how Flink internally setups Kerberos-based security. - -## Zookeeper Versions - -Flink ships with separate Zookeeper clients for 3.4 and 3.5, with 3.4 being in the `lib` directory of the distribution -and thus used by default, whereas 3.5 is placed in the `opt` directory. - -The 3.5 client allows you to secure the Zookeeper connection via SSL, but _may_ not work with 3.4- Zookeeper installations. +Flink's [high availability services](#high-availability-services) encapsulate the required services to make everything work: +* **Leader election**: Selecting a single leader out of a pool of `n` candidates +* **Service discovery**: Retrieving the address of the current leader +* **State persistence**: Persisting state which is required for the successor to resume the job execution (JobGraphs, user code jars, completed checkpoints) -You can control which version is used by Flink by placing either jar in the `lib` directory. +## High Availability Services -## Bootstrap ZooKeeper +Flink ships with two high availability service implementations: -If you don't have a running ZooKeeper installation, you can use the helper scripts, which ship with Flink. +* [ZooKeeper]({% link deployment/ha/zookeeper_ha.zh.md %}): +ZooKeeper HA services can be used with every Flink cluster deployment. +They require a running ZooKeeper quorum. -There is a ZooKeeper configuration template in `conf/zoo.cfg`. You can configure the hosts to run ZooKeeper on with the `server.X` entries, where X is a unique ID of each server: +* [Kubernetes]({% link deployment/ha/kubernetes_ha.zh.md %}): +Kubernetes HA services only work when running on Kubernetes. 
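To make the choice between the two implementations listed above concrete, the sketch below shows how each one is selected via the `high-availability` option in `conf/flink-conf.yaml`; only one block would be used at a time, and the cluster id and storage directories are placeholders.

<pre>
# ZooKeeper HA services -- usable with every deployment, requires a running ZooKeeper quorum
high-availability: zookeeper
high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181
high-availability.storageDir: hdfs:///flink/recovery

# Kubernetes HA services -- only when deploying on Kubernetes
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
kubernetes.cluster-id: my-flink-cluster
high-availability.storageDir: s3:///flink/recovery
</pre>

In both cases `high-availability.storageDir` must point to a file system that is reachable from the runtime, since the JobManager metadata is persisted there and only a pointer to it is stored in ZooKeeper or Kubernetes.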
-<pre> -server.X=addressX:peerPort:leaderPort -[...] -server.Y=addressY:peerPort:leaderPort -</pre> +## High Availability data lifecycle -The script `bin/start-zookeeper-quorum.sh` will start a ZooKeeper server on each of the configured hosts. The started processes start ZooKeeper servers via a Flink wrapper, which reads the configuration from `conf/zoo.cfg` and makes sure to set some required configuration values for convenience. In production setups, it is recommended to manage your own ZooKeeper installation. +In order to recover submitted jobs, Flink persists metadata and the job artifacts. +The HA data will be kept until the respective job either succeeds, is cancelled or fails terminally. +Once this happens, all the HA data, including the metadata stored in the HA services, will be deleted. {% top %} diff --git a/docs/deployment/ha/kubernetes_ha.md b/docs/deployment/ha/kubernetes_ha.md index f2226ce..c553a1a 100644 --- a/docs/deployment/ha/kubernetes_ha.md +++ b/docs/deployment/ha/kubernetes_ha.md @@ -23,77 +23,52 @@ specific language governing permissions and limitations under the License. --> -## Kubernetes Cluster High Availability -Kubernetes high availability service could support both [standalone Flink on Kubernetes]({% link deployment/resource-providers/standalone/kubernetes.md %}) and [native Kubernetes integration]({% link deployment/resource-providers/native_kubernetes.md %}). +Flink's Kubernetes HA services use [Kubernetes](https://kubernetes.io/) for high availability services. -When running Flink JobManager as a Kubernetes deployment, the replica count should be configured to 1 or greater. -* The value `1` means that a new JobManager will be launched to take over leadership if the current one terminates exceptionally. -* The value `N` (greater than 1) means that multiple JobManagers will be launched simultaneously while one is active and others are standby. Starting more than one JobManager will make the recovery faster. +* Toc +{:toc} -### Configuration -{% highlight yaml %} -kubernetes.cluster-id: <ClusterId> -high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory -high-availability.storageDir: hdfs:///flink/recovery -{% endhighlight %} +Kubernetes high availability services can only be used when deploying to Kubernetes. +Consequently, they can be configured when using [standalone Flink on Kubernetes]({% link deployment/resource-providers/standalone/kubernetes.md %}) or the [native Kubernetes integration]({% link deployment/resource-providers/native_kubernetes.md %}) -#### Example: Highly Available Standalone Flink Cluster on Kubernetes -Both session and job/application clusters support using the Kubernetes high availability service. Users just need to add the following Flink config options to [flink-configuration-configmap.yaml]({% link deployment/resource-providers/standalone/kubernetes.md %}#common-cluster-resource-definitions). All other yamls do not need to be updated. - -<span class="label label-info">Note</span> The filesystem which corresponds to the scheme of your configured HA storage directory must be available to the runtime. Refer to [custom Flink image]({% link deployment/resource-providers/standalone/docker.md %}#customize-flink-image) and [enable plugins]({% link deployment/resource-providers/standalone/docker.md %}#using-plugins) for more information. - -{% highlight yaml %} -apiVersion: v1 -kind: ConfigMap -metadata: - name: flink-config - labels: - app: flink -data: - flink-conf.yaml: |+ - ... 
- kubernetes.cluster-id: <ClusterId> - high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory - high-availability.storageDir: hdfs:///flink/recovery - restart-strategy: fixed-delay - restart-strategy.fixed-delay.attempts: 10 - ... -{% endhighlight %} +## Configuration -#### Example: Highly Available Native Kubernetes Cluster -Using the following command to start a native Flink application cluster on Kubernetes with high availability configured. -{% highlight bash %} -$ ./bin/flink run-application -p 8 -t kubernetes-application \ - -Dkubernetes.cluster-id=<ClusterId> \ - -Dtaskmanager.memory.process.size=4096m \ - -Dkubernetes.taskmanager.cpu=2 \ - -Dtaskmanager.numberOfTaskSlots=4 \ - -Dkubernetes.container.image=<CustomImageName> \ - -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \ - -Dhigh-availability.storageDir=s3://flink/flink-ha \ - -Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=10 \ - -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \ - -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \ - local:///opt/flink/examples/streaming/StateMachineExample.jar -{% endhighlight %} +In order to start an HA-cluster you have to configure the following configuration keys: -### High Availability Data Clean Up -Currently, when a Flink job reached the terminal state (`FAILED`, `CANCELED`, `FINISHED`), all the HA data, including metadata in Kubernetes ConfigMap and HA state on DFS, will be cleaned up. +- [high-availability]({% link deployment/config.md %}#high-availability-1) (required): +The `high-availability` option has to be set to `KubernetesHaServicesFactory`. -So the following command will only shut down the Flink session cluster and leave all the HA related ConfigMaps, state untouched. -{% highlight bash %} -$ echo 'stop' | ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=<ClusterId> -Dexecution.attached=true -{% endhighlight %} + <pre>high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory</pre> + +- [high-availability.storageDir]({% link deployment/config.md %}#high-availability-storagedir) (required): +JobManager metadata is persisted in the file system `high-availability.storageDir` and only a pointer to this state is stored in Kubernetes. + + <pre>high-availability.storageDir: s3:///flink/recovery</pre> + + The `storageDir` stores all metadata needed to recover a JobManager failure. + +- [kubernetes.cluster-id]({% link deployment/config.md %}#kubernetes-cluster-id) (required): +In order to identify the Flink cluster, you have to specify a `kubernetes.cluster-id`. + + <pre>kubernetes.cluster-id: cluster1337</pre> + +### Example configuration + +Configure high availability mode in `conf/flink-conf.yaml`: -The following commands will cancel the job in application or session cluster and effectively remove all its HA data. 
{% highlight bash %} -# Cancel a Flink job in the existing session -$ ./bin/flink cancel -t kubernetes-session -Dkubernetes.cluster-id=<ClusterID> <JobID> -# Cancel a Flink application -$ ./bin/flink cancel -t kubernetes-application -Dkubernetes.cluster-id=<ClusterID> <JobID> +kubernetes.cluster-id: <cluster-id> +high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory +high-availability.storageDir: hdfs:///flink/recovery {% endhighlight %} -To keep HA data while restarting the Flink cluster, simply delete the deployment (via `kubectl delete deploy <ClusterID>`). +{% top %} + +## High availability data clean up + +To keep HA data while restarting the Flink cluster, simply delete the deployment (via `kubectl delete deploy <cluster-id>`). All the Flink cluster related resources will be deleted (e.g. JobManager Deployment, TaskManager pods, services, Flink conf ConfigMap). HA related ConfigMaps will be retained because they do not set the owner reference. -When restarting the session / application using `kubernetes-session.sh` or `flink run-application`, all previously running jobs will be recovered and restarted from the latest successful checkpoint. +When restarting the cluster, all previously running jobs will be recovered and restarted from the latest successful checkpoint. + +{% top %} diff --git a/docs/deployment/ha/kubernetes_ha.zh.md b/docs/deployment/ha/kubernetes_ha.zh.md index 2618db6..4ff8356 100644 --- a/docs/deployment/ha/kubernetes_ha.zh.md +++ b/docs/deployment/ha/kubernetes_ha.zh.md @@ -23,77 +23,52 @@ specific language governing permissions and limitations under the License. --> -## Kubernetes Cluster High Availability -Kubernetes high availability service could support both [standalone Flink on Kubernetes]({% link deployment/resource-providers/standalone/kubernetes.zh.md %}) and [native Kubernetes integration]({% link deployment/resource-providers/native_kubernetes.zh.md %}). +Flink's Kubernetes HA services use [Kubernetes](https://kubernetes.io/) for high availability services. -When running Flink JobManager as a Kubernetes deployment, the replica count should be configured to 1 or greater. -* The value `1` means that a new JobManager will be launched to take over leadership if the current one terminates exceptionally. -* The value `N` (greater than 1) means that multiple JobManagers will be launched simultaneously while one is active and others are standby. Starting more than one JobManager will make the recovery faster. +* Toc +{:toc} -### Configuration -{% highlight yaml %} -kubernetes.cluster-id: <ClusterId> -high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory -high-availability.storageDir: hdfs:///flink/recovery -{% endhighlight %} +Kubernetes high availability services can only be used when deploying to Kubernetes. +Consequently, they can be configured when using [standalone Flink on Kubernetes]({% link deployment/resource-providers/standalone/kubernetes.zh.md %}) or the [native Kubernetes integration]({% link deployment/resource-providers/native_kubernetes.zh.md %}) -#### Example: Highly Available Standalone Flink Cluster on Kubernetes -Both session and job/application clusters support using the Kubernetes high availability service. Users just need to add the following Flink config options to [flink-configuration-configmap.yaml]({% link deployment/resource-providers/standalone/kubernetes.zh.md %}#common-cluster-resource-definitions). All other yamls do not need to be updated. 
- -<span class="label label-info">Note</span> The filesystem which corresponds to the scheme of your configured HA storage directory must be available to the runtime. Refer to [custom Flink image]({% link deployment/resource-providers/standalone/docker.zh.md %}#customize-flink-image) and [enable plugins]({% link deployment/resource-providers/standalone/docker.zh.md %}#using-plugins) for more information. - -{% highlight yaml %} -apiVersion: v1 -kind: ConfigMap -metadata: - name: flink-config - labels: - app: flink -data: - flink-conf.yaml: |+ - ... - kubernetes.cluster-id: <ClusterId> - high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory - high-availability.storageDir: hdfs:///flink/recovery - restart-strategy: fixed-delay - restart-strategy.fixed-delay.attempts: 10 - ... -{% endhighlight %} +## Configuration -#### Example: Highly Available Native Kubernetes Cluster -Using the following command to start a native Flink application cluster on Kubernetes with high availability configured. -{% highlight bash %} -$ ./bin/flink run-application -p 8 -t kubernetes-application \ - -Dkubernetes.cluster-id=<ClusterId> \ - -Dtaskmanager.memory.process.size=4096m \ - -Dkubernetes.taskmanager.cpu=2 \ - -Dtaskmanager.numberOfTaskSlots=4 \ - -Dkubernetes.container.image=<CustomImageName> \ - -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \ - -Dhigh-availability.storageDir=s3://flink/flink-ha \ - -Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=10 \ - -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \ - -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \ - local:///opt/flink/examples/streaming/StateMachineExample.jar -{% endhighlight %} +In order to start an HA-cluster you have to configure the following configuration keys: -### High Availability Data Clean Up -Currently, when a Flink job reached the terminal state (`FAILED`, `CANCELED`, `FINISHED`), all the HA data, including metadata in Kubernetes ConfigMap and HA state on DFS, will be cleaned up. +- [high-availability]({% link deployment/config.zh.md %}#high-availability-1) (required): +The `high-availability` option has to be set to `KubernetesHaServicesFactory`. -So the following command will only shut down the Flink session cluster and leave all the HA related ConfigMaps, state untouched. -{% highlight bash %} -$ echo 'stop' | ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=<ClusterId> -Dexecution.attached=true -{% endhighlight %} + <pre>high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory</pre> + +- [high-availability.storageDir]({% link deployment/config.zh.md %}#high-availability-storagedir) (required): +JobManager metadata is persisted in the file system `high-availability.storageDir` and only a pointer to this state is stored in Kubernetes. + + <pre>high-availability.storageDir: s3:///flink/recovery</pre> + + The `storageDir` stores all metadata needed to recover a JobManager failure. + +- [kubernetes.cluster-id]({% link deployment/config.zh.md %}#kubernetes-cluster-id) (required): +In order to identify the Flink cluster, you have to specify a `kubernetes.cluster-id`. 
+ + <pre>kubernetes.cluster-id: cluster1337</pre> + +### Example configuration + +Configure high availability mode in `conf/flink-conf.yaml`: -The following commands will cancel the job in application or session cluster and effectively remove all its HA data. {% highlight bash %} -# Cancel a Flink job in the existing session -$ ./bin/flink cancel -t kubernetes-session -Dkubernetes.cluster-id=<ClusterID> <JobID> -# Cancel a Flink application -$ ./bin/flink cancel -t kubernetes-application -Dkubernetes.cluster-id=<ClusterID> <JobID> +kubernetes.cluster-id: <cluster-id> +high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory +high-availability.storageDir: hdfs:///flink/recovery {% endhighlight %} -To keep HA data while restarting the Flink cluster, simply delete the deployment (via `kubectl delete deploy <ClusterID>`). +{% top %} + +## High availability data clean up + +To keep HA data while restarting the Flink cluster, simply delete the deployment (via `kubectl delete deploy <cluster-id>`). All the Flink cluster related resources will be deleted (e.g. JobManager Deployment, TaskManager pods, services, Flink conf ConfigMap). HA related ConfigMaps will be retained because they do not set the owner reference. -When restarting the session / application using `kubernetes-session.sh` or `flink run-application`, all previously running jobs will be recovered and restarted from the latest successful checkpoint. +When restarting the cluster, all previously running jobs will be recovered and restarted from the latest successful checkpoint. + +{% top %} diff --git a/docs/deployment/ha/zookeeper_ha.md b/docs/deployment/ha/zookeeper_ha.md index d1faf2d..5371d81 100644 --- a/docs/deployment/ha/zookeeper_ha.md +++ b/docs/deployment/ha/zookeeper_ha.md @@ -23,113 +23,110 @@ specific language governing permissions and limitations under the License. --> -## ZooKeeper HA Services +Flink's ZooKeeper HA services use [ZooKeeper](http://zookeeper.apache.org) for high availability services. -One high availability services implementation uses ZooKeeper. +* Toc +{:toc} -### Configuration +Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for *distributed coordination* between all running JobManager instances. +ZooKeeper is a separate service from Flink, which provides highly reliable distributed coordination via leader election and light-weight consistent state storage. +Check out [ZooKeeper's Getting Started Guide](http://zookeeper.apache.org/doc/current/zookeeperStarted.html) for more information about ZooKeeper. +Flink includes scripts to [bootstrap a simple ZooKeeper](#bootstrap-zookeeper) installation. -To enable JobManager High Availability you have to set the **high-availability mode** to *zookeeper*, configure a **ZooKeeper quorum** and set up a **masters file** with all JobManagers hosts and their web UI ports. +## Configuration -Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for *distributed coordination* between all running JobManager instances. ZooKeeper is a separate service from Flink, which provides highly reliable distributed coordination via leader election and light-weight consistent state storage. Check out [ZooKeeper's Getting Started Guide](http://zookeeper.apache.org/doc/current/zookeeperStarted.html) for more information about ZooKeeper. Flink includes scripts to [bootstrap a simple Zo [...] 
+In order to start an HA-cluster you have to configure the following configuration keys: -#### Masters File (masters) +- [high-availability]({% link deployment/config.md %}#high-availability-1) (required): +The `high-availability` option has to be set to `zookeeper`. -In order to start an HA-cluster configure the *masters* file in `conf/masters`: - -- **masters file**: The *masters file* contains all hosts, on which JobManagers are started, and the ports to which the web user interface binds. - - <pre> -jobManagerAddress1:webUIPort1 -[...] -jobManagerAddressX:webUIPortX - </pre> - -By default, the job manager will pick a *random port* for inter process communication. You can change this via the **`high-availability.jobmanager.port`** key. This key accepts single ports (e.g. `50010`), ranges (`50000-50025`), or a combination of both (`50010,50011,50020-50025,50050-50075`). - -#### Config File (flink-conf.yaml) + <pre>high-availability: zookeeper</pre> -In order to start an HA-cluster add the following configuration keys to `conf/flink-conf.yaml`: +- [high-availability.storageDir]({% link deployment/config.md %}#high-availability-storagedir) (required): +JobManager metadata is persisted in the file system `high-availability.storageDir` and only a pointer to this state is stored in ZooKeeper. -- **high-availability mode** (required): The *high-availability mode* has to be set in `conf/flink-conf.yaml` to *zookeeper* in order to enable high availability mode. -Alternatively this option can be set to FQN of factory class Flink should use to create HighAvailabilityServices instance. + <pre>high-availability.storageDir: hdfs:///flink/recovery</pre> - <pre>high-availability: zookeeper</pre> + The `storageDir` stores all metadata needed to recover a JobManager failure. -- **ZooKeeper quorum** (required): A *ZooKeeper quorum* is a replicated group of ZooKeeper servers, which provide the distributed coordination service. +- [high-availability.zookeeper.quorum]({%link deployment/config.md %}#high-availability-zookeeper-quorum) (required): +A *ZooKeeper quorum* is a replicated group of ZooKeeper servers, which provide the distributed coordination service. <pre>high-availability.zookeeper.quorum: address1:2181[,...],addressX:2181</pre> - Each *addressX:port* refers to a ZooKeeper server, which is reachable by Flink at the given address and port. + Each `addressX:port` refers to a ZooKeeper server, which is reachable by Flink at the given address and port. -- **ZooKeeper root** (recommended): The *root ZooKeeper node*, under which all cluster nodes are placed. +- [high-availability.zookeeper.path.root]({% link deployment/config.md %}#high-availability-zookeeper-path-root) (recommended): +The *root ZooKeeper node*, under which all cluster nodes are placed. - <pre>high-availability.zookeeper.path.root: /flink + <pre>high-availability.zookeeper.path.root: /flink</pre> -- **ZooKeeper cluster-id** (recommended): The *cluster-id ZooKeeper node*, under which all required coordination data for a cluster is placed. +- [high-availability.cluster-id]({% link deployment/config.md %}#high-availability-cluster-id) (recommended): +The *cluster-id ZooKeeper node*, under which all required coordination data for a cluster is placed. <pre>high-availability.cluster-id: /default_ns # important: customize per cluster</pre> - **Important**: You should not set this value manually when running a YARN - cluster, a per-job YARN session, or on another cluster manager. 
In those - cases a cluster-id is automatically being generated based on the application - id. Manually setting a cluster-id overrides this behaviour in YARN. - Specifying a cluster-id with the -z CLI option, in turn, overrides manual - configuration. If you are running multiple Flink HA clusters on bare metal, - you have to manually configure separate cluster-ids for each cluster. + **Important**: + You should not set this value manually when running on YARN, native Kubernetes or on another cluster manager. + In those cases a cluster-id is being automatically generated. + If you are running multiple Flink HA clusters on bare metal, you have to manually configure separate cluster-ids for each cluster. + +### Example configuration -- **Storage directory** (required): JobManager metadata is persisted in the file system *storageDir* and only a pointer to this state is stored in ZooKeeper. +Configure high availability mode and ZooKeeper quorum in `conf/flink-conf.yaml`: - <pre> +{% highlight bash %} +high-availability: zookeeper +high-availability.zookeeper.quorum: localhost:2181 +high-availability.zookeeper.path.root: /flink +high-availability.cluster-id: /cluster_one # important: customize per cluster high-availability.storageDir: hdfs:///flink/recovery - </pre> +{% endhighlight %} - The `storageDir` stores all metadata needed to recover a JobManager failure. +{% top %} -After configuring the masters and the ZooKeeper quorum, you can use the provided cluster startup scripts as usual. They will start an HA-cluster. Keep in mind that the **ZooKeeper quorum has to be running** when you call the scripts and make sure to **configure a separate ZooKeeper root path** for each HA cluster you are starting. +## Configuring for ZooKeeper Security -#### Example: Standalone Cluster with 2 JobManagers +If ZooKeeper is running in secure mode with Kerberos, you can override the following configurations in `flink-conf.yaml` as necessary: -1. **Configure high availability mode and ZooKeeper quorum** in `conf/flink-conf.yaml`: +{% highlight bash %} +zookeeper.sasl.service-name: zookeeper # default is "zookeeper". If the ZooKeeper quorum is configured + # with a different service name then it can be supplied here. +zookeeper.sasl.login-context-name: Client # default is "Client". The value needs to match one of the values + # configured in "security.kerberos.login.contexts". +{% endhighlight %} - <pre> -high-availability: zookeeper -high-availability.zookeeper.quorum: localhost:2181 -high-availability.zookeeper.path.root: /flink -high-availability.cluster-id: /cluster_one # important: customize per cluster -high-availability.storageDir: hdfs:///flink/recovery</pre> +For more information on Flink configuration for Kerberos security, please refer to the [security section of the Flink configuration page]({% link deployment/config.md %}#security). +You can also find further details on [how Flink sets up Kerberos-based security internally]({% link deployment/security/security-kerberos.md %}). + +{% top %} -2. **Configure masters** in `conf/masters`: +## ZooKeeper Versions - <pre> -localhost:8081 -localhost:8082</pre> +Flink ships with separate ZooKeeper clients for 3.4 and 3.5, with 3.4 being in the `lib` directory of the distribution +and thus used by default, whereas 3.5 is placed in the `opt` directory. -3. 
**Configure ZooKeeper server** in `conf/zoo.cfg` (currently it's only possible to run a single ZooKeeper server per machine): +The 3.5 client allows you to secure the ZooKeeper connection via SSL, but _may_ not work with 3.4- ZooKeeper installations. - <pre>server.0=localhost:2888:3888</pre> +You can control which version is used by Flink by placing either jar in the `lib` directory. -4. **Start ZooKeeper quorum**: +{% top %} - <pre> -$ bin/start-zookeeper-quorum.sh -Starting zookeeper daemon on host localhost.</pre> +## Bootstrap ZooKeeper -5. **Start an HA-cluster**: +If you don't have a running ZooKeeper installation, you can use the helper scripts, which ship with Flink. - <pre> -$ bin/start-cluster.sh -Starting HA cluster with 2 masters and 1 peers in ZooKeeper quorum. -Starting standalonesession daemon on host localhost. -Starting standalonesession daemon on host localhost. -Starting taskexecutor daemon on host localhost.</pre> +There is a ZooKeeper configuration template in `conf/zoo.cfg`. +You can configure the hosts to run ZooKeeper on with the `server.X` entries, where X is a unique ID of each server: + +{% highlight bash %} +server.X=addressX:peerPort:leaderPort +[...] +server.Y=addressY:peerPort:leaderPort +{% endhighlight %} -6. **Stop ZooKeeper quorum and cluster**: +The script `bin/start-zookeeper-quorum.sh` will start a ZooKeeper server on each of the configured hosts. +The started processes start ZooKeeper servers via a Flink wrapper, which reads the configuration from `conf/zoo.cfg` and makes sure to set some required configuration values for convenience. +In production setups, it is recommended to manage your own ZooKeeper installation. - <pre> -$ bin/stop-cluster.sh -Stopping taskexecutor daemon (pid: 7647) on localhost. -Stopping standalonesession daemon (pid: 7495) on host localhost. -Stopping standalonesession daemon (pid: 7349) on host localhost. -$ bin/stop-zookeeper-quorum.sh -Stopping zookeeper daemon (pid: 7101) on host localhost.</pre> +{% top %} diff --git a/docs/deployment/ha/zookeeper_ha.zh.md b/docs/deployment/ha/zookeeper_ha.zh.md index d1faf2d..313634b 100644 --- a/docs/deployment/ha/zookeeper_ha.zh.md +++ b/docs/deployment/ha/zookeeper_ha.zh.md @@ -23,113 +23,110 @@ specific language governing permissions and limitations under the License. --> -## ZooKeeper HA Services +Flink's ZooKeeper HA services use [ZooKeeper](http://zookeeper.apache.org) for high availability services. -One high availability services implementation uses ZooKeeper. +* Toc +{:toc} -### Configuration +Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for *distributed coordination* between all running JobManager instances. +ZooKeeper is a separate service from Flink, which provides highly reliable distributed coordination via leader election and light-weight consistent state storage. +Check out [ZooKeeper's Getting Started Guide](http://zookeeper.apache.org/doc/current/zookeeperStarted.html) for more information about ZooKeeper. +Flink includes scripts to [bootstrap a simple ZooKeeper](#bootstrap-zookeeper) installation. -To enable JobManager High Availability you have to set the **high-availability mode** to *zookeeper*, configure a **ZooKeeper quorum** and set up a **masters file** with all JobManagers hosts and their web UI ports. +## Configuration -Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for *distributed coordination* between all running JobManager instances. 
ZooKeeper is a separate service from Flink, which provides highly reliable distributed coordination via leader election and light-weight consistent state storage. Check out [ZooKeeper's Getting Started Guide](http://zookeeper.apache.org/doc/current/zookeeperStarted.html) for more information about ZooKeeper. Flink includes scripts to [bootstrap a simple Zo [...] +In order to start an HA-cluster you have to configure the following configuration keys: -#### Masters File (masters) +- [high-availability]({% link deployment/config.zh.md %}#high-availability-1) (required): +The `high-availability` option has to be set to `zookeeper`. -In order to start an HA-cluster configure the *masters* file in `conf/masters`: - -- **masters file**: The *masters file* contains all hosts, on which JobManagers are started, and the ports to which the web user interface binds. - - <pre> -jobManagerAddress1:webUIPort1 -[...] -jobManagerAddressX:webUIPortX - </pre> - -By default, the job manager will pick a *random port* for inter process communication. You can change this via the **`high-availability.jobmanager.port`** key. This key accepts single ports (e.g. `50010`), ranges (`50000-50025`), or a combination of both (`50010,50011,50020-50025,50050-50075`). - -#### Config File (flink-conf.yaml) + <pre>high-availability: zookeeper</pre> -In order to start an HA-cluster add the following configuration keys to `conf/flink-conf.yaml`: +- [high-availability.storageDir]({% link deployment/config.zh.md %}#high-availability-storagedir) (required): +JobManager metadata is persisted in the file system `high-availability.storageDir` and only a pointer to this state is stored in ZooKeeper. -- **high-availability mode** (required): The *high-availability mode* has to be set in `conf/flink-conf.yaml` to *zookeeper* in order to enable high availability mode. -Alternatively this option can be set to FQN of factory class Flink should use to create HighAvailabilityServices instance. + <pre>high-availability.storageDir: hdfs:///flink/recovery</pre> - <pre>high-availability: zookeeper</pre> + The `storageDir` stores all metadata needed to recover a JobManager failure. -- **ZooKeeper quorum** (required): A *ZooKeeper quorum* is a replicated group of ZooKeeper servers, which provide the distributed coordination service. +- [high-availability.zookeeper.quorum]({%link deployment/config.zh.md %}#high-availability-zookeeper-quorum) (required): +A *ZooKeeper quorum* is a replicated group of ZooKeeper servers, which provide the distributed coordination service. <pre>high-availability.zookeeper.quorum: address1:2181[,...],addressX:2181</pre> - Each *addressX:port* refers to a ZooKeeper server, which is reachable by Flink at the given address and port. + Each `addressX:port` refers to a ZooKeeper server, which is reachable by Flink at the given address and port. -- **ZooKeeper root** (recommended): The *root ZooKeeper node*, under which all cluster nodes are placed. +- [high-availability.zookeeper.path.root]({% link deployment/config.zh.md %}#high-availability-zookeeper-path-root) (recommended): +The *root ZooKeeper node*, under which all cluster nodes are placed. - <pre>high-availability.zookeeper.path.root: /flink + <pre>high-availability.zookeeper.path.root: /flink</pre> -- **ZooKeeper cluster-id** (recommended): The *cluster-id ZooKeeper node*, under which all required coordination data for a cluster is placed. 
+- [high-availability.cluster-id]({% link deployment/config.zh.md %}#high-availability-cluster-id) (recommended): +The *cluster-id ZooKeeper node*, under which all required coordination data for a cluster is placed. <pre>high-availability.cluster-id: /default_ns # important: customize per cluster</pre> - **Important**: You should not set this value manually when running a YARN - cluster, a per-job YARN session, or on another cluster manager. In those - cases a cluster-id is automatically being generated based on the application - id. Manually setting a cluster-id overrides this behaviour in YARN. - Specifying a cluster-id with the -z CLI option, in turn, overrides manual - configuration. If you are running multiple Flink HA clusters on bare metal, - you have to manually configure separate cluster-ids for each cluster. + **Important**: + You should not set this value manually when running on YARN, native Kubernetes or on another cluster manager. + In those cases a cluster-id is being automatically generated. + If you are running multiple Flink HA clusters on bare metal, you have to manually configure separate cluster-ids for each cluster. + +### Example configuration -- **Storage directory** (required): JobManager metadata is persisted in the file system *storageDir* and only a pointer to this state is stored in ZooKeeper. +Configure high availability mode and ZooKeeper quorum in `conf/flink-conf.yaml`: - <pre> +{% highlight bash %} +high-availability: zookeeper +high-availability.zookeeper.quorum: localhost:2181 +high-availability.zookeeper.path.root: /flink +high-availability.cluster-id: /cluster_one # important: customize per cluster high-availability.storageDir: hdfs:///flink/recovery - </pre> +{% endhighlight %} - The `storageDir` stores all metadata needed to recover a JobManager failure. +{% top %} -After configuring the masters and the ZooKeeper quorum, you can use the provided cluster startup scripts as usual. They will start an HA-cluster. Keep in mind that the **ZooKeeper quorum has to be running** when you call the scripts and make sure to **configure a separate ZooKeeper root path** for each HA cluster you are starting. +## Configuring for ZooKeeper Security -#### Example: Standalone Cluster with 2 JobManagers +If ZooKeeper is running in secure mode with Kerberos, you can override the following configurations in `flink-conf.yaml` as necessary: -1. **Configure high availability mode and ZooKeeper quorum** in `conf/flink-conf.yaml`: +{% highlight bash %} +zookeeper.sasl.service-name: zookeeper # default is "zookeeper". If the ZooKeeper quorum is configured + # with a different service name then it can be supplied here. +zookeeper.sasl.login-context-name: Client # default is "Client". The value needs to match one of the values + # configured in "security.kerberos.login.contexts". +{% endhighlight %} - <pre> -high-availability: zookeeper -high-availability.zookeeper.quorum: localhost:2181 -high-availability.zookeeper.path.root: /flink -high-availability.cluster-id: /cluster_one # important: customize per cluster -high-availability.storageDir: hdfs:///flink/recovery</pre> +For more information on Flink configuration for Kerberos security, please refer to the [security section of the Flink configuration page]({% link deployment/config.md %}#security). +You can also find further details on [how Flink sets up Kerberos-based security internally]({% link deployment/security/security-kerberos.md %}). + +{% top %} -2. 
**Configure masters** in `conf/masters`: +## ZooKeeper Versions - <pre> -localhost:8081 -localhost:8082</pre> +Flink ships with separate ZooKeeper clients for 3.4 and 3.5, with 3.4 being in the `lib` directory of the distribution +and thus used by default, whereas 3.5 is placed in the `opt` directory. -3. **Configure ZooKeeper server** in `conf/zoo.cfg` (currently it's only possible to run a single ZooKeeper server per machine): +The 3.5 client allows you to secure the ZooKeeper connection via SSL, but _may_ not work with 3.4- ZooKeeper installations. - <pre>server.0=localhost:2888:3888</pre> +You can control which version is used by Flink by placing either jar in the `lib` directory. -4. **Start ZooKeeper quorum**: +{% top %} - <pre> -$ bin/start-zookeeper-quorum.sh -Starting zookeeper daemon on host localhost.</pre> +## Bootstrap ZooKeeper -5. **Start an HA-cluster**: +If you don't have a running ZooKeeper installation, you can use the helper scripts, which ship with Flink. - <pre> -$ bin/start-cluster.sh -Starting HA cluster with 2 masters and 1 peers in ZooKeeper quorum. -Starting standalonesession daemon on host localhost. -Starting standalonesession daemon on host localhost. -Starting taskexecutor daemon on host localhost.</pre> +There is a ZooKeeper configuration template in `conf/zoo.cfg`. +You can configure the hosts to run ZooKeeper on with the `server.X` entries, where X is a unique ID of each server: + +{% highlight bash %} +server.X=addressX:peerPort:leaderPort +[...] +server.Y=addressY:peerPort:leaderPort +{% endhighlight %} -6. **Stop ZooKeeper quorum and cluster**: +The script `bin/start-zookeeper-quorum.sh` will start a ZooKeeper server on each of the configured hosts. +The started processes start ZooKeeper servers via a Flink wrapper, which reads the configuration from `conf/zoo.cfg` and makes sure to set some required configuration values for convenience. +In production setups, it is recommended to manage your own ZooKeeper installation. - <pre> -$ bin/stop-cluster.sh -Stopping taskexecutor daemon (pid: 7647) on localhost. -Stopping standalonesession daemon (pid: 7495) on host localhost. -Stopping standalonesession daemon (pid: 7349) on host localhost. -$ bin/stop-zookeeper-quorum.sh -Stopping zookeeper daemon (pid: 7101) on host localhost.</pre> +{% top %} diff --git a/docs/deployment/resource-providers/native_kubernetes.md b/docs/deployment/resource-providers/native_kubernetes.md index 1d0d7c8..5d28000 100644 --- a/docs/deployment/resource-providers/native_kubernetes.md +++ b/docs/deployment/resource-providers/native_kubernetes.md @@ -375,6 +375,28 @@ $ ./bin/kubernetes-session.sh \ For more details see the [official Kubernetes documentation](https://kubernetes.io/docs/concepts/configuration/secret/#using-secrets-as-environment-variables). +## High-Availability with Native Kubernetes + +For high availability on Kubernetes, you can use the [existing high availability services]({% link deployment/ha/index.md %}). + +### How to configure Kubernetes HA Services + +Using the following command to start a native Flink application cluster on Kubernetes with high availability configured. 
+{% highlight bash %} +$ ./bin/flink run-application -p 8 -t kubernetes-application \ + -Dkubernetes.cluster-id=<ClusterId> \ + -Dtaskmanager.memory.process.size=4096m \ + -Dkubernetes.taskmanager.cpu=2 \ + -Dtaskmanager.numberOfTaskSlots=4 \ + -Dkubernetes.container.image=<CustomImageName> \ + -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \ + -Dhigh-availability.storageDir=s3://flink/flink-ha \ + -Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=10 \ + -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \ + -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \ + local:///opt/flink/examples/streaming/StateMachineExample.jar +{% endhighlight %} + ## Kubernetes concepts ### Namespaces diff --git a/docs/deployment/resource-providers/native_kubernetes.zh.md b/docs/deployment/resource-providers/native_kubernetes.zh.md index 309ac5c..d28b4c9 100644 --- a/docs/deployment/resource-providers/native_kubernetes.zh.md +++ b/docs/deployment/resource-providers/native_kubernetes.zh.md @@ -374,6 +374,28 @@ $ ./bin/kubernetes-session.sh \ For more details see the [official Kubernetes documentation](https://kubernetes.io/docs/concepts/configuration/secret/#using-secrets-as-environment-variables). +## High-Availability with Native Kubernetes + +For high availability on Kubernetes, you can use the [existing high availability services]({% link deployment/ha/index.zh.md %}). + +### How to configure Kubernetes HA Services + +Using the following command to start a native Flink application cluster on Kubernetes with high availability configured. +{% highlight bash %} +$ ./bin/flink run-application -p 8 -t kubernetes-application \ + -Dkubernetes.cluster-id=<ClusterId> \ + -Dtaskmanager.memory.process.size=4096m \ + -Dkubernetes.taskmanager.cpu=2 \ + -Dtaskmanager.numberOfTaskSlots=4 \ + -Dkubernetes.container.image=<CustomImageName> \ + -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \ + -Dhigh-availability.storageDir=s3://flink/flink-ha \ + -Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=10 \ + -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \ + -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{site.version}}.jar \ + local:///opt/flink/examples/streaming/StateMachineExample.jar +{% endhighlight %} + ## Kubernetes 概念 ### 命名空间 diff --git a/docs/deployment/resource-providers/standalone/index.md b/docs/deployment/resource-providers/standalone/index.md index 4fa253c..ba09d31 100644 --- a/docs/deployment/resource-providers/standalone/index.md +++ b/docs/deployment/resource-providers/standalone/index.md @@ -150,4 +150,71 @@ bin/taskmanager.sh start|start-foreground|stop|stop-all Make sure to call these scripts on the hosts on which you want to start/stop the respective instance. +## High-Availability with Standalone + +In order to enable HA for a standalone cluster, you have to use the [ZooKeeper HA services]({% link deployment/ha/zookeeper_ha.md %}). + +Additionally, you have to configure your cluster to start multiple JobManagers. + +### Masters File (masters) + +In order to start an HA-cluster configure the *masters* file in `conf/masters`: + +- **masters file**: The *masters file* contains all hosts, on which JobManagers are started, and the ports to which the web user interface binds. + + <pre> +jobManagerAddress1:webUIPort1 +[...] 
+jobManagerAddressX:webUIPortX + </pre> + +By default, the job manager will pick a *random port* for inter process communication. You can change this via the [high-availability.jobmanager.port]({% link deployment/config.md %}#high-availability-jobmanager-port) key. This key accepts single ports (e.g. `50010`), ranges (`50000-50025`), or a combination of both (`50010,50011,50020-50025,50050-50075`). + +### Example: Standalone Cluster with 2 JobManagers + +1. **Configure high availability mode and ZooKeeper quorum** in `conf/flink-conf.yaml`: + + <pre> +high-availability: zookeeper +high-availability.zookeeper.quorum: localhost:2181 +high-availability.zookeeper.path.root: /flink +high-availability.cluster-id: /cluster_one # important: customize per cluster +high-availability.storageDir: hdfs:///flink/recovery</pre> + +2. **Configure masters** in `conf/masters`: + + <pre> +localhost:8081 +localhost:8082</pre> + +3. **Configure ZooKeeper server** in `conf/zoo.cfg` (currently it's only possible to run a single ZooKeeper server per machine): + + <pre>server.0=localhost:2888:3888</pre> + +4. **Start ZooKeeper quorum**: + + <pre> +$ bin/start-zookeeper-quorum.sh +Starting zookeeper daemon on host localhost.</pre> + +5. **Start an HA-cluster**: + + <pre> +$ bin/start-cluster.sh +Starting HA cluster with 2 masters and 1 peers in ZooKeeper quorum. +Starting standalonesession daemon on host localhost. +Starting standalonesession daemon on host localhost. +Starting taskexecutor daemon on host localhost.</pre> + +6. **Stop ZooKeeper quorum and cluster**: + + <pre> +$ bin/stop-cluster.sh +Stopping taskexecutor daemon (pid: 7647) on localhost. +Stopping standalonesession daemon (pid: 7495) on host localhost. +Stopping standalonesession daemon (pid: 7349) on host localhost. +$ bin/stop-zookeeper-quorum.sh +Stopping zookeeper daemon (pid: 7101) on host localhost.</pre> + + {% top %} diff --git a/docs/deployment/resource-providers/standalone/index.zh.md b/docs/deployment/resource-providers/standalone/index.zh.md index 743c738..02f3718 100644 --- a/docs/deployment/resource-providers/standalone/index.zh.md +++ b/docs/deployment/resource-providers/standalone/index.zh.md @@ -163,4 +163,72 @@ bin/taskmanager.sh start|start-foreground|stop|stop-all 确保在你想启动/关闭相应实例的主机上执行这些脚本。 +## High-Availability with Standalone + +In order to enable HA for a standalone cluster, you have to use the [ZooKeeper HA services]({% link deployment/ha/zookeeper_ha.zh.md %}). + +Additionally, you have to configure your cluster to start multiple JobManagers. + +### Masters File (masters) + +In order to start an HA-cluster configure the *masters* file in `conf/masters`: + +- **masters file**: The *masters file* contains all hosts, on which JobManagers are started, and the ports to which the web user interface binds. + + <pre> +jobManagerAddress1:webUIPort1 +[...] +jobManagerAddressX:webUIPortX + </pre> + +By default, the job manager will pick a *random port* for inter process communication. You can change this via the [high-availability.jobmanager.port]({% link deployment/config.md %}#high-availability-jobmanager-port) key. This key accepts single ports (e.g. `50010`), ranges (`50000-50025`), or a combination of both (`50010,50011,50020-50025,50050-50075`). + +### Example: Standalone Cluster with 2 JobManagers + +1. 
**Configure high availability mode and ZooKeeper quorum** in `conf/flink-conf.yaml`: + + <pre> +high-availability: zookeeper +high-availability.zookeeper.quorum: localhost:2181 +high-availability.zookeeper.path.root: /flink +high-availability.cluster-id: /cluster_one # important: customize per cluster +high-availability.storageDir: hdfs:///flink/recovery</pre> + +2. **Configure masters** in `conf/masters`: + + <pre> +localhost:8081 +localhost:8082</pre> + +3. **Configure ZooKeeper server** in `conf/zoo.cfg` (currently it's only possible to run a single ZooKeeper server per machine): + + <pre>server.0=localhost:2888:3888</pre> + +4. **Start ZooKeeper quorum**: + + <pre> +$ bin/start-zookeeper-quorum.sh +Starting zookeeper daemon on host localhost.</pre> + +5. **Start an HA-cluster**: + + <pre> +$ bin/start-cluster.sh +Starting HA cluster with 2 masters and 1 peers in ZooKeeper quorum. +Starting standalonesession daemon on host localhost. +Starting standalonesession daemon on host localhost. +Starting taskexecutor daemon on host localhost.</pre> + +6. **Stop ZooKeeper quorum and cluster**: + + <pre> +$ bin/stop-cluster.sh +Stopping taskexecutor daemon (pid: 7647) on localhost. +Stopping standalonesession daemon (pid: 7495) on host localhost. +Stopping standalonesession daemon (pid: 7349) on host localhost. +$ bin/stop-zookeeper-quorum.sh +Stopping zookeeper daemon (pid: 7101) on host localhost.</pre> + + + {% top %} diff --git a/docs/deployment/resource-providers/standalone/kubernetes.md b/docs/deployment/resource-providers/standalone/kubernetes.md index bd1b9a3..22bb1f2 100644 --- a/docs/deployment/resource-providers/standalone/kubernetes.md +++ b/docs/deployment/resource-providers/standalone/kubernetes.md @@ -162,6 +162,34 @@ with the `kubectl` command: kubectl delete -f jobmanager-job.yaml ``` +## High-Availability with Standalone Kubernetes + +For high availability on Kubernetes, you can use the [existing high availability services]({% link deployment/ha/index.md %}). + +### How to configure Kubernetes HA Services + +Session Mode, Per-Job Mode, and Application Mode clusters support using the Kubernetes high availability service. Users just need to add the following Flink config options to [flink-configuration-configmap.yaml](#common-cluster-resource-definitions). All other yamls do not need to be updated. + +<span class="label label-info">Note</span> The filesystem which corresponds to the scheme of your configured HA storage directory must be available to the runtime. Refer to [custom Flink image]({% link deployment/resource-providers/standalone/docker.md %}#customize-flink-image) and [enable plugins]({% link deployment/resource-providers/standalone/docker.md %}#using-plugins) for more information. + +{% highlight yaml %} +apiVersion: v1 +kind: ConfigMap +metadata: + name: flink-config + labels: + app: flink +data: + flink-conf.yaml: |+ + ... + kubernetes.cluster-id: <cluster-id> + high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory + high-availability.storageDir: hdfs:///flink/recovery + restart-strategy: fixed-delay + restart-strategy.fixed-delay.attempts: 10 + ... 
+{% endhighlight %}
+
 ## Appendix
 ### Common cluster resource definitions
diff --git a/docs/deployment/resource-providers/standalone/kubernetes.zh.md b/docs/deployment/resource-providers/standalone/kubernetes.zh.md
index 70d4219..7706b1d 100644
--- a/docs/deployment/resource-providers/standalone/kubernetes.zh.md
+++ b/docs/deployment/resource-providers/standalone/kubernetes.zh.md
@@ -162,6 +162,35 @@ with the `kubectl` command:
 kubectl delete -f jobmanager-job.yaml
 ```
+## High-Availability with Standalone Kubernetes
+
+For high availability on Kubernetes, you can use the [existing high availability services]({% link deployment/ha/index.zh.md %}).
+
+### How to configure Kubernetes HA Services
+
+Session Mode, Per-Job Mode, and Application Mode clusters support using the Kubernetes high availability service. Users just need to add the following Flink config options to [flink-configuration-configmap.yaml](#common-cluster-resource-definitions). All other yamls do not need to be updated.
+
+<span class="label label-info">Note</span> The filesystem which corresponds to the scheme of your configured HA storage directory must be available to the runtime. Refer to [custom Flink image]({% link deployment/resource-providers/standalone/docker.zh.md %}#customize-flink-image) and [enable plugins]({% link deployment/resource-providers/standalone/docker.zh.md %}#using-plugins) for more information.
+
+{% highlight yaml %}
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: flink-config
+  labels:
+    app: flink
+data:
+  flink-conf.yaml: |+
+  ...
+    kubernetes.cluster-id: <cluster-id>
+    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+    high-availability.storageDir: hdfs:///flink/recovery
+    restart-strategy: fixed-delay
+    restart-strategy.fixed-delay.attempts: 10
+  ...
+{% endhighlight %}
+
+
 ## Appendix
 ### Common cluster resource definitions
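As a brief illustration of the clean-up behaviour described in the sections above, here is a minimal, hypothetical sketch (not taken from this commit) of inspecting the retained HA ConfigMaps with plain `kubectl`. It assumes the native Kubernetes deployment mode, where the cluster's Deployment carries the cluster id as its name (as in `kubectl delete deploy <cluster-id>` above); the cluster id `cluster1337` and the assumption that `kubectl` already targets the namespace running Flink are illustrative only.

{% highlight bash %}
# Hypothetical sketch: assumes the Kubernetes HA services are enabled with
# kubernetes.cluster-id: cluster1337 and kubectl targets the Flink namespace.
CLUSTER_ID=cluster1337

# Deleting the deployment removes the JobManager/TaskManager pods, services and
# the flink-conf ConfigMap; the HA ConfigMaps survive because they set no owner reference.
kubectl delete deploy "${CLUSTER_ID}"

# List the retained HA ConfigMaps by cluster id; restarting the cluster with the
# same cluster-id recovers the jobs from the latest successful checkpoint.
kubectl get configmaps | grep "${CLUSTER_ID}"
{% endhighlight %}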
