Repository: samza
Updated Branches:
  refs/heads/master fbac87d8a -> 3dcd2e9ec


SAMZA-999 - Checkpoint-tool needs to use the rewriter to rewrite the configs


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/3dcd2e9e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/3dcd2e9e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/3dcd2e9e

Branch: refs/heads/master
Commit: 3dcd2e9ec8d0e91049aafbcabd23c716ecf82135
Parents: fbac87d
Author: Boris Shkolik <[email protected]>
Authored: Tue Aug 16 12:03:04 2016 -0700
Committer: Navina Ramesh <[email protected]>
Committed: Tue Aug 16 12:03:04 2016 -0700

----------------------------------------------------------------------
 .../samza/checkpoint/CheckpointTool.scala       | 23 ++++++++++++++++++--
 1 file changed, 21 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/3dcd2e9e/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
index 91de18d..449b402 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
@@ -24,9 +24,10 @@ import java.util.regex.Pattern
 import joptsimple.OptionSet
 import org.apache.samza.checkpoint.CheckpointTool.TaskNameToCheckpointMap
 import org.apache.samza.config.TaskConfig.Config2Task
-import org.apache.samza.config.{Config, StreamConfig}
+import org.apache.samza.config.{JobConfig, ConfigRewriter, Config, StreamConfig}
 import org.apache.samza.container.TaskName
 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
+import org.apache.samza.job.JobRunner._
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.util.{Util, CommandLine, Logging}
@@ -126,11 +127,29 @@ object CheckpointTool {
     new CheckpointTool(config, offsets, manager)
   }
 
+  def rewriteConfig(config: JobConfig): Config = {
+    def rewrite(c: JobConfig, rewriterName: String): Config = {
+      val klass = config
+              .getConfigRewriterClass(rewriterName)
+              .getOrElse(throw new SamzaException("Unable to find class config for config rewriter %s." format rewriterName))
+      val rewriter = Util.getObj[ConfigRewriter](klass)
+      info("Re-writing config for CheckpointTool with " + rewriter)
+      rewriter.rewrite(rewriterName, c)
+    }
+
+    config.getConfigRewriters match {
+      case Some(rewriters) => rewriters.split(",").foldLeft(config)(rewrite(_, _))
+      case _ => config
+    }
+  }
+
   def main(args: Array[String]) {
     val cmdline = new CheckpointToolCommandLine
     val options = cmdline.parser.parse(args: _*)
     val config = cmdline.loadConfig(options)
-    val tool = CheckpointTool(config, cmdline.newOffsets)
+    val rconfig = rewriteConfig(new JobConfig(config))
+    print("Rewritten config" + rconfig)
+    val tool = CheckpointTool(rconfig, cmdline.newOffsets)
     tool.run
   }
 }

Reply via email to