Repository: samza Updated Branches: refs/heads/0.11.0 7e929a5da -> a992571bb
Revert "SAMZA-1005 - Refactor class instantiation code to a helper class" This reverts commit 7b07df321b4312c13acc18bc46c765e44f794064. Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a992571b Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a992571b Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a992571b Branch: refs/heads/0.11.0 Commit: a992571bb44efd65e71e54f5987bb16875a1f7b4 Parents: 7e929a5 Author: Xinyu Liu <[email protected]> Authored: Fri Oct 7 15:31:55 2016 -0700 Committer: Xinyu Liu <[email protected]> Committed: Fri Oct 7 15:31:55 2016 -0700 ---------------------------------------------------------------------- .../clustermanager/ContainerProcessManager.java | 3 +- .../apache/samza/util/ClassLoaderHelper.java | 29 -------------------- .../apache/samza/container/SamzaContainer.scala | 5 ++-- .../scala/org/apache/samza/job/JobRunner.scala | 3 +- .../org/apache/samza/util/CommandLine.scala | 2 +- .../org/apache/samza/monitor/MonitorLoader.java | 3 +- .../apache/samza/rest/SamzaRestApplication.java | 4 +-- .../samza/rest/proxy/job/AbstractJobProxy.java | 7 +++-- .../samza/validation/YarnJobValidationTool.java | 3 +- 9 files changed, 13 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/a992571b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java index b4309d9..2e8cb84 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java @@ -26,7 +26,6 @@ import org.apache.samza.coordinator.JobModelManager; import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping; import org.apache.samza.metrics.ContainerProcessManagerMetrics; import org.apache.samza.metrics.MetricsRegistryMap; -import org.apache.samza.util.ClassLoaderHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.HashMap; @@ -403,7 +402,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback final ResourceManagerFactory factory; try { - factory = ClassLoaderHelper.<ResourceManagerFactory>fromClassName(containerManagerFactoryClass); + factory = (ResourceManagerFactory) Class.forName(containerManagerFactoryClass).newInstance(); } catch (InstantiationException e) { log.error("Instantiation exception when creating ContainerManager", e); throw new SamzaException(e); http://git-wip-us.apache.org/repos/asf/samza/blob/a992571b/samza-core/src/main/java/org/apache/samza/util/ClassLoaderHelper.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/util/ClassLoaderHelper.java b/samza-core/src/main/java/org/apache/samza/util/ClassLoaderHelper.java deleted file mode 100644 index f2b389b..0000000 --- a/samza-core/src/main/java/org/apache/samza/util/ClassLoaderHelper.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.util; - -public class ClassLoaderHelper { - - public static <T> T fromClassName(String className) throws ClassNotFoundException, InstantiationException, IllegalAccessException { - Class<T> clazz = (Class<T>) Class.forName(className); - T instance = clazz.newInstance(); - return instance; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/a992571b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index 05a996c..4ab4bce 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -74,7 +74,6 @@ import org.apache.samza.task.AsyncStreamTask import org.apache.samza.task.AsyncStreamTaskAdapter import org.apache.samza.task.StreamTask import org.apache.samza.task.TaskInstanceCollector -import org.apache.samza.util.ClassLoaderHelper import org.apache.samza.util.ExponentialSleepStrategy import org.apache.samza.util.Logging import org.apache.samza.util.ThrottlingExecutor @@ -442,11 +441,11 @@ object SamzaContainer extends Logging { val taskName = taskModel.getTaskName - val taskObj = ClassLoaderHelper.fromClassName[StreamTask](taskClassName) + val taskObj = Class.forName(taskClassName).newInstance val task = if (!singleThreadMode && !isAsyncTask) // Wrap the StreamTask into a AsyncStreamTask with the build-in thread pool - new AsyncStreamTaskAdapter(taskObj, taskThreadPool) + new AsyncStreamTaskAdapter(taskObj.asInstanceOf[StreamTask], taskThreadPool) else taskObj http://git-wip-us.apache.org/repos/asf/samza/blob/a992571b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala index 022b480..383bb13 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala @@ -24,7 +24,6 @@ import org.apache.samza.config.{ConfigRewriter, Config} import org.apache.samza.config.JobConfig.Config2Job import org.apache.samza.coordinator.stream.messages.{Delete, SetConfig} import org.apache.samza.job.ApplicationStatus.Running -import org.apache.samza.util.ClassLoaderHelper import org.apache.samza.util.CommandLine import org.apache.samza.util.Logging import org.apache.samza.util.Util @@ -89,7 +88,7 @@ class JobRunner(config: Config) extends Logging { case Some(factoryClass) => factoryClass case _ => throw new SamzaException("no job factory class defined") } - val jobFactory = ClassLoaderHelper.fromClassName[StreamJobFactory](jobFactoryClass) + val jobFactory = Class.forName(jobFactoryClass).newInstance.asInstanceOf[StreamJobFactory] info("job factory: %s" format (jobFactoryClass)) val factory = new CoordinatorStreamSystemFactory val coordinatorSystemConsumer = factory.getCoordinatorStreamSystemConsumer(config, new MetricsRegistryMap) http://git-wip-us.apache.org/repos/asf/samza/blob/a992571b/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala b/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala index 6962b22..f26501b 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala @@ -63,7 +63,7 @@ class CommandLine { // Set up the job parameters. val configFactoryClass = options.valueOf(configFactoryOpt) val configPaths = options.valuesOf(configPathOpt) - configFactory = ClassLoaderHelper.fromClassName[ConfigFactory](configFactoryClass) + configFactory = Class.forName(configFactoryClass).newInstance.asInstanceOf[ConfigFactory] val configOverrides = options.valuesOf(configOverrideOpt).map(kv => (kv.key, kv.value)).toMap val configs: Buffer[java.util.Map[String, String]] = configPaths.map(configFactory.getConfig) http://git-wip-us.apache.org/repos/asf/samza/blob/a992571b/samza-rest/src/main/java/org/apache/samza/monitor/MonitorLoader.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/main/java/org/apache/samza/monitor/MonitorLoader.java b/samza-rest/src/main/java/org/apache/samza/monitor/MonitorLoader.java index dcf9e57..9610a14 100644 --- a/samza-rest/src/main/java/org/apache/samza/monitor/MonitorLoader.java +++ b/samza-rest/src/main/java/org/apache/samza/monitor/MonitorLoader.java @@ -19,7 +19,6 @@ package org.apache.samza.monitor; import org.apache.samza.metrics.MetricsRegistry; -import org.apache.samza.util.ClassLoaderHelper; public class MonitorLoader { @@ -35,7 +34,7 @@ public class MonitorLoader { throws InstantiationException { String factoryClass = monitorConfig.getMonitorFactoryClass(); try { - MonitorFactory monitorFactory = ClassLoaderHelper.fromClassName(factoryClass); + MonitorFactory monitorFactory = (MonitorFactory) Class.forName(factoryClass).newInstance(); return monitorFactory.getMonitorInstance(monitorConfig, metricsRegistry); } catch (Exception e) { throw (InstantiationException) http://git-wip-us.apache.org/repos/asf/samza/blob/a992571b/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestApplication.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestApplication.java b/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestApplication.java index a6e0bb0..61f3c46 100644 --- a/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestApplication.java +++ b/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestApplication.java @@ -24,7 +24,6 @@ import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.rest.resources.DefaultResourceFactory; import org.apache.samza.rest.resources.ResourceFactory; -import org.apache.samza.util.ClassLoaderHelper; import org.codehaus.jackson.jaxrs.JacksonJsonProvider; import org.glassfish.jersey.server.ResourceConfig; import org.slf4j.Logger; @@ -84,7 +83,8 @@ public class SamzaRestApplication extends ResourceConfig { private Collection<? extends Object> instantiateFactoryResources(String factoryClassName, Config config) throws InstantiationException { try { - ResourceFactory factory = ClassLoaderHelper.<ResourceFactory>fromClassName(factoryClassName); + Class factoryCls = Class.forName(factoryClassName); + ResourceFactory factory = (ResourceFactory) factoryCls.newInstance(); return factory.getResourceInstances(config); } catch (Exception e) { throw (InstantiationException) http://git-wip-us.apache.org/repos/asf/samza/blob/a992571b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java index 4d8647f..bcc88d0 100644 --- a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java +++ b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java @@ -29,7 +29,6 @@ import org.apache.samza.config.factories.PropertiesConfigFactory; import org.apache.samza.rest.model.Job; import org.apache.samza.rest.model.JobStatus; import org.apache.samza.rest.resources.JobsResourceConfig; -import org.apache.samza.util.ClassLoaderHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,7 +52,8 @@ public abstract class AbstractJobProxy implements JobProxy { String jobProxyFactory = config.getJobProxyFactory(); if (jobProxyFactory != null && !jobProxyFactory.isEmpty()) { try { - JobProxyFactory factory = ClassLoaderHelper.<JobProxyFactory>fromClassName(jobProxyFactory); + Class factoryCls = Class.forName(jobProxyFactory); + JobProxyFactory factory = (JobProxyFactory) factoryCls.newInstance(); return factory.getJobProxy(config); } catch (Exception e) { throw new SamzaException(e); @@ -120,7 +120,8 @@ public abstract class AbstractJobProxy implements JobProxy { } try { - return ClassLoaderHelper.<ConfigFactory>fromClassName(configFactoryClassName); + Class factoryCls = Class.forName(configFactoryClassName); + return (ConfigFactory) factoryCls.newInstance(); } catch (Exception e) { throw new SamzaException(e); } http://git-wip-us.apache.org/repos/asf/samza/blob/a992571b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java index 313de94..c47e8d1 100644 --- a/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java +++ b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java @@ -40,7 +40,6 @@ import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping; import org.apache.samza.job.yarn.ClientHelper; import org.apache.samza.metrics.JmxMetricsAccessor; import org.apache.samza.metrics.MetricsValidator; -import org.apache.samza.util.ClassLoaderHelper; import org.apache.samza.util.hadoop.HttpFileSystem; import org.apache.samza.util.CommandLine; import org.slf4j.Logger; @@ -176,7 +175,7 @@ public class YarnJobValidationTool { MetricsValidator validator = null; if (options.has(validatorOpt)) { String validatorClass = options.valueOf(validatorOpt); - validator = ClassLoaderHelper.<MetricsValidator>fromClassName(validatorClass); + validator = (MetricsValidator) Class.forName(validatorClass).newInstance(); } YarnConfiguration hadoopConfig = new YarnConfiguration();
