This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit fb05a7be9b828b7e582e75e4832443806fa4ff17 Author: Chesnay Schepler <[email protected]> AuthorDate: Fri May 27 09:58:37 2022 +0200 [FLINK-27808][ha] Allow "kubernetes" as HA_MODE --- .../content.zh/docs/deployment/ha/kubernetes_ha.md | 4 +-- .../resource-providers/standalone/kubernetes.md | 2 +- docs/content/docs/deployment/ha/kubernetes_ha.md | 4 +-- .../resource-providers/standalone/kubernetes.md | 2 +- .../common_high_availability_section.html | 2 +- .../generated/high_availability_configuration.html | 2 +- .../configuration/HighAvailabilityOptions.java | 6 ++-- ...HighAvailabilityRecoverFromSavepointITCase.java | 5 ++-- .../factory/KubernetesJobManagerFactoryTest.java | 6 ++-- .../KubernetesJobManagerParametersTest.java | 6 ++-- .../HighAvailabilityServicesUtils.java | 34 +++++++++++++++++++--- .../runtime/jobmanager/HighAvailabilityMode.java | 1 + 12 files changed, 48 insertions(+), 26 deletions(-) diff --git a/docs/content.zh/docs/deployment/ha/kubernetes_ha.md b/docs/content.zh/docs/deployment/ha/kubernetes_ha.md index fb0cb0eb16a..b651a95579d 100644 --- a/docs/content.zh/docs/deployment/ha/kubernetes_ha.md +++ b/docs/content.zh/docs/deployment/ha/kubernetes_ha.md @@ -46,7 +46,7 @@ Kubernetes 高可用服务只能在部署到 Kubernetes 时使用。因此,当 `high-availability` 选项必须设置为 `KubernetesHaServicesFactory`. 
```yaml -high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory +high-availability: kubernetes ``` - [high-availability.storageDir]({{< ref "docs/deployment/config" >}}#high-availability-storagedir) (必要的): @@ -71,7 +71,7 @@ kubernetes.cluster-id: cluster1337 ```yaml kubernetes.cluster-id: <cluster-id> -high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory +high-availability: kubernetes high-availability.storageDir: hdfs:///flink/recovery ``` diff --git a/docs/content.zh/docs/deployment/resource-providers/standalone/kubernetes.md b/docs/content.zh/docs/deployment/resource-providers/standalone/kubernetes.md index f67be7f747f..e57ac7c3334 100644 --- a/docs/content.zh/docs/deployment/resource-providers/standalone/kubernetes.md +++ b/docs/content.zh/docs/deployment/resource-providers/standalone/kubernetes.md @@ -230,7 +230,7 @@ data: flink-conf.yaml: |+ ... kubernetes.cluster-id: <cluster-id> - high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory + high-availability: kubernetes high-availability.storageDir: hdfs:///flink/recovery restart-strategy: fixed-delay restart-strategy.fixed-delay.attempts: 10 diff --git a/docs/content/docs/deployment/ha/kubernetes_ha.md b/docs/content/docs/deployment/ha/kubernetes_ha.md index 550c3c792d0..a6e93cf4643 100644 --- a/docs/content/docs/deployment/ha/kubernetes_ha.md +++ b/docs/content/docs/deployment/ha/kubernetes_ha.md @@ -48,7 +48,7 @@ In order to start an HA-cluster you have to configure the following configuratio The `high-availability` option has to be set to `KubernetesHaServicesFactory`. 
```yaml -high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory +high-availability: kubernetes ``` - [high-availability.storageDir]({{< ref "docs/deployment/config" >}}#high-availability-storagedir) (required): @@ -73,7 +73,7 @@ Configure high availability mode in `conf/flink-conf.yaml`: ```yaml kubernetes.cluster-id: <cluster-id> -high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory +high-availability: kubernetes high-availability.storageDir: hdfs:///flink/recovery ``` diff --git a/docs/content/docs/deployment/resource-providers/standalone/kubernetes.md b/docs/content/docs/deployment/resource-providers/standalone/kubernetes.md index 31c313edfdb..5c50b874f4a 100644 --- a/docs/content/docs/deployment/resource-providers/standalone/kubernetes.md +++ b/docs/content/docs/deployment/resource-providers/standalone/kubernetes.md @@ -213,7 +213,7 @@ data: flink-conf.yaml: |+ ... kubernetes.cluster-id: <cluster-id> - high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory + high-availability: kubernetes high-availability.storageDir: hdfs:///flink/recovery restart-strategy: fixed-delay restart-strategy.fixed-delay.attempts: 10 diff --git a/docs/layouts/shortcodes/generated/common_high_availability_section.html b/docs/layouts/shortcodes/generated/common_high_availability_section.html index 2f4b4e83878..d4a7463b6aa 100644 --- a/docs/layouts/shortcodes/generated/common_high_availability_section.html +++ b/docs/layouts/shortcodes/generated/common_high_availability_section.html @@ -12,7 +12,7 @@ <td><h5>high-availability</h5></td> <td style="word-wrap: break-word;">"NONE"</td> <td>String</td> - <td>Defines high-availability mode used for the cluster execution. To enable high-availability, set this mode to "ZOOKEEPER" or specify FQN of factory class.</td> + <td>Defines high-availability mode used for the cluster execution. 
To enable high-availability, set this mode to "ZOOKEEPER", "KUBERNETES" or specify FQN of factory class.</td> </tr> <tr> <td><h5>high-availability.cluster-id</h5></td> diff --git a/docs/layouts/shortcodes/generated/high_availability_configuration.html b/docs/layouts/shortcodes/generated/high_availability_configuration.html index 324f3378fb1..1c93975923c 100644 --- a/docs/layouts/shortcodes/generated/high_availability_configuration.html +++ b/docs/layouts/shortcodes/generated/high_availability_configuration.html @@ -12,7 +12,7 @@ <td><h5>high-availability</h5></td> <td style="word-wrap: break-word;">"NONE"</td> <td>String</td> - <td>Defines high-availability mode used for the cluster execution. To enable high-availability, set this mode to "ZOOKEEPER" or specify FQN of factory class.</td> + <td>Defines high-availability mode used for the cluster execution. To enable high-availability, set this mode to "ZOOKEEPER", "KUBERNETES" or specify FQN of factory class.</td> </tr> <tr> <td><h5>high-availability.cluster-id</h5></td> diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java index b156c152d9e..97606c5679e 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java @@ -33,8 +33,8 @@ public class HighAvailabilityOptions { /** * Defines high-availability mode used for the cluster execution. A value of "NONE" signals no - * highly available setup. To enable high-availability, set this mode to "ZOOKEEPER". Can also - * be set to FQN of HighAvailability factory class. + * highly available setup. To enable high-availability, set this mode to "ZOOKEEPER" or + * "KUBERNETES". Can also be set to FQN of HighAvailability factory class. 
*/ @Documentation.Section(Documentation.Sections.COMMON_HIGH_AVAILABILITY) public static final ConfigOption<String> HA_MODE = @@ -44,7 +44,7 @@ public class HighAvailabilityOptions { .withDeprecatedKeys("recovery.mode") .withDescription( "Defines high-availability mode used for the cluster execution." - + " To enable high-availability, set this mode to \"ZOOKEEPER\" or specify FQN of factory class."); + + " To enable high-availability, set this mode to \"ZOOKEEPER\", \"KUBERNETES\" or specify FQN of factory class."); /** * The ID of the Flink cluster, used to separate multiple Flink clusters Needs to be set for diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityRecoverFromSavepointITCase.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityRecoverFromSavepointITCase.java index 386d3115e07..679f5edc621 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityRecoverFromSavepointITCase.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityRecoverFromSavepointITCase.java @@ -35,6 +35,7 @@ import org.apache.flink.kubernetes.KubernetesExtension; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.runtime.state.StateBackend; @@ -140,9 +141,7 @@ class KubernetesHighAvailabilityRecoverFromSavepointITCase { private static Configuration getConfiguration() { Configuration configuration = new Configuration(); configuration.set(KubernetesConfigOptions.CLUSTER_ID, CLUSTER_ID); - 
configuration.set( - HighAvailabilityOptions.HA_MODE, - KubernetesHaServicesFactory.class.getCanonicalName()); + configuration.set(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.KUBERNETES.name()); try { temporaryPath = Files.createTempDirectory("haStorage"); } catch (IOException e) { diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java index be6f6357346..302655dd5c3 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java @@ -26,7 +26,6 @@ import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptionsInternal; import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget; import org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint; -import org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory; import org.apache.flink.kubernetes.kubeclient.FlinkPod; import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerSpecification; import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerTestBase; @@ -38,6 +37,7 @@ import org.apache.flink.kubernetes.kubeclient.decorators.KerberosMountDecorator; import org.apache.flink.kubernetes.kubeclient.services.HeadlessClusterIPService; import org.apache.flink.kubernetes.utils.Constants; import org.apache.flink.kubernetes.utils.KubernetesUtils; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import io.fabric8.kubernetes.api.model.ConfigMap; import io.fabric8.kubernetes.api.model.Container; @@ -459,9 +459,7 @@ class KubernetesJobManagerFactoryTest extends KubernetesJobManagerTestBase { 
@Test void testSetJobManagerDeploymentReplicas() throws Exception { - flinkConfig.set( - HighAvailabilityOptions.HA_MODE, - KubernetesHaServicesFactory.class.getCanonicalName()); + flinkConfig.set(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.KUBERNETES.name()); flinkConfig.set( KubernetesConfigOptions.KUBERNETES_JOBMANAGER_REPLICAS, JOBMANAGER_REPLICAS); kubernetesJobManagerSpecification = diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParametersTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParametersTest.java index 0a44f015b9c..067d8cd3b19 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParametersTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParametersTest.java @@ -28,8 +28,8 @@ import org.apache.flink.configuration.RestOptions; import org.apache.flink.kubernetes.KubernetesTestBase; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptionsInternal; -import org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory; import org.apache.flink.kubernetes.utils.Constants; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.util.FlinkRuntimeException; import org.junit.jupiter.api.Test; @@ -250,9 +250,7 @@ class KubernetesJobManagerParametersTest extends KubernetesTestBase { @Test void testGetReplicas() { - flinkConfig.set( - HighAvailabilityOptions.HA_MODE, - KubernetesHaServicesFactory.class.getCanonicalName()); + flinkConfig.set(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.KUBERNETES.name()); flinkConfig.set(KubernetesConfigOptions.KUBERNETES_JOBMANAGER_REPLICAS, 2); assertThat(kubernetesJobManagerParameters.getReplicas()).isEqualTo(2); } diff 
--git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java index b54b86dda23..c5155682b28 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java @@ -68,6 +68,12 @@ public class HighAvailabilityServicesUtils { case ZOOKEEPER: return createZooKeeperHaServices(config, executor, fatalErrorHandler); + case KUBERNETES: + return createCustomHAServices( + "org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory", + config, + executor); + case FACTORY_CLASS: return createCustomHAServices(config, executor); @@ -129,6 +135,11 @@ public class HighAvailabilityServicesUtils { resourceManagerRpcUrl, dispatcherRpcUrl, webMonitorAddress); case ZOOKEEPER: return createZooKeeperHaServices(configuration, executor, fatalErrorHandler); + case KUBERNETES: + return createCustomHAServices( + "org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory", + configuration, + executor); case FACTORY_CLASS: return createCustomHAServices(configuration, executor); @@ -152,6 +163,10 @@ public class HighAvailabilityServicesUtils { return new ZooKeeperClientHAServices( ZooKeeperUtils.startCuratorFramework(configuration, fatalErrorHandler), configuration); + case KUBERNETES: + return createCustomClientHAServices( + "org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory", + configuration); case FACTORY_CLASS: return createCustomClientHAServices(configuration); default: @@ -267,9 +282,15 @@ public class HighAvailabilityServicesUtils { private static HighAvailabilityServices createCustomHAServices( Configuration config, Executor executor) throws FlinkException { + return createCustomHAServices( + 
config.getString(HighAvailabilityOptions.HA_MODE), config, executor); + } + + private static HighAvailabilityServices createCustomHAServices( + String factoryClassName, Configuration config, Executor executor) + throws FlinkException { final HighAvailabilityServicesFactory highAvailabilityServicesFactory = - loadCustomHighAvailabilityServicesFactory( - config.getString(HighAvailabilityOptions.HA_MODE)); + loadCustomHighAvailabilityServicesFactory(factoryClassName); try { return highAvailabilityServicesFactory.createHAServices(config, executor); @@ -294,9 +315,14 @@ public class HighAvailabilityServicesUtils { private static ClientHighAvailabilityServices createCustomClientHAServices(Configuration config) throws FlinkException { + return createCustomClientHAServices( + config.getString(HighAvailabilityOptions.HA_MODE), config); + } + + private static ClientHighAvailabilityServices createCustomClientHAServices( + String factoryClassName, Configuration config) throws FlinkException { final HighAvailabilityServicesFactory highAvailabilityServicesFactory = - loadCustomHighAvailabilityServicesFactory( - config.getString(HighAvailabilityOptions.HA_MODE)); + loadCustomHighAvailabilityServicesFactory(factoryClassName); try { return highAvailabilityServicesFactory.createClientHAServices(config); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java index 3f4137b1840..466ed60f4ef 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java @@ -35,6 +35,7 @@ import org.apache.flink.configuration.HighAvailabilityOptions; public enum HighAvailabilityMode { NONE(false), ZOOKEEPER(true), + KUBERNETES(true), FACTORY_CLASS(true); private final boolean haActive;
