Repository: samza
Updated Branches:
  refs/heads/master f493f5b9e -> 7b07df321


SAMZA-1005 - Refactor class instantiation code to a helper class


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/7b07df32
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/7b07df32
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/7b07df32

Branch: refs/heads/master
Commit: 7b07df321b4312c13acc18bc46c765e44f794064
Parents: f493f5b
Author: Branislav Cogic <[email protected]>
Authored: Sat Sep 10 16:43:30 2016 -0700
Committer: Navina Ramesh <[email protected]>
Committed: Sat Sep 10 16:43:30 2016 -0700

----------------------------------------------------------------------
 .../clustermanager/ContainerProcessManager.java |  3 +-
 .../apache/samza/util/ClassLoaderHelper.java    | 29 ++++++++++++++++++++
 .../apache/samza/container/SamzaContainer.scala |  5 ++--
 .../scala/org/apache/samza/job/JobRunner.scala  |  3 +-
 .../org/apache/samza/util/CommandLine.scala     |  2 +-
 .../org/apache/samza/monitor/MonitorLoader.java |  6 ++--
 .../apache/samza/rest/SamzaRestApplication.java |  4 +--
 .../samza/rest/proxy/job/AbstractJobProxy.java  |  7 ++---
 .../samza/validation/YarnJobValidationTool.java |  3 +-
 9 files changed, 47 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/7b07df32/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index c6bfec0..1fed2fb 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -26,6 +26,7 @@ import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 import org.apache.samza.metrics.ContainerProcessManagerMetrics;
 import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.util.ClassLoaderHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.HashMap;
