Repository: samza
Updated Branches:
  refs/heads/0.11.0 7e929a5da -> a992571bb


Revert "SAMZA-1005 - Refactor class instantiation code to a helper class"

This reverts commit 7b07df321b4312c13acc18bc46c765e44f794064.


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a992571b
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a992571b
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a992571b

Branch: refs/heads/0.11.0
Commit: a992571bb44efd65e71e54f5987bb16875a1f7b4
Parents: 7e929a5
Author: Xinyu Liu <[email protected]>
Authored: Fri Oct 7 15:31:55 2016 -0700
Committer: Xinyu Liu <[email protected]>
Committed: Fri Oct 7 15:31:55 2016 -0700

----------------------------------------------------------------------
 .../clustermanager/ContainerProcessManager.java |  3 +-
 .../apache/samza/util/ClassLoaderHelper.java    | 29 --------------------
 .../apache/samza/container/SamzaContainer.scala |  5 ++--
 .../scala/org/apache/samza/job/JobRunner.scala  |  3 +-
 .../org/apache/samza/util/CommandLine.scala     |  2 +-
 .../org/apache/samza/monitor/MonitorLoader.java |  3 +-
 .../apache/samza/rest/SamzaRestApplication.java |  4 +--
 .../samza/rest/proxy/job/AbstractJobProxy.java  |  7 +++--
 .../samza/validation/YarnJobValidationTool.java |  3 +-
 9 files changed, 13 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/a992571b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index b4309d9..2e8cb84 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -26,7 +26,6 @@ import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 import org.apache.samza.metrics.ContainerProcessManagerMetrics;
 import org.apache.samza.metrics.MetricsRegistryMap;
