This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3bc7bc2e80af1e277ffb4906e6aca416a9de954b Author: Krzysztof BiaĆek <[email protected]> AuthorDate: Tue Sep 18 11:18:03 2018 +0200 [FLINK-8660][ha] Enable user to provide custom HAServices implementation Create BlobStorage for any HA backend HighAvailabilityServicesFactory may throw exceptions Docs Use ha mode config property to specify factory class FQN Update docs --- docs/_includes/generated/common_section.html | 2 +- .../generated/high_availability_configuration.html | 2 +- docs/ops/jobmanager_high_availability.md | 1 + .../configuration/HighAvailabilityOptions.java | 4 +- .../org/apache/flink/runtime/blob/BlobUtils.java | 8 +-- .../HighAvailabilityServicesFactory.java | 39 ++++++++++ .../HighAvailabilityServicesUtils.java | 28 +++++++- .../runtime/jobmanager/HighAvailabilityMode.java | 29 ++++---- .../HighAvailabilityServicesUtilsTest.java | 83 ++++++++++++++++++++++ .../jobmanager/HighAvailabilityModeTest.java | 33 ++++++++- 10 files changed, 203 insertions(+), 26 deletions(-) diff --git a/docs/_includes/generated/common_section.html b/docs/_includes/generated/common_section.html index ea881e0..65804db 100644 --- a/docs/_includes/generated/common_section.html +++ b/docs/_includes/generated/common_section.html @@ -45,7 +45,7 @@ <tr> <td><h5>high-availability</h5></td> <td style="word-wrap: break-word;">"NONE"</td> - <td>Defines high-availability mode used for the cluster execution. To enable high-availability, set this mode to "ZOOKEEPER".</td> + <td>Defines high-availability mode used for the cluster execution. To enable high-availability, set this mode to "ZOOKEEPER" or specify FQN of factory class.</td> </tr> <tr> <td><h5>high-availability.storageDir</h5></td> diff --git a/docs/_includes/generated/high_availability_configuration.html b/docs/_includes/generated/high_availability_configuration.html index b1e2ea9..398379d 100644 --- a/docs/_includes/generated/high_availability_configuration.html +++ b/docs/_includes/generated/high_availability_configuration.html @@ -10,7 +10,7 @@ <tr> <td><h5>high-availability</h5></td> <td style="word-wrap: break-word;">"NONE"</td> - <td>Defines high-availability mode used for the cluster execution. To enable high-availability, set this mode to "ZOOKEEPER".</td> + <td>Defines high-availability mode used for the cluster execution. To enable high-availability, set this mode to "ZOOKEEPER" or specify FQN of factory class.</td> </tr> <tr> <td><h5>high-availability.cluster-id</h5></td> diff --git a/docs/ops/jobmanager_high_availability.md b/docs/ops/jobmanager_high_availability.md index 320f9d6..2c2f1a6 100644 --- a/docs/ops/jobmanager_high_availability.md +++ b/docs/ops/jobmanager_high_availability.md @@ -65,6 +65,7 @@ By default, the job manager will pick a *random port* for inter process communic In order to start an HA-cluster add the following configuration keys to `conf/flink-conf.yaml`: - **high-availability mode** (required): The *high-availability mode* has to be set in `conf/flink-conf.yaml` to *zookeeper* in order to enable high availability mode. +Alternatively this option can be set to FQN of factory class Flink should use to create HighAvailabilityServices instance. <pre>high-availability: zookeeper</pre> diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java index 787efff..5881533 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java @@ -43,6 +43,7 @@ public class HighAvailabilityOptions { * Defines high-availability mode used for the cluster execution. * A value of "NONE" signals no highly available setup. * To enable high-availability, set this mode to "ZOOKEEPER". + * Can also be set to FQN of HighAvailability factory class. */ @Documentation.CommonOption(position = Documentation.CommonOption.POSITION_HIGH_AVAILABILITY) public static final ConfigOption<String> HA_MODE = @@ -50,7 +51,7 @@ public class HighAvailabilityOptions { .defaultValue("NONE") .withDeprecatedKeys("recovery.mode") .withDescription("Defines high-availability mode used for the cluster execution." + - " To enable high-availability, set this mode to \"ZOOKEEPER\"."); + " To enable high-availability, set this mode to \"ZOOKEEPER\" or specify FQN of factory class."); /** * The ID of the Flink cluster, used to separate multiple Flink clusters @@ -73,7 +74,6 @@ public class HighAvailabilityOptions { .withDeprecatedKeys("high-availability.zookeeper.storageDir", "recovery.zookeeper.storageDir") .withDescription("File system path (URI) where Flink persists metadata in high-availability setups."); - // ------------------------------------------------------------------------ // Recovery Options // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java index a61d679..e1caa9c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java @@ -88,14 +88,10 @@ public class BlobUtils { * thrown if the (distributed) file storage cannot be created */ public static BlobStoreService createBlobStoreFromConfig(Configuration config) throws IOException { - HighAvailabilityMode highAvailabilityMode = HighAvailabilityMode.fromConfig(config); - - if (highAvailabilityMode == HighAvailabilityMode.NONE) { - return new VoidBlobStore(); - } else if (highAvailabilityMode == HighAvailabilityMode.ZOOKEEPER) { + if (HighAvailabilityMode.isHighAvailabilityModeActivated(config)) { return createFileSystemBlobStore(config); } else { - throw new IllegalConfigurationException("Unexpected high availability mode '" + highAvailabilityMode + "'."); + return new VoidBlobStore(); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesFactory.java new file mode 100644 index 0000000..0abc6de --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesFactory.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.highavailability; + +import org.apache.flink.configuration.Configuration; + +import java.util.concurrent.Executor; + +/** + * Factory interface for {@link HighAvailabilityServices}. + */ +public interface HighAvailabilityServicesFactory { + + /** + * Creates an {@link HighAvailabilityServices} instance. + * + * @param configuration Flink configuration + * @param executor background task executor + * @return instance of {@link HighAvailabilityServices} + * @throws Exception when HAServices cannot be created + */ + HighAvailabilityServices createHAServices(Configuration configuration, Executor executor) throws Exception; +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java index 778d2db..78484d3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.highavailability; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.RestOptions; import org.apache.flink.runtime.blob.BlobStoreService; @@ -64,11 +65,14 @@ public class HighAvailabilityServicesUtils { config, blobStoreService); + case FACTORY_CLASS: + return createCustomHAServices(config, executor); + default: throw new Exception("High availability mode " + highAvailabilityMode + " is not supported."); } } - + public static HighAvailabilityServices createHighAvailabilityServices( Configuration configuration, Executor executor, @@ -76,7 +80,7 @@ public class HighAvailabilityServicesUtils { HighAvailabilityMode highAvailabilityMode = LeaderRetrievalUtils.getRecoveryMode(configuration); - switch(highAvailabilityMode) { + switch (highAvailabilityMode) { case NONE: final Tuple2<String, Integer> hostnamePort = getJobManagerAddress(configuration); @@ -119,6 +123,10 @@ public class HighAvailabilityServicesUtils { executor, configuration, blobStoreService); + + case FACTORY_CLASS: + return createCustomHAServices(configuration, executor); + default: throw new Exception("Recovery mode " + highAvailabilityMode + " is not supported."); } @@ -151,6 +159,22 @@ public class HighAvailabilityServicesUtils { return Tuple2.of(hostname, port); } + private static HighAvailabilityServices createCustomHAServices(Configuration config, Executor executor) throws Exception { + Class<HighAvailabilityServicesFactory> factoryClass; + try { + factoryClass = config.getClass( + HighAvailabilityOptions.HA_MODE.key(), null, Thread.currentThread().getContextClassLoader()); + } catch (ClassNotFoundException e) { + throw new Exception("Custom HA FactoryClass not found"); + } + + if (factoryClass != null && HighAvailabilityServicesFactory.class.isAssignableFrom(factoryClass)) { + return factoryClass.newInstance().createHAServices(config, executor); + } else { + throw new Exception("Custom HA FactoryClass is not valid."); + } + } + public enum AddressResolution { TRY_ADDRESS_RESOLUTION, NO_ADDRESS_RESOLUTION diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java index 7dc13c2..65c202a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java @@ -30,10 +30,19 @@ import org.apache.flink.configuration.HighAvailabilityOptions; * ZooKeeper is used to select a leader among a group of JobManager. This JobManager * is responsible for the job execution. Upon failure of the leader a new leader is elected * which will take over the responsibilities of the old leader + * - FACTORY_CLASS: Use implementation of {@link org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory} + * specified in configuration property high-availability */ public enum HighAvailabilityMode { - NONE, - ZOOKEEPER; + NONE(false), + ZOOKEEPER(true), + FACTORY_CLASS(true); + + private final boolean haActive; + + HighAvailabilityMode(boolean haActive) { + this.haActive = haActive; + } /** * Return the configured {@link HighAvailabilityMode}. @@ -51,7 +60,11 @@ public enum HighAvailabilityMode { // Map old default to new default return HighAvailabilityMode.NONE; } else { - return HighAvailabilityMode.valueOf(haMode.toUpperCase()); + try { + return HighAvailabilityMode.valueOf(haMode.toUpperCase()); + } catch (IllegalArgumentException e) { + return FACTORY_CLASS; + } } } @@ -63,14 +76,6 @@ public enum HighAvailabilityMode { */ public static boolean isHighAvailabilityModeActivated(Configuration configuration) { HighAvailabilityMode mode = fromConfig(configuration); - switch (mode) { - case NONE: - return false; - case ZOOKEEPER: - return true; - default: - return false; - } - + return mode.haActive; } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtilsTest.java new file mode 100644 index 0000000..e9063ac --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtilsTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.highavailability; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.concurrent.Executor; + +import static org.junit.Assert.assertSame; + +/** + * Tests for the {@link HighAvailabilityServicesUtils} class. + */ +public class HighAvailabilityServicesUtilsTest extends TestLogger { + + @Test + public void testCreateCustomHAServices() throws Exception { + Configuration config = new Configuration(); + + HighAvailabilityServices haServices = Mockito.mock(HighAvailabilityServices.class); + TestHAFactory.haServices = haServices; + + Executor executor = Mockito.mock(Executor.class); + + config.setString(HighAvailabilityOptions.HA_MODE, TestHAFactory.class.getName()); + + // when + HighAvailabilityServices actualHaServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(config, executor); + + // then + assertSame(haServices, actualHaServices); + + // when + actualHaServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(config, executor, + HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION); + // then + assertSame(haServices, actualHaServices); + } + + @Test(expected = Exception.class) + public void testCustomHAServicesFactoryNotDefined() throws Exception { + Configuration config = new Configuration(); + + Executor executor = Mockito.mock(Executor.class); + + config.setString(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.FACTORY_CLASS.name().toLowerCase()); + + // expect + HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(config, executor); + } + + private static class TestHAFactory implements HighAvailabilityServicesFactory { + + static HighAvailabilityServices haServices; + + @Override + public HighAvailabilityServices createHAServices(Configuration configuration, Executor executor) { + return haServices; + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/HighAvailabilityModeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/HighAvailabilityModeTest.java index 91fb514..e708c10 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/HighAvailabilityModeTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/HighAvailabilityModeTest.java @@ -21,15 +21,20 @@ package org.apache.flink.runtime.jobmanager; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.util.TestLogger; import org.junit.Test; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; -public class HighAvailabilityModeTest { +/** + * Tests for the {@link HighAvailabilityMode}. + */ +public class HighAvailabilityModeTest extends TestLogger { // Default HA mode - private final static HighAvailabilityMode DEFAULT_HA_MODE = HighAvailabilityMode.valueOf( + private static final HighAvailabilityMode DEFAULT_HA_MODE = HighAvailabilityMode.valueOf( ConfigConstants.DEFAULT_HA_MODE.toUpperCase()); /** @@ -45,6 +50,10 @@ public class HighAvailabilityModeTest { // Check not equals default config.setString(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name().toLowerCase()); assertEquals(HighAvailabilityMode.ZOOKEEPER, HighAvailabilityMode.fromConfig(config)); + + // Check factory class + config.setString(HighAvailabilityOptions.HA_MODE, "factory.class.FQN"); + assertEquals(HighAvailabilityMode.FACTORY_CLASS, HighAvailabilityMode.fromConfig(config)); } /** @@ -69,4 +78,24 @@ public class HighAvailabilityModeTest { assertEquals(HighAvailabilityMode.NONE, HighAvailabilityMode.fromConfig(config)); } + @Test + public void testCheckHighAvailabilityModeActivated() throws Exception { + Configuration config = new Configuration(); + + // check defaults + assertTrue(!HighAvailabilityMode.isHighAvailabilityModeActivated(config)); + + // check NONE + config.setString("high-availability", HighAvailabilityMode.NONE.name().toLowerCase()); + assertTrue(!HighAvailabilityMode.isHighAvailabilityModeActivated(config)); + + // check ZOOKEEPER + config.setString("high-availability", HighAvailabilityMode.ZOOKEEPER.name().toLowerCase()); + assertTrue(HighAvailabilityMode.isHighAvailabilityModeActivated(config)); + + // check FACTORY_CLASS + config.setString("high-availability", HighAvailabilityMode.FACTORY_CLASS.name().toLowerCase()); + assertTrue(HighAvailabilityMode.isHighAvailabilityModeActivated(config)); + } + }