@@ -402,7 +403,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
     final ResourceManagerFactory factory;
 
     try {
-      factory = (ResourceManagerFactory) Class.forName(containerManagerFactoryClass).newInstance();
+      factory = ClassLoaderHelper.<ResourceManagerFactory>fromClassName(containerManagerFactoryClass);
     } catch (InstantiationException e) {
       log.error("Instantiation exception when creating ContainerManager", e);
       throw new SamzaException(e);

http://git-wip-us.apache.org/repos/asf/samza/blob/7b07df32/samza-core/src/main/java/org/apache/samza/util/ClassLoaderHelper.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/util/ClassLoaderHelper.java b/samza-core/src/main/java/org/apache/samza/util/ClassLoaderHelper.java
new file mode 100644
index 0000000..f2b389b
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/util/ClassLoaderHelper.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.util;
+
+public class ClassLoaderHelper {
+
+  public static <T> T fromClassName(String className) throws ClassNotFoundException, InstantiationException, IllegalAccessException {
+    Class<T> clazz = (Class<T>) Class.forName(className);
+    T instance = clazz.newInstance();
+    return instance;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/7b07df32/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 4ab4bce..05a996c 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -74,6 +74,7 @@ import org.apache.samza.task.AsyncStreamTask
 import org.apache.samza.task.AsyncStreamTaskAdapter
 import org.apache.samza.task.StreamTask
 import org.apache.samza.task.TaskInstanceCollector
+import org.apache.samza.util.ClassLoaderHelper
 import org.apache.samza.util.ExponentialSleepStrategy
 import org.apache.samza.util.Logging
 import org.apache.samza.util.ThrottlingExecutor
@@ -441,11 +442,11 @@ object SamzaContainer extends Logging {
 
       val taskName = taskModel.getTaskName
 
-      val taskObj = Class.forName(taskClassName).newInstance
+      val taskObj = ClassLoaderHelper.fromClassName[StreamTask](taskClassName)
 
       val task = if (!singleThreadMode && !isAsyncTask)
         // Wrap the StreamTask into a AsyncStreamTask with the build-in thread pool
-        new AsyncStreamTaskAdapter(taskObj.asInstanceOf[StreamTask], taskThreadPool)
+        new AsyncStreamTaskAdapter(taskObj, taskThreadPool)
       else
         taskObj
 

http://git-wip-us.apache.org/repos/asf/samza/blob/7b07df32/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
index 383bb13..022b480 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
@@ -24,6 +24,7 @@ import org.apache.samza.config.{ConfigRewriter, Config}
 import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.coordinator.stream.messages.{Delete, SetConfig}
 import org.apache.samza.job.ApplicationStatus.Running
+import org.apache.samza.util.ClassLoaderHelper
 import org.apache.samza.util.CommandLine
 import org.apache.samza.util.Logging
 import org.apache.samza.util.Util
@@ -88,7 +89,7 @@ class JobRunner(config: Config) extends Logging {
       case Some(factoryClass) => factoryClass
       case _ => throw new SamzaException("no job factory class defined")
     }
-    val jobFactory = Class.forName(jobFactoryClass).newInstance.asInstanceOf[StreamJobFactory]
+    val jobFactory = ClassLoaderHelper.fromClassName[StreamJobFactory](jobFactoryClass)
     info("job factory: %s" format (jobFactoryClass))
     val factory = new CoordinatorStreamSystemFactory
     val coordinatorSystemConsumer = factory.getCoordinatorStreamSystemConsumer(config, new MetricsRegistryMap)

http://git-wip-us.apache.org/repos/asf/samza/blob/7b07df32/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala b/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala
index f26501b..6962b22 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala
@@ -63,7 +63,7 @@ class CommandLine {
     // Set up the job parameters.
     val configFactoryClass = options.valueOf(configFactoryOpt)
     val configPaths = options.valuesOf(configPathOpt)
-    configFactory = Class.forName(configFactoryClass).newInstance.asInstanceOf[ConfigFactory]
+    configFactory = ClassLoaderHelper.fromClassName[ConfigFactory](configFactoryClass)
     val configOverrides = options.valuesOf(configOverrideOpt).map(kv => (kv.key, kv.value)).toMap
 
     val configs: Buffer[java.util.Map[String, String]] = configPaths.map(configFactory.getConfig)

http://git-wip-us.apache.org/repos/asf/samza/blob/7b07df32/samza-rest/src/main/java/org/apache/samza/monitor/MonitorLoader.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/monitor/MonitorLoader.java b/samza-rest/src/main/java/org/apache/samza/monitor/MonitorLoader.java
index 75f3867..502ecc4 100644
--- a/samza-rest/src/main/java/org/apache/samza/monitor/MonitorLoader.java
+++ b/samza-rest/src/main/java/org/apache/samza/monitor/MonitorLoader.java
@@ -18,6 +18,8 @@
  */
 package org.apache.samza.monitor;
 
+import org.apache.samza.util.ClassLoaderHelper;
+
 import java.lang.reflect.Constructor;
 
 class MonitorLoader {
@@ -28,9 +30,7 @@ class MonitorLoader {
         throws InstantiationException {
         Object monitorObject;
         try {
-            Class<?> klass = Class.forName(monitorClassName);
-            Constructor<?> constructor = klass.getConstructor();
-            monitorObject = constructor.newInstance();
+            monitorObject = ClassLoaderHelper.fromClassName(monitorClassName);
         } catch (Exception e) {
             throw (InstantiationException)
                 new InstantiationException("Unable to instantiate " + monitorClassName).initCause(e);

http://git-wip-us.apache.org/repos/asf/samza/blob/7b07df32/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestApplication.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestApplication.java b/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestApplication.java
index 61f3c46..a6e0bb0 100644
--- a/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestApplication.java
+++ b/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestApplication.java
@@ -24,6 +24,7 @@ import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.rest.resources.DefaultResourceFactory;
 import org.apache.samza.rest.resources.ResourceFactory;
+import org.apache.samza.util.ClassLoaderHelper;
 import org.codehaus.jackson.jaxrs.JacksonJsonProvider;
 import org.glassfish.jersey.server.ResourceConfig;
 import org.slf4j.Logger;
@@ -83,8 +84,7 @@ public class SamzaRestApplication extends ResourceConfig {
   private Collection<? extends Object> instantiateFactoryResources(String factoryClassName, Config config)
       throws InstantiationException {
     try {
-      Class factoryCls = Class.forName(factoryClassName);
-      ResourceFactory factory = (ResourceFactory) factoryCls.newInstance();
+      ResourceFactory factory = ClassLoaderHelper.<ResourceFactory>fromClassName(factoryClassName);
       return factory.getResourceInstances(config);
     } catch (Exception e) {
       throw (InstantiationException)

http://git-wip-us.apache.org/repos/asf/samza/blob/7b07df32/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java
index bcc88d0..4d8647f 100644
--- a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java
+++ b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java
@@ -29,6 +29,7 @@ import org.apache.samza.config.factories.PropertiesConfigFactory;
 import org.apache.samza.rest.model.Job;
 import org.apache.samza.rest.model.JobStatus;
 import org.apache.samza.rest.resources.JobsResourceConfig;
+import org.apache.samza.util.ClassLoaderHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,8 +53,7 @@ public abstract class AbstractJobProxy implements JobProxy {
     String jobProxyFactory = config.getJobProxyFactory();
     if (jobProxyFactory != null && !jobProxyFactory.isEmpty()) {
       try {
-        Class factoryCls = Class.forName(jobProxyFactory);
-        JobProxyFactory factory = (JobProxyFactory) factoryCls.newInstance();
+        JobProxyFactory factory = ClassLoaderHelper.<JobProxyFactory>fromClassName(jobProxyFactory);
         return factory.getJobProxy(config);
       } catch (Exception e) {
         throw new SamzaException(e);
@@ -120,8 +120,7 @@ public abstract class AbstractJobProxy implements JobProxy {
     }
 
     try {
-      Class factoryCls = Class.forName(configFactoryClassName);
-      return (ConfigFactory) factoryCls.newInstance();
+      return ClassLoaderHelper.<ConfigFactory>fromClassName(configFactoryClassName);
     } catch (Exception e) {
       throw new SamzaException(e);
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/7b07df32/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
index c47e8d1..313de94 100644
--- a/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
+++ b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
@@ -40,6 +40,7 @@ import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 import org.apache.samza.job.yarn.ClientHelper;
 import org.apache.samza.metrics.JmxMetricsAccessor;
 import org.apache.samza.metrics.MetricsValidator;
+import org.apache.samza.util.ClassLoaderHelper;
 import org.apache.samza.util.hadoop.HttpFileSystem;
 import org.apache.samza.util.CommandLine;
 import org.slf4j.Logger;
@@ -175,7 +176,7 @@ public class YarnJobValidationTool {
     MetricsValidator validator = null;
     if (options.has(validatorOpt)) {
       String validatorClass = options.valueOf(validatorOpt);
-      validator = (MetricsValidator) Class.forName(validatorClass).newInstance();
+      validator = ClassLoaderHelper.<MetricsValidator>fromClassName(validatorClass);
     }
 
     YarnConfiguration hadoopConfig = new YarnConfiguration();

Reply via email to