Repository: samza
Updated Branches:
  refs/heads/master fbac87d8a -> 3dcd2e9ec

SAMZA-999 - Checkpoint-tool needs to use the rewriter to rewrite the configs

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/3dcd2e9e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/3dcd2e9e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/3dcd2e9e

Branch: refs/heads/master
Commit: 3dcd2e9ec8d0e91049aafbcabd23c716ecf82135
Parents: fbac87d
Author: Boris Shkolik <[email protected]>
Authored: Tue Aug 16 12:03:04 2016 -0700
Committer: Navina Ramesh <[email protected]>
Committed: Tue Aug 16 12:03:04 2016 -0700

----------------------------------------------------------------------
 .../samza/checkpoint/CheckpointTool.scala | 23 ++++++++++++++++++--
 1 file changed, 21 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/samza/blob/3dcd2e9e/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
index 91de18d..449b402 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
@@ -24,9 +24,10 @@ import java.util.regex.Pattern
 import joptsimple.OptionSet
 import org.apache.samza.checkpoint.CheckpointTool.TaskNameToCheckpointMap
 import org.apache.samza.config.TaskConfig.Config2Task
-import org.apache.samza.config.{Config, StreamConfig}
+import org.apache.samza.config.{JobConfig, ConfigRewriter, Config, StreamConfig}
 import org.apache.samza.container.TaskName
 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
+import org.apache.samza.job.JobRunner._
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.util.{Util, CommandLine, Logging}
@@ -126,11 +127,29 @@ object CheckpointTool {
     new CheckpointTool(config, offsets, manager)
   }
 
+  def rewriteConfig(config: JobConfig): Config = {
+    def rewrite(c: JobConfig, rewriterName: String): Config = {
+      val klass = config
+        .getConfigRewriterClass(rewriterName)
+        .getOrElse(throw new SamzaException("Unable to find class config for config rewriter %s." format rewriterName))
+      val rewriter = Util.getObj[ConfigRewriter](klass)
+      info("Re-writing config for CheckpointTool with " + rewriter)
+      rewriter.rewrite(rewriterName, c)
+    }
+
+    config.getConfigRewriters match {
+      case Some(rewriters) => rewriters.split(",").foldLeft(config)(rewrite(_, _))
+      case _ => config
+    }
+  }
+
   def main(args: Array[String]) {
     val cmdline = new CheckpointToolCommandLine
     val options = cmdline.parser.parse(args: _*)
     val config = cmdline.loadConfig(options)
-    val tool = CheckpointTool(config, cmdline.newOffsets)
+    val rconfig = rewriteConfig(new JobConfig(config))
+    print("Rewritten config" + rconfig)
+    val tool = CheckpointTool(rconfig, cmdline.newOffsets)
     tool.run
   }
 }
