This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 324d7bed226cb707512879842c33788f7f715ff2
Author: Till Rohrmann <[email protected]>
AuthorDate: Tue Sep 18 16:10:13 2018 +0200

    [FLINK-8660][ha] Add InstantiationUtil#instantiate to create instance from class name
    
    InstantiationUtil#instantiate takes a class name, a target type and a class loader to load
    a class of the given class name and create an instance of it.
---
 .../org/apache/flink/util/InstantiationUtil.java   | 19 ++++++++++++
 .../HighAvailabilityServicesUtils.java             | 36 ++++++++++++++++------
 .../HighAvailabilityServicesUtilsTest.java         |  5 ++-
 3 files changed, 49 insertions(+), 11 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
index 2370c7c..a36560e 100644
--- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
@@ -287,6 +287,25 @@ public final class InstantiationUtil {
        }
 
        /**
+        * Creates a new instance of the given class name and type using the 
provided {@link ClassLoader}.
+        *
+        * @param className of the class to load
+        * @param targetType type of the instantiated class
+        * @param classLoader to use for loading the class
+        * @param <T> type of the instantiated class
+        * @return Instance of the given class name
+        * @throws ClassNotFoundException if the class could not be found
+        */
+       public static <T> T instantiate(final String className, final Class<T> 
targetType, final ClassLoader classLoader) throws ClassNotFoundException {
+               final Class<? extends T> clazz = Class.forName(
+                       className,
+                       false,
+                       classLoader).asSubclass(targetType);
+
+               return instantiate(clazz);
+       }
+
+       /**
         * Creates a new instance of the given class.
         *
         * @param <T> The generic type of the class.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
index 78484d3..05f96ce 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
@@ -37,6 +37,8 @@ import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
 
 import java.util.concurrent.Executor;
 
@@ -159,19 +161,33 @@ public class HighAvailabilityServicesUtils {
                return Tuple2.of(hostname, port);
        }
 
-       private static HighAvailabilityServices 
createCustomHAServices(Configuration config, Executor executor) throws 
Exception {
-               Class<HighAvailabilityServicesFactory> factoryClass;
+       private static HighAvailabilityServices 
createCustomHAServices(Configuration config, Executor executor) throws 
FlinkException {
+               final ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
+               final String haServicesClassName = 
config.getString(HighAvailabilityOptions.HA_MODE);
+
+               final HighAvailabilityServicesFactory 
highAvailabilityServicesFactory;
+
                try {
-                       factoryClass = config.getClass(
-                               HighAvailabilityOptions.HA_MODE.key(), null, 
Thread.currentThread().getContextClassLoader());
-               } catch (ClassNotFoundException e) {
-                       throw new Exception("Custom HA FactoryClass not found");
+                       highAvailabilityServicesFactory = 
InstantiationUtil.instantiate(
+                               haServicesClassName,
+                               HighAvailabilityServicesFactory.class,
+                               classLoader);
+               } catch (Exception e) {
+                       throw new FlinkException(
+                               String.format(
+                                       "Could not instantiate the 
HighAvailabilityServicesFactory '%s'. Please make sure that this class is on 
your class path.",
+                                       haServicesClassName),
+                               e);
                }
 
-               if (factoryClass != null && 
HighAvailabilityServicesFactory.class.isAssignableFrom(factoryClass)) {
-                       return 
factoryClass.newInstance().createHAServices(config, executor);
-               } else {
-                       throw new Exception("Custom HA FactoryClass is not 
valid.");
+               try {
+                       return 
highAvailabilityServicesFactory.createHAServices(config, executor);
+               } catch (Exception e) {
+                       throw new FlinkException(
+                               String.format(
+                                       "Could not create the ha services from 
the instantiated HighAvailabilityServicesFactory %s.",
+                                       haServicesClassName),
+                               e);
                }
        }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtilsTest.java
index e9063ac..c4f6473 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtilsTest.java
@@ -71,7 +71,10 @@ public class HighAvailabilityServicesUtilsTest extends 
TestLogger {
                
HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(config, 
executor);
        }
 
-       private static class TestHAFactory implements 
HighAvailabilityServicesFactory {
+       /**
+        * Testing class which needs to be public in order to be instantiatable.
+        */
+       public static class TestHAFactory implements 
HighAvailabilityServicesFactory {
 
                static HighAvailabilityServices haServices;
 

Reply via email to