Repository: samza Updated Branches: refs/heads/master 8c7e2eb04 -> bc7a07a1a
SAMZA-793 static config rewriter should be invoked in JobRunner Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/bc7a07a1 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/bc7a07a1 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/bc7a07a1 Branch: refs/heads/master Commit: bc7a07a1a0b8d610db2aec47d8c8363263478274 Parents: 8c7e2eb Author: Yi Pan (Data Infrastructure) <[email protected]> Authored: Sun Oct 11 22:27:43 2015 -0700 Committer: Yi Pan (Data Infrastructure) <[email protected]> Committed: Tue Nov 17 10:19:23 2015 -0800 ---------------------------------------------------------------------- .../samza/coordinator/JobCoordinator.scala | 29 ++---------------- .../scala/org/apache/samza/job/JobRunner.scala | 31 +++++++++++++++++--- 2 files changed, 30 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/bc7a07a1/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala index ef40c35..112ec1c 100644 --- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala +++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala @@ -21,8 +21,8 @@ package org.apache.samza.coordinator import org.apache.samza.config.StorageConfig -import org.apache.samza.job.model.{ContainerModel, JobModel, TaskModel} -import org.apache.samza.config.{Config, ConfigRewriter} +import org.apache.samza.job.model.{JobModel, TaskModel} +import org.apache.samza.config.Config import org.apache.samza.SamzaException import org.apache.samza.container.grouper.task.TaskNameGrouperFactory import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory @@ -90,7 +90,7 @@ object JobCoordinator extends Logging { val streamMetadataCache = new StreamMetadataCache(systemAdmins) - val jobCoordinator = getJobCoordinator(rewriteConfig(config), changelogManager, localityManager, streamMetadataCache) + val jobCoordinator = getJobCoordinator(config, changelogManager, localityManager, streamMetadataCache) createChangeLogStreams(config, jobCoordinator.jobModel.maxChangeLogStreamPartitions, streamMetadataCache) jobCoordinator @@ -141,29 +141,6 @@ object JobCoordinator extends Logging { } /** - * Re-writes configuration using a ConfigRewriter, if one is defined. If - * there is no ConfigRewriter defined for the job, then this method is a - * no-op. - * - * @param config The config to re-write. - */ - def rewriteConfig(config: Config): Config = { - def rewrite(c: Config, rewriterName: String): Config = { - val klass = config - .getConfigRewriterClass(rewriterName) - .getOrElse(throw new SamzaException("Unable to find class config for config rewriter %s." format rewriterName)) - val rewriter = Util.getObj[ConfigRewriter](klass) - info("Re-writing config with " + rewriter) - rewriter.rewrite(rewriterName, c) - } - - config.getConfigRewriters match { - case Some(rewriters) => rewriters.split(",").foldLeft(config)(rewrite(_, _)) - case _ => config - } - } - - /** * The method intializes the jobModel and creates a JobModel generator which can be used to generate new JobModels * which catchup with the latest content from the coordinator stream. */ http://git-wip-us.apache.org/repos/asf/samza/blob/bc7a07a1/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala index d6109ec..a3613ff 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala @@ -20,7 +20,7 @@ package org.apache.samza.job import org.apache.samza.SamzaException -import org.apache.samza.config.Config +import org.apache.samza.config.{ConfigRewriter, Config} import org.apache.samza.config.JobConfig.Config2Job import org.apache.samza.coordinator.stream.messages.{Delete, SetConfig} import org.apache.samza.job.ApplicationStatus.Running @@ -30,17 +30,40 @@ import org.apache.samza.util.Logging import org.apache.samza.util.Util import scala.collection.JavaConversions._ import org.apache.samza.metrics.MetricsRegistryMap -import org.apache.samza.coordinator.stream.{CoordinatorStreamSystemProducer, CoordinatorStreamSystemFactory} +import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory -object JobRunner { +object JobRunner extends Logging { val SOURCE = "job-runner" + /** + * Re-writes configuration using a ConfigRewriter, if one is defined. If + * there is no ConfigRewriter defined for the job, then this method is a + * no-op. + * + * @param config The config to re-write. + */ + def rewriteConfig(config: Config): Config = { + def rewrite(c: Config, rewriterName: String): Config = { + val klass = config + .getConfigRewriterClass(rewriterName) + .getOrElse(throw new SamzaException("Unable to find class config for config rewriter %s." format rewriterName)) + val rewriter = Util.getObj[ConfigRewriter](klass) + info("Re-writing config with " + rewriter) + rewriter.rewrite(rewriterName, c) + } + + config.getConfigRewriters match { + case Some(rewriters) => rewriters.split(",").foldLeft(config)(rewrite(_, _)) + case _ => config + } + } + def main(args: Array[String]) { val cmdline = new CommandLine val options = cmdline.parser.parse(args: _*) val config = cmdline.loadConfig(options) - new JobRunner(config).run() + new JobRunner(rewriteConfig(config)).run() } }
