Repository: samza Updated Branches: refs/heads/master f493f5b9e -> 7b07df321
SAMZA-1005 - Refactor class instantiation code to a helper class Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/7b07df32 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/7b07df32 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/7b07df32 Branch: refs/heads/master Commit: 7b07df321b4312c13acc18bc46c765e44f794064 Parents: f493f5b Author: Branislav Cogic <[email protected]> Authored: Sat Sep 10 16:43:30 2016 -0700 Committer: Navina Ramesh <[email protected]> Committed: Sat Sep 10 16:43:30 2016 -0700 ---------------------------------------------------------------------- .../clustermanager/ContainerProcessManager.java | 3 +- .../apache/samza/util/ClassLoaderHelper.java | 29 ++++++++++++++++++++ .../apache/samza/container/SamzaContainer.scala | 5 ++-- .../scala/org/apache/samza/job/JobRunner.scala | 3 +- .../org/apache/samza/util/CommandLine.scala | 2 +- .../org/apache/samza/monitor/MonitorLoader.java | 6 ++-- .../apache/samza/rest/SamzaRestApplication.java | 4 +-- .../samza/rest/proxy/job/AbstractJobProxy.java | 7 ++--- .../samza/validation/YarnJobValidationTool.java | 3 +- 9 files changed, 47 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/7b07df32/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java index c6bfec0..1fed2fb 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java @@ -26,6 +26,7 @@ import org.apache.samza.coordinator.JobModelManager; import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping; import org.apache.samza.metrics.ContainerProcessManagerMetrics; import org.apache.samza.metrics.MetricsRegistryMap; +import org.apache.samza.util.ClassLoaderHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.HashMap; @@ -402,7 +403,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback final ResourceManagerFactory factory; try { - factory = (ResourceManagerFactory) Class.forName(containerManagerFactoryClass).newInstance(); + factory = ClassLoaderHelper.<ResourceManagerFactory>fromClassName(containerManagerFactoryClass); } catch (InstantiationException e) { log.error("Instantiation exception when creating ContainerManager", e); throw new SamzaException(e); http://git-wip-us.apache.org/repos/asf/samza/blob/7b07df32/samza-core/src/main/java/org/apache/samza/util/ClassLoaderHelper.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/util/ClassLoaderHelper.java b/samza-core/src/main/java/org/apache/samza/util/ClassLoaderHelper.java new file mode 100644 index 0000000..f2b389b --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/util/ClassLoaderHelper.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.util; + +public class ClassLoaderHelper { + + public static <T> T fromClassName(String className) throws ClassNotFoundException, InstantiationException, IllegalAccessException { + Class<T> clazz = (Class<T>) Class.forName(className); + T instance = clazz.newInstance(); + return instance; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/7b07df32/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index 4ab4bce..05a996c 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -74,6 +74,7 @@ import org.apache.samza.task.AsyncStreamTask import org.apache.samza.task.AsyncStreamTaskAdapter import org.apache.samza.task.StreamTask import org.apache.samza.task.TaskInstanceCollector +import org.apache.samza.util.ClassLoaderHelper import org.apache.samza.util.ExponentialSleepStrategy import org.apache.samza.util.Logging import org.apache.samza.util.ThrottlingExecutor @@ -441,11 +442,11 @@ object SamzaContainer extends Logging { val taskName = taskModel.getTaskName - val taskObj = Class.forName(taskClassName).newInstance + val taskObj = ClassLoaderHelper.fromClassName[StreamTask](taskClassName) val task = if (!singleThreadMode && !isAsyncTask) // Wrap the StreamTask into a AsyncStreamTask with the build-in thread pool - new AsyncStreamTaskAdapter(taskObj.asInstanceOf[StreamTask], taskThreadPool) + new AsyncStreamTaskAdapter(taskObj, taskThreadPool) else taskObj http://git-wip-us.apache.org/repos/asf/samza/blob/7b07df32/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala index 383bb13..022b480 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala @@ -24,6 +24,7 @@ import org.apache.samza.config.{ConfigRewriter, Config} import org.apache.samza.config.JobConfig.Config2Job import org.apache.samza.coordinator.stream.messages.{Delete, SetConfig} import org.apache.samza.job.ApplicationStatus.Running +import org.apache.samza.util.ClassLoaderHelper import org.apache.samza.util.CommandLine import org.apache.samza.util.Logging import org.apache.samza.util.Util @@ -88,7 +89,7 @@ class JobRunner(config: Config) extends Logging { case Some(factoryClass) => factoryClass case _ => throw new SamzaException("no job factory class defined") } - val jobFactory = Class.forName(jobFactoryClass).newInstance.asInstanceOf[StreamJobFactory] + val jobFactory = ClassLoaderHelper.fromClassName[StreamJobFactory](jobFactoryClass) info("job factory: %s" format (jobFactoryClass)) val factory = new CoordinatorStreamSystemFactory val coordinatorSystemConsumer = factory.getCoordinatorStreamSystemConsumer(config, new MetricsRegistryMap) http://git-wip-us.apache.org/repos/asf/samza/blob/7b07df32/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala b/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala index f26501b..6962b22 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala @@ -63,7 +63,7 @@ class CommandLine { // Set up the job parameters. val configFactoryClass = options.valueOf(configFactoryOpt) val configPaths = options.valuesOf(configPathOpt) - configFactory = Class.forName(configFactoryClass).newInstance.asInstanceOf[ConfigFactory] + configFactory = ClassLoaderHelper.fromClassName[ConfigFactory](configFactoryClass) val configOverrides = options.valuesOf(configOverrideOpt).map(kv => (kv.key, kv.value)).toMap val configs: Buffer[java.util.Map[String, String]] = configPaths.map(configFactory.getConfig) http://git-wip-us.apache.org/repos/asf/samza/blob/7b07df32/samza-rest/src/main/java/org/apache/samza/monitor/MonitorLoader.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/main/java/org/apache/samza/monitor/MonitorLoader.java b/samza-rest/src/main/java/org/apache/samza/monitor/MonitorLoader.java index 75f3867..502ecc4 100644 --- a/samza-rest/src/main/java/org/apache/samza/monitor/MonitorLoader.java +++ b/samza-rest/src/main/java/org/apache/samza/monitor/MonitorLoader.java @@ -18,6 +18,8 @@ */ package org.apache.samza.monitor; +import org.apache.samza.util.ClassLoaderHelper; + import java.lang.reflect.Constructor; class MonitorLoader { @@ -28,9 +30,7 @@ class MonitorLoader { throws InstantiationException { Object monitorObject; try { - Class<?> klass = Class.forName(monitorClassName); - Constructor<?> constructor = klass.getConstructor(); - monitorObject = constructor.newInstance(); + monitorObject = ClassLoaderHelper.fromClassName(monitorClassName); } catch (Exception e) { throw (InstantiationException) new InstantiationException("Unable to instantiate " + monitorClassName).initCause(e); http://git-wip-us.apache.org/repos/asf/samza/blob/7b07df32/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestApplication.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestApplication.java b/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestApplication.java index 61f3c46..a6e0bb0 100644 --- a/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestApplication.java +++ b/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestApplication.java @@ -24,6 +24,7 @@ import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.rest.resources.DefaultResourceFactory; import org.apache.samza.rest.resources.ResourceFactory; +import org.apache.samza.util.ClassLoaderHelper; import org.codehaus.jackson.jaxrs.JacksonJsonProvider; import org.glassfish.jersey.server.ResourceConfig; import org.slf4j.Logger; @@ -83,8 +84,7 @@ public class SamzaRestApplication extends ResourceConfig { private Collection<? extends Object> instantiateFactoryResources(String factoryClassName, Config config) throws InstantiationException { try { - Class factoryCls = Class.forName(factoryClassName); - ResourceFactory factory = (ResourceFactory) factoryCls.newInstance(); + ResourceFactory factory = ClassLoaderHelper.<ResourceFactory>fromClassName(factoryClassName); return factory.getResourceInstances(config); } catch (Exception e) { throw (InstantiationException) http://git-wip-us.apache.org/repos/asf/samza/blob/7b07df32/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java index bcc88d0..4d8647f 100644 --- a/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java +++ b/samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java @@ -29,6 +29,7 @@ import org.apache.samza.config.factories.PropertiesConfigFactory; import org.apache.samza.rest.model.Job; import org.apache.samza.rest.model.JobStatus; import org.apache.samza.rest.resources.JobsResourceConfig; +import org.apache.samza.util.ClassLoaderHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,8 +53,7 @@ public abstract class AbstractJobProxy implements JobProxy { String jobProxyFactory = config.getJobProxyFactory(); if (jobProxyFactory != null && !jobProxyFactory.isEmpty()) { try { - Class factoryCls = Class.forName(jobProxyFactory); - JobProxyFactory factory = (JobProxyFactory) factoryCls.newInstance(); + JobProxyFactory factory = ClassLoaderHelper.<JobProxyFactory>fromClassName(jobProxyFactory); return factory.getJobProxy(config); } catch (Exception e) { throw new SamzaException(e); @@ -120,8 +120,7 @@ public abstract class AbstractJobProxy implements JobProxy { } try { - Class factoryCls = Class.forName(configFactoryClassName); - return (ConfigFactory) factoryCls.newInstance(); + return ClassLoaderHelper.<ConfigFactory>fromClassName(configFactoryClassName); } catch (Exception e) { throw new SamzaException(e); } http://git-wip-us.apache.org/repos/asf/samza/blob/7b07df32/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java index c47e8d1..313de94 100644 --- a/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java +++ b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java @@ -40,6 +40,7 @@ import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping; import org.apache.samza.job.yarn.ClientHelper; import org.apache.samza.metrics.JmxMetricsAccessor; import org.apache.samza.metrics.MetricsValidator; +import org.apache.samza.util.ClassLoaderHelper; import org.apache.samza.util.hadoop.HttpFileSystem; import org.apache.samza.util.CommandLine; import org.slf4j.Logger; @@ -175,7 +176,7 @@ public class YarnJobValidationTool { MetricsValidator validator = null; if (options.has(validatorOpt)) { String validatorClass = options.valueOf(validatorOpt); - validator = (MetricsValidator) Class.forName(validatorClass).newInstance(); + validator = ClassLoaderHelper.<MetricsValidator>fromClassName(validatorClass); } YarnConfiguration hadoopConfig = new YarnConfiguration();