-import org.apache.samza.util.ClassLoaderHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.HashMap;
@@ -403,7 +402,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
     final ResourceManagerFactory factory;
 
     try {
-      factory = ClassLoaderHelper.<ResourceManagerFactory>fromClassName(containerManagerFactoryClass);
+      factory = (ResourceManagerFactory) Class.forName(containerManagerFactoryClass).newInstance();
     } catch (InstantiationException e) {
       log.error("Instantiation exception when creating ContainerManager", e);
       throw new SamzaException(e);

http://git-wip-us.apache.org/repos/asf/samza/blob/a992571b/samza-core/src/main/java/org/apache/samza/util/ClassLoaderHelper.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/util/ClassLoaderHelper.java b/samza-core/src/main/java/org/apache/samza/util/ClassLoaderHelper.java
deleted file mode 100644
index f2b389b..0000000
--- a/samza-core/src/main/java/org/apache/samza/util/ClassLoaderHelper.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.util;
-
-public class ClassLoaderHelper {
-
-  public static <T> T fromClassName(String className) throws ClassNotFoundException, InstantiationException, IllegalAccessException {
-    Class<T> clazz = (Class<T>) Class.forName(className);
-    T instance = clazz.newInstance();
-    return instance;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/a992571b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 05a996c..4ab4bce 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -74,7 +74,6 @@ import org.apache.samza.task.AsyncStreamTask
 import org.apache.samza.task.AsyncStreamTaskAdapter
 import org.apache.samza.task.StreamTask
 import org.apache.samza.task.TaskInstanceCollector
-import org.apache.samza.util.ClassLoaderHelper
 import org.apache.samza.util.ExponentialSleepStrategy
 import org.apache.samza.util.Logging
 import org.apache.samza.util.ThrottlingExecutor
@@ -442,11 +441,11 @@ object SamzaContainer extends Logging {
 
       val taskName = taskModel.getTaskName
 
-      val taskObj = ClassLoaderHelper.fromClassName[StreamTask](taskClassName)
+      val taskObj = Class.forName(taskClassName).newInstance
 
       val task = if (!singleThreadMode && !isAsyncTask)
         // Wrap the StreamTask into a AsyncStreamTask with the build-in thread pool
-        new AsyncStreamTaskAdapter(taskObj, taskThreadPool)
+        new AsyncStreamTaskAdapter(taskObj.asInstanceOf[StreamTask], taskThreadPool)
       else
         taskObj
 

http://git-wip-us.apache.org/repos/asf/samza/blob/a992571b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
index 022b480..383bb13 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
@@ -24,7 +24,6 @@ import org.apache.samza.config.{ConfigRewriter, Config}
 import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.coordinator.stream.messages.{Delete, SetConfig}
 import org.apache.samza.job.ApplicationStatus.Running
-import org.apache.samza.util.ClassLoaderHelper
 import org.apache.samza.util.CommandLine
 import org.apache.samza.util.Logging
 import org.apache.samza.util.Util
@@ -89,7 +88,7 @@ class JobRunner(config: Config) extends Logging {
       case Some(factoryClass) => factoryClass
       case _ => throw new SamzaException("no job factory class defined")
     }
-    val jobFactory = ClassLoaderHelper.fromClassName[StreamJobFactory](jobFactoryClass)
+    val jobFactory = Class.forName(jobFactoryClass).newInstance.asInstanceOf[StreamJobFactory]
     info("job factory: %s" format (jobFactoryClass))
     val factory = new CoordinatorStreamSystemFactory
     val coordinatorSystemConsumer = factory.getCoordinatorStreamSystemConsumer(config, new MetricsRegistryMap)

http://git-wip-us.apache.org/repos/asf/samza/blob/a992571b/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala b/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala
index 6962b22..f26501b 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala
@@ -63,7 +63,7 @@ class CommandLine {
     // Set up the job parameters.
     val configFactoryClass = options.valueOf(configFactoryOpt)
     val configPaths = options.valuesOf(configPathOpt)
-    configFactory = ClassLoaderHelper.fromClassName[ConfigFactory](configFactoryClass)
+    configFactory = Class.forName(configFactoryClass).newInstance.asInstanceOf[ConfigFactory]
     val configOverrides = options.valuesOf(configOverrideOpt).map(kv => (kv.key, kv.value)).toMap
 
     val configs: Buffer[java.util.Map[String, String]] = configPaths.map(configFactory.getConfig)

http://git-wip-us.apache.org/repos/asf/samza/blob/a992571b/samza-rest/src/main/java/org/apache/samza/monitor/MonitorLoader.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/monitor/MonitorLoader.java b/samza-rest/src/main/java/org/apache/samza/monitor/MonitorLoader.java
index dcf9e57..9610a14 100644
--- a/samza-rest/src/main/java/org/apache/samza/monitor/MonitorLoader.java
+++ b/samza-rest/src/main/java/org/apache/samza/monitor/MonitorLoader.java
@@ -19,7 +19,6 @@
 package org.apache.samza.monitor;
 
 import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.util.ClassLoaderHelper;
 
 
 public class MonitorLoader {
@@ -35,7 +34,7 @@ public class MonitorLoader {
       throws InstantiationException {
       String factoryClass = monitorConfig.getMonitorFactoryClass();
       try {
-        MonitorFactory monitorFactory = ClassLoaderHelper.fromClassName(factoryClass);
+        MonitorFactory monitorFactory = (MonitorFactory) Class.forName(factoryClass).newInstance();
         return monitorFactory.getMonitorInstance(monitorConfig, metricsRegistry);
       } catch (Exception e) {
         throw (InstantiationException)

http://git-wip-us.apache.org/repos/asf/samza/blob/a992571b/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestApplication.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestApplication.java b/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestApplication.java
index a6e0bb0..61f3c46 100644
--- a/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestApplication.java
+++ b/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestApplication.java
@@ -24,7 +24,6 @@ import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.rest.resources.DefaultResourceFactory;
 import org.apache.samza.rest.resources.ResourceFactory;
-import org.apache.samza.util.ClassLoaderHelper;
 import org.codehaus.jackson.jaxrs.JacksonJsonProvider;
 import org.glassfish.jersey.server.ResourceConfig;
 import org.slf4j.Logger;
@@ -84,7 +83,8 @@ public class SamzaRestApplication extends ResourceConfig {
   private Collection<? extends Object> instantiateFactoryResources(String factoryClassName, Config config)
       throws InstantiationException {
     try {
-      ResourceFactory factory = ClassLoaderHelper.<ResourceFactory>fromClassName(factoryClassName);
+      Class factoryCls = Class.forName(factoryClassName);
+      ResourceFactory factory = (ResourceFactory) factoryCls.newInstance();
       return factory.getResourceInstances(config);
     } catch (Exception e) {
       throw (InstantiationException)

http://git-wip-us.apache.org/repos/asf/samza/blob/a992571b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java
index 4d8647f..bcc88d0 100644
--- a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java
+++ b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java
@@ -29,7 +29,6 @@ import org.apache.samza.config.factories.PropertiesConfigFactory;
 import org.apache.samza.rest.model.Job;
 import org.apache.samza.rest.model.JobStatus;
 import org.apache.samza.rest.resources.JobsResourceConfig;
-import org.apache.samza.util.ClassLoaderHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,7 +52,8 @@ public abstract class AbstractJobProxy implements JobProxy {
     String jobProxyFactory = config.getJobProxyFactory();
     if (jobProxyFactory != null && !jobProxyFactory.isEmpty()) {
       try {
-        JobProxyFactory factory = ClassLoaderHelper.<JobProxyFactory>fromClassName(jobProxyFactory);
+        Class factoryCls = Class.forName(jobProxyFactory);
+        JobProxyFactory factory = (JobProxyFactory) factoryCls.newInstance();
         return factory.getJobProxy(config);
       } catch (Exception e) {
         throw new SamzaException(e);
@@ -120,7 +120,8 @@ public abstract class AbstractJobProxy implements JobProxy {
     }
 
     try {
-      return ClassLoaderHelper.<ConfigFactory>fromClassName(configFactoryClassName);
+      Class factoryCls = Class.forName(configFactoryClassName);
+      return (ConfigFactory) factoryCls.newInstance();
     } catch (Exception e) {
       throw new SamzaException(e);
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/a992571b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
index 313de94..c47e8d1 100644
--- a/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
+++ b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
@@ -40,7 +40,6 @@ import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 import org.apache.samza.job.yarn.ClientHelper;
 import org.apache.samza.metrics.JmxMetricsAccessor;
 import org.apache.samza.metrics.MetricsValidator;
-import org.apache.samza.util.ClassLoaderHelper;
 import org.apache.samza.util.hadoop.HttpFileSystem;
 import org.apache.samza.util.CommandLine;
 import org.slf4j.Logger;
@@ -176,7 +175,7 @@ public class YarnJobValidationTool {
     MetricsValidator validator = null;
     if (options.has(validatorOpt)) {
       String validatorClass = options.valueOf(validatorOpt);
-      validator = ClassLoaderHelper.<MetricsValidator>fromClassName(validatorClass);
+      validator = (MetricsValidator) Class.forName(validatorClass).newInstance();
     }
 
     YarnConfiguration hadoopConfig = new YarnConfiguration();

Reply via email to