This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 324d7bed226cb707512879842c33788f7f715ff2 Author: Till Rohrmann <[email protected]> AuthorDate: Tue Sep 18 16:10:13 2018 +0200 [FLINK-8660][ha] Add InstantiationUtil#instantiate to create instance from class name InstantiationUtil#instantiate takes a class name, a target type and a class loader to load a class of the given class name and create an instance of it. --- .../org/apache/flink/util/InstantiationUtil.java | 19 ++++++++++++ .../HighAvailabilityServicesUtils.java | 36 ++++++++++++++++------ .../HighAvailabilityServicesUtilsTest.java | 5 ++- 3 files changed, 49 insertions(+), 11 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java index 2370c7c..a36560e 100644 --- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java +++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java @@ -287,6 +287,25 @@ public final class InstantiationUtil { } /** + * Creates a new instance of the given class name and type using the provided {@link ClassLoader}. + * + * @param className of the class to load + * @param targetType type of the instantiated class + * @param classLoader to use for loading the class + * @param <T> type of the instantiated class + * @return Instance of the given class name + * @throws ClassNotFoundException if the class could not be found + */ + public static <T> T instantiate(final String className, final Class<T> targetType, final ClassLoader classLoader) throws ClassNotFoundException { + final Class<? extends T> clazz = Class.forName( + className, + false, + classLoader).asSubclass(targetType); + + return instantiate(clazz); + } + + /** * Creates a new instance of the given class. * * @param <T> The generic type of the class. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java index 78484d3..05f96ce 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java @@ -37,6 +37,8 @@ import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils; import org.apache.flink.runtime.util.LeaderRetrievalUtils; import org.apache.flink.runtime.util.ZooKeeperUtils; import org.apache.flink.util.ConfigurationException; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.InstantiationUtil; import java.util.concurrent.Executor; @@ -159,19 +161,33 @@ public class HighAvailabilityServicesUtils { return Tuple2.of(hostname, port); } - private static HighAvailabilityServices createCustomHAServices(Configuration config, Executor executor) throws Exception { - Class<HighAvailabilityServicesFactory> factoryClass; + private static HighAvailabilityServices createCustomHAServices(Configuration config, Executor executor) throws FlinkException { + final ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + final String haServicesClassName = config.getString(HighAvailabilityOptions.HA_MODE); + + final HighAvailabilityServicesFactory highAvailabilityServicesFactory; + try { - factoryClass = config.getClass( - HighAvailabilityOptions.HA_MODE.key(), null, Thread.currentThread().getContextClassLoader()); - } catch (ClassNotFoundException e) { - throw new Exception("Custom HA FactoryClass not found"); + highAvailabilityServicesFactory = InstantiationUtil.instantiate( + haServicesClassName, + HighAvailabilityServicesFactory.class, + classLoader); + } catch (Exception e) { + throw new FlinkException( + String.format( + "Could not instantiate the HighAvailabilityServicesFactory '%s'. Please make sure that this class is on your class path.", + haServicesClassName), + e); } - if (factoryClass != null && HighAvailabilityServicesFactory.class.isAssignableFrom(factoryClass)) { - return factoryClass.newInstance().createHAServices(config, executor); - } else { - throw new Exception("Custom HA FactoryClass is not valid."); + try { + return highAvailabilityServicesFactory.createHAServices(config, executor); + } catch (Exception e) { + throw new FlinkException( + String.format( + "Could not create the ha services from the instantiated HighAvailabilityServicesFactory %s.", + haServicesClassName), + e); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtilsTest.java index e9063ac..c4f6473 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtilsTest.java @@ -71,7 +71,10 @@ public class HighAvailabilityServicesUtilsTest extends TestLogger { HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(config, executor); } - private static class TestHAFactory implements HighAvailabilityServicesFactory { + /** + * Testing class which needs to be public in order to be instantiatable. + */ + public static class TestHAFactory implements HighAvailabilityServicesFactory { static HighAvailabilityServices haServices;
