This is an automated email from the ASF dual-hosted git repository.

cameronlee pushed a commit to branch 1.4.0
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/1.4.0 by this push:
     new b6be178  Revert "SAMZA-2420: Update CommandLine to use config loader 
for local config file (#1256)" (#1270)
b6be178 is described below

commit b6be1788f0cf7208296c97c810f193a1200a49fd
Author: Cameron Lee <[email protected]>
AuthorDate: Fri Feb 7 16:07:39 2020 -0800

    Revert "SAMZA-2420: Update CommandLine to use config loader for local config file (#1256)" (#1270)
    
    This reverts commit 1a03a6a9b8ed19129305842744d6619277bb72f6.
    
    Reverting this change on the 1.4.0 branch because it is
    backwards-incompatible and we don't want to release it in 1.4.
---
 bin/setup-int-test.sh                              |  2 +-
 .../java/org/apache/samza/config/MapConfig.java    |  1 -
 .../stream/CoordinatorStreamWriter.java            |  2 +-
 .../org/apache/samza/storage/StateStorageTool.java |  9 ++--
 .../org/apache/samza/storage/StorageRecovery.java  | 26 +++++-----
 .../apache/samza/checkpoint/CheckpointTool.scala   | 21 +++-----
 .../CoordinatorStreamWriterCommandLine.scala       | 12 ++---
 .../scala/org/apache/samza/util/CommandLine.scala  | 57 +++++++++++-----------
 .../samza/runtime/TestApplicationRunnerMain.java   | 24 ++++-----
 .../samza/checkpoint/TestCheckpointTool.scala      | 30 +++++++-----
 .../scala/org/apache/samza/job/TestJobRunner.scala | 24 ++++-----
 .../samza/storage/kv/RocksDbReadingTool.java       | 10 ++--
 .../org/apache/samza/rest/SamzaRestService.java    |  3 +-
 samza-test/src/main/config/join/checker.samza      | 27 ----------
 .../join/{watcher.samza => common.properties}      | 11 -----
 samza-test/src/main/config/join/emitter.samza      | 27 ----------
 samza-test/src/main/config/join/joiner.samza       | 27 ----------
 samza-test/src/main/config/join/watcher.samza      | 27 ----------
 .../integration/LocalApplicationRunnerMain.java    |  4 +-
 19 files changed, 115 insertions(+), 229 deletions(-)

diff --git a/bin/setup-int-test.sh b/bin/setup-int-test.sh
index b33ff27..112bda6 100755
--- a/bin/setup-int-test.sh
+++ b/bin/setup-int-test.sh
@@ -43,7 +43,7 @@ $KAFKA_DIR/bin/kafka-topics.sh --zookeeper localhost:2181 
--partitions 1 --repli
 # Start the jobs
 for job in checker joiner emitter watcher
 do
-    $SAMZA_DIR/bin/run-job.sh 
--config-loader-factory=org.apache.samza.config.loaders.PropertiesConfigLoaderFactory
 --config-loader-properties path=$SAMZA_DIR/config/join/$job.samza --config 
job.foo=$job
+    $SAMZA_DIR/bin/run-job.sh 
--config-factory=org.apache.samza.config.factories.PropertiesConfigFactory 
--config-path=file://$SAMZA_DIR/config/join/common.properties 
--config-path=file://$SAMZA_DIR/config/join/$job.samza --config job.foo=$job
 done
 
 
diff --git a/samza-api/src/main/java/org/apache/samza/config/MapConfig.java 
b/samza-api/src/main/java/org/apache/samza/config/MapConfig.java
index 213d453..5af2535 100644
--- a/samza-api/src/main/java/org/apache/samza/config/MapConfig.java
+++ b/samza-api/src/main/java/org/apache/samza/config/MapConfig.java
@@ -50,7 +50,6 @@ public class MapConfig extends Config {
     }
   }
 
-  @SafeVarargs
   public MapConfig(Map<String, String>... maps) {
     this(Arrays.asList(maps));
   }
diff --git 
a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
 
b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
index 903f99a..2e857f4 100644
--- 
a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
+++ 
b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
@@ -103,7 +103,7 @@ public class CoordinatorStreamWriter {
    * Main function for using the CoordinatorStreamWriter. The main function 
starts a CoordinatorStreamWriter
    * and sends control messages.
    * To run the code use the following command:
-   * {path to samza deployment}/samza/bin/run-coordinator-stream-writer.sh  
--config-loader-factory={config--loader-factory} 
--config-loader-properties={properties needed for config loader to load config} 
--type={type of the message} --key={[optional] key of the message} 
--value={[optional] value of the message}
+   * {path to samza deployment}/samza/bin/run-coordinator-stream-writer.sh  
--config-factory={config-factory} --config-path={path to config file of a job} 
--type={type of the message} --key={[optional] key of the message} 
--value={[optional] value of the message}
    *
    * @param args input arguments for running the writer. These arguments are:
    *             "config-factory" = The config file factory
diff --git 
a/samza-core/src/main/java/org/apache/samza/storage/StateStorageTool.java 
b/samza-core/src/main/java/org/apache/samza/storage/StateStorageTool.java
index 6518929..beba35c 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StateStorageTool.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StateStorageTool.java
@@ -21,7 +21,8 @@ package org.apache.samza.storage;
 
 import joptsimple.ArgumentAcceptingOptionSpec;
 import joptsimple.OptionSet;
-import org.apache.samza.config.Config;
+
+import org.apache.samza.config.MapConfig;
 import org.apache.samza.util.CommandLine;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,8 +36,8 @@ public class StateStorageTool extends CommandLine {
   private Logger log = LoggerFactory.getLogger(StateStorageTool.class);
 
   @Override
-  public Config loadConfig(OptionSet options) {
-    Config config = super.loadConfig(options);
+  public MapConfig loadConfig(OptionSet options) {
+    MapConfig config = super.loadConfig(options);
     if (options.has(newPathArgu)) {
       newPath = options.valueOf(newPathArgu);
       log.info("new state storage is " + newPath);
@@ -51,7 +52,7 @@ public class StateStorageTool extends CommandLine {
   public static void main(String[] args) {
     StateStorageTool tool = new StateStorageTool();
     OptionSet options = tool.parser().parse(args);
-    Config config = tool.loadConfig(options);
+    MapConfig config = tool.loadConfig(options);
     String path = tool.getPath();
 
     StorageRecovery storageRecovery = new StorageRecovery(config, path);
diff --git 
a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 
b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
index 5d34176..11237a8 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
@@ -53,6 +53,7 @@ import org.apache.samza.system.SystemFactory;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.util.Clock;
+import org.apache.samza.util.CommandLine;
 import org.apache.samza.util.CoordinatorStreamUtil;
 import org.apache.samza.util.ReflectionUtil;
 import org.apache.samza.util.StreamUtil;
@@ -62,11 +63,11 @@ import org.slf4j.LoggerFactory;
 
 
 /**
- * Recovers the state storage from the changelog streams and stores the state
+ * Recovers the state storages from the changelog streams and store the 
storages
  * in the directory provided by the users. The changelog streams are derived
  * from the job's config file.
  */
-public class StorageRecovery {
+public class StorageRecovery extends CommandLine {
   private static final Logger LOG = 
LoggerFactory.getLogger(StorageRecovery.class);
 
   private final Config jobConfig;
@@ -107,7 +108,7 @@ public class StorageRecovery {
   }
 
   /**
-   * run the setup phase and restore all the task storage
+   * run the setup phase and restore all the task storages
    */
   public void run() {
     setup();
@@ -125,7 +126,9 @@ public class StorageRecovery {
               + " Proceeding with the next container", containerName);
         }
       });
-    this.containerStorageManagers.forEach((containerName, 
containerStorageManager) -> containerStorageManager.shutdown());
+    this.containerStorageManagers.forEach((containerName, 
containerStorageManager) -> {
+        containerStorageManager.shutdown();
+      });
     systemAdmins.stop();
 
     LOG.info("successfully recovered in " + storeBaseDir.toString());
@@ -166,15 +169,13 @@ public class StorageRecovery {
 
       LOG.info("stream name for " + storeName + " is " + 
streamName.orElse(null));
 
-      streamName.ifPresent(name -> changeLogSystemStreams.put(storeName, 
StreamUtil.getSystemStreamFromNames(name)));
+      if (streamName.isPresent()) {
+        changeLogSystemStreams.put(storeName, 
StreamUtil.getSystemStreamFromNames(streamName.get()));
+      }
 
       Optional<String> factoryClass = 
config.getStorageFactoryClassName(storeName);
       if (factoryClass.isPresent()) {
-        @SuppressWarnings("unchecked")
-        StorageEngineFactory<Object, Object> factory =
-            (StorageEngineFactory<Object, Object>) 
ReflectionUtil.getObj(factoryClass.get(), StorageEngineFactory.class);
-
-        storageEngineFactories.put(storeName, factory);
+        storageEngineFactories.put(storeName, 
ReflectionUtil.getObj(factoryClass.get(), StorageEngineFactory.class));
       } else {
         throw new SamzaException("Missing storage factory for " + storeName + 
".");
       }
@@ -203,8 +204,7 @@ public class StorageRecovery {
         .forEach(serdeName -> {
             String serdeClassName = 
serializerConfig.getSerdeFactoryClass(serdeName)
               .orElseGet(() -> 
SerializerConfig.getPredefinedSerdeFactoryName(serdeName));
-            @SuppressWarnings("unchecked")
-            Serde<Object> serde =
+            Serde serde =
                 ReflectionUtil.getObj(serdeClassName, 
SerdeFactory.class).getSerde(serdeName, serializerConfig);
             serdeMap.put(serdeName, serde);
           });
@@ -216,7 +216,7 @@ public class StorageRecovery {
    * create one TaskStorageManager for each task. Add all of them to the
    * List<TaskStorageManager>
    */
-  @SuppressWarnings("rawtypes")
+  @SuppressWarnings({"unchecked", "rawtypes"})
   private void getContainerStorageManagers() {
     Clock clock = SystemClock.instance();
     StreamMetadataCache streamMetadataCache = new 
StreamMetadataCache(systemAdmins, 5000, clock);
diff --git 
a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala 
b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
index 4db807e..eff1e73 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
@@ -19,10 +19,9 @@
 
 package org.apache.samza.checkpoint
 
-import java.io.FileInputStream
 import java.net.URI
 import java.util
-import java.util.Properties
+import java.util.function.Supplier
 import java.util.regex.Pattern
 
 import joptsimple.ArgumentAcceptingOptionSpec
@@ -33,7 +32,7 @@ import org.apache.samza.container.TaskName
 import org.apache.samza.job.JobRunner.info
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.util.{CommandLine, ConfigUtil, CoordinatorStreamUtil, 
Logging}
+import org.apache.samza.util.{CommandLine, ConfigUtil, CoordinatorStreamUtil, 
Logging, ReflectionUtil, Util}
 import org.apache.samza.Partition
 import org.apache.samza.SamzaException
 
@@ -92,7 +91,7 @@ object CheckpointTool {
 
     var newOffsets: TaskNameToCheckpointMap = _
 
-    def parseOffsets(propertiesFile: Properties): TaskNameToCheckpointMap = {
+    def parseOffsets(propertiesFile: Config): TaskNameToCheckpointMap = {
       var checkpoints : ListBuffer[(TaskName, Map[SystemStreamPartition, 
String])] = ListBuffer()
       propertiesFile.asScala.foreach { case (key, value) =>
         val matcher = SSP_REGEX.matcher(key)
@@ -118,15 +117,10 @@ object CheckpointTool {
         .mapValues(m => m.reduce( _ ++ _))  // Merge all the maps of 
SSPs->Offset into one for the whole taskname
     }
 
-    override def loadConfig(options: OptionSet): Config = {
+    override def loadConfig(options: OptionSet): MapConfig = {
       val config = super.loadConfig(options)
       if (options.has(newOffsetsOpt)) {
-        val newOffsetsInputStream = new 
FileInputStream(options.valueOf(newOffsetsOpt).getPath)
-        val properties = new Properties()
-
-        properties.load(newOffsetsInputStream)
-        newOffsetsInputStream.close()
-
+        val properties = 
configFactory.getConfig(options.valueOf(newOffsetsOpt))
         newOffsets = parseOffsets(properties)
       }
       config
@@ -198,11 +192,12 @@ class CheckpointTool(newOffsets: TaskNameToCheckpointMap, 
coordinatorStreamStore
 
       if (newOffsets != null) {
         newOffsets.foreach {
-          case (taskName: TaskName, offsets: Map[SystemStreamPartition, 
String]) =>
+          case (taskName: TaskName, offsets: Map[SystemStreamPartition, 
String]) => {
             logCheckpoint(taskName, offsets, "New offset to be written for 
task: " + taskName)
             val checkpoint = new Checkpoint(offsets.asJava)
             checkpointManager.writeCheckpoint(taskName, checkpoint)
             info(s"Updated the checkpoint of the task: $taskName to: $offsets")
+          }
         }
       }
     } finally {
@@ -212,7 +207,7 @@ class CheckpointTool(newOffsets: TaskNameToCheckpointMap, 
coordinatorStreamStore
   }
 
   def getConfigFromCoordinatorStream(coordinatorStreamStore: 
CoordinatorStreamStore): Config = {
-    
CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore)
+    return 
CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore)
   }
 
   def logCheckpoint(tn: TaskName, checkpoint: Map[SystemStreamPartition, 
String], prefix: String) {
diff --git 
a/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamWriterCommandLine.scala
 
b/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamWriterCommandLine.scala
index dbeeffa..0c17800 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamWriterCommandLine.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamWriterCommandLine.scala
@@ -20,11 +20,11 @@
 package org.apache.samza.coordinator.stream
 
 import org.apache.samza.util.CommandLine
-import joptsimple.{ArgumentAcceptingOptionSpec, OptionSet}
+import joptsimple.OptionSet
 
 class CoordinatorStreamWriterCommandLine extends CommandLine {
 
-  val messageType: ArgumentAcceptingOptionSpec[String] =
+  val messageType =
     parser.accepts("type", "the type of the message being sent.")
         .withRequiredArg
         .ofType(classOf[java.lang.String])
@@ -32,19 +32,19 @@ class CoordinatorStreamWriterCommandLine extends 
CommandLine {
         " The possible values are {\"set-config\"}")
 
 
-  val messageKey: ArgumentAcceptingOptionSpec[String] =
+  val messageKey =
     parser.accepts("key", "the type of the message being sent")
         .withRequiredArg
         .ofType(classOf[java.lang.String])
         .describedAs("key of the message")
 
-  val messageValue: ArgumentAcceptingOptionSpec[String] =
+  val messageValue =
     parser.accepts("value", "the type of the message being sent")
         .withRequiredArg
         .ofType(classOf[java.lang.String])
         .describedAs("value of the message")
 
-  def loadType(options: OptionSet): String = {
+  def loadType(options: OptionSet) = {
     if (!options.has(messageType)) {
       parser.printHelpOn(System.err)
       System.exit(-1)
@@ -60,7 +60,7 @@ class CoordinatorStreamWriterCommandLine extends CommandLine {
     }
   }
 
-  def loadValue(options: OptionSet): String = {
+  def loadValue(options: OptionSet) = {
     var value: java.lang.String = null
     if (options.has(messageValue)) {
       value = options.valueOf(messageValue)
diff --git a/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala 
b/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala
index 5ab5e99..b97afad 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala
@@ -19,13 +19,13 @@
 
 package org.apache.samza.util
 
-import joptsimple.{ArgumentAcceptingOptionSpec, OptionParser, OptionSet}
+import java.net.URI
+import joptsimple.{OptionParser, OptionSet}
 import joptsimple.util.KeyValuePair
-import org.apache.samza.config.{Config, ConfigLoaderFactory, JobConfig, 
MapConfig}
-import org.apache.samza.config.loaders.PropertiesConfigLoaderFactory
-
+import org.apache.samza.config.{ConfigFactory, MapConfig}
+import org.apache.samza.config.factories.PropertiesConfigFactory
+import scala.collection.mutable.Buffer
 import scala.collection.JavaConverters._
-import scala.collection.mutable
 
 /**
  * Defines a basic set of command-line options for Samza tasks. Tools can use 
this
@@ -33,40 +33,41 @@ import scala.collection.mutable
  */
 class CommandLine {
   val parser = new OptionParser()
-  val configLoaderFactoryOpt: ArgumentAcceptingOptionSpec[String] =
-    parser.accepts("config-loader-factory", "The config loader factory to use 
to read full job config file.")
+  val configFactoryOpt =
+    parser.accepts("config-factory", "The config factory to use to read your 
config file.")
           .withRequiredArg
           .ofType(classOf[java.lang.String])
           .describedAs("com.foo.bar.ClassName")
-          .defaultsTo(classOf[PropertiesConfigLoaderFactory].getName)
-  val configLoaderPropertiesOpt: ArgumentAcceptingOptionSpec[KeyValuePair] =
-    parser.accepts("config-loader-properties", "A config loader property in 
the form key=value. Config loader properties will be passed to " +
-                                               "designated config loader 
factory to load full job config.")
+          .defaultsTo(classOf[PropertiesConfigFactory].getName)
+  val configPathOpt =
+    parser.accepts("config-path", "URI location to a config file (e.g. 
file:///some/local/path.properties). " +
+                                  "If multiple files are given they are all 
used with later files overriding any values that appear in earlier files.")
           .withRequiredArg
-          .ofType(classOf[KeyValuePair])
-          .describedAs("key=value")
-  val configOverrideOpt: ArgumentAcceptingOptionSpec[KeyValuePair] =
+          .ofType(classOf[URI])
+          .describedAs("path")
+  val configOverrideOpt =
     parser.accepts("config", "A configuration value in the form key=value. 
Command line properties override any configuration values given.")
           .withRequiredArg
           .ofType(classOf[KeyValuePair])
           .describedAs("key=value")
 
-  var configLoaderFactory: ConfigLoaderFactory = _
+  var configFactory: ConfigFactory = null
+
+  def loadConfig(options: OptionSet) = {
+    // Verify legitimate parameters.
+    if (!options.has(configPathOpt)) {
+      parser.printHelpOn(System.err)
+      System.exit(-1)
+    }
 
-  def loadConfig(options: OptionSet): Config = {
     // Set up the job parameters.
-    val configLoaderFactoryClassName = options.valueOf(configLoaderFactoryOpt)
-    val configLoaderProperties = 
options.valuesOf(configLoaderPropertiesOpt).asScala
-      .map(kv => (ConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX + 
kv.key, kv.value))
-      .toMap
-    val configOverrides = options.valuesOf(configOverrideOpt).asScala
-      .map(kv => (kv.key, kv.value))
-      .toMap
-    val original = mutable.HashMap[String, String]()
-    original += JobConfig.CONFIG_LOADER_FACTORY -> configLoaderFactoryClassName
-    original ++= configLoaderProperties
-    original ++= configOverrides
+    val configFactoryClassName = options.valueOf(configFactoryOpt)
+    val configPaths = options.valuesOf(configPathOpt)
+    configFactory = ReflectionUtil.getObj(configFactoryClassName, 
classOf[ConfigFactory])
+    val configOverrides = options.valuesOf(configOverrideOpt).asScala.map(kv 
=> (kv.key, kv.value)).toMap
 
-    ConfigUtil.loadConfig(new MapConfig(original.asJava))
+    val configs: Buffer[java.util.Map[String, String]] = 
configPaths.asScala.map(configFactory.getConfig)
+    configs += configOverrides.asJava
+    new MapConfig(configs.asJava)
   }
 }
diff --git 
a/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
 
b/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
index 2b13409..e3e8eea 100644
--- 
a/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
+++ 
b/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
@@ -36,10 +36,10 @@ public class TestApplicationRunnerMain {
   public void TestRunOperation() throws Exception {
     assertEquals(0, TestApplicationRunnerInvocationCounts.runCount);
     ApplicationRunnerMain.main(new String[]{
-        "--config-loader-factory",
-        "org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
-        "--config-loader-properties",
-        "path=" + getClass().getResource("/test.properties").getPath(),
+        "--config-factory",
+        "org.apache.samza.config.factories.PropertiesConfigFactory",
+        "--config-path",
+        getClass().getResource("/test.properties").getPath(),
         "-config", String.format("%s=%s", ApplicationConfig.APP_CLASS, 
MockStreamApplication.class.getName()),
         "-config", String.format("app.runner.class=%s", 
TestApplicationRunnerInvocationCounts.class.getName()),
     });
@@ -51,10 +51,10 @@ public class TestApplicationRunnerMain {
   public void TestKillOperation() throws Exception {
     assertEquals(0, TestApplicationRunnerInvocationCounts.killCount);
     ApplicationRunnerMain.main(new String[]{
-        "--config-loader-factory",
-        "org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
-        "--config-loader-properties",
-        "path=" + getClass().getResource("/test.properties").getPath(),
+        "--config-factory",
+        "org.apache.samza.config.factories.PropertiesConfigFactory",
+        "--config-path",
+        getClass().getResource("/test.properties").getPath(),
         "-config", String.format("%s=%s", ApplicationConfig.APP_CLASS, 
MockStreamApplication.class.getName()),
         "-config", String.format("app.runner.class=%s", 
TestApplicationRunnerInvocationCounts.class.getName()),
         "--operation=kill"
@@ -67,10 +67,10 @@ public class TestApplicationRunnerMain {
   public void TestStatusOperation() throws Exception {
     assertEquals(0, TestApplicationRunnerInvocationCounts.statusCount);
     ApplicationRunnerMain.main(new String[]{
-        "--config-loader-factory",
-        "org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
-        "--config-loader-properties",
-        "path=" + getClass().getResource("/test.properties").getPath(),
+        "--config-factory",
+        "org.apache.samza.config.factories.PropertiesConfigFactory",
+        "--config-path",
+        getClass().getResource("/test.properties").getPath(),
         "-config", String.format("%s=%s", ApplicationConfig.APP_CLASS, 
MockStreamApplication.class.getName()),
         "-config", String.format("app.runner.class=%s", 
TestApplicationRunnerInvocationCounts.class.getName()),
         "--operation=status"
diff --git 
a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
 
b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
index 777a6a0..971e55f 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
@@ -19,27 +19,35 @@
 
 package org.apache.samza.checkpoint
 
-import java.util.Properties
+import java.util
 
 import org.apache.samza.Partition
-import org.apache.samza.checkpoint.CheckpointTool.{CheckpointToolCommandLine, 
TaskNameToCheckpointMap}
-import 
org.apache.samza.checkpoint.TestCheckpointTool.{MockCheckpointManagerFactory, 
MockSystemFactory}
-import org.apache.samza.config._
+import org.apache.samza.checkpoint.CheckpointTool.CheckpointToolCommandLine
+import org.apache.samza.checkpoint.CheckpointTool.TaskNameToCheckpointMap
 import org.apache.samza.container.TaskName
+import 
org.apache.samza.checkpoint.TestCheckpointTool.MockCheckpointManagerFactory
+import org.apache.samza.checkpoint.TestCheckpointTool.MockSystemFactory
+import org.apache.samza.config._
 import org.apache.samza.coordinator.metadatastore.{CoordinatorStreamStore, 
CoordinatorStreamStoreTestUtil}
-import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
-import org.apache.samza.execution.JobPlanner
 import org.apache.samza.metrics.MetricsRegistry
 import 
org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
-import org.apache.samza.system._
-import org.junit.{Before, Test}
+import org.apache.samza.system.SystemAdmin
+import org.apache.samza.system.SystemConsumer
+import org.apache.samza.system.SystemFactory
+import org.apache.samza.system.SystemProducer
+import org.apache.samza.system.SystemStreamMetadata
+import org.apache.samza.system.SystemStreamPartition
+import org.junit.Before
+import org.junit.Test
 import org.mockito.Matchers._
-import org.mockito.Mockito
 import org.mockito.Mockito._
 import org.scalatest.junit.AssertionsForJUnit
 import org.scalatest.mockito.MockitoSugar
 
 import scala.collection.JavaConverters._
+import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
+import org.apache.samza.execution.JobPlanner
+import org.mockito.Mockito
 
 object TestCheckpointTool {
   var checkpointManager: CheckpointManager = _
@@ -120,7 +128,7 @@ class TestCheckpointTool extends AssertionsForJUnit with 
MockitoSugar {
 
   @Test
   def testGrouping(): Unit = {
-    val config : java.util.Properties = new Properties()
+    val config : java.util.Map[String, String] = new util.HashMap()
     config.put("tasknames.Partition 
0.systems.kafka-atc-repartitioned-requests.streams.ArticleRead.partitions.0", 
"0000")
     config.put("tasknames.Partition 
0.systems.kafka-atc-repartitioned-requests.streams.CommunicationRequest.partitions.0",
 "1111")
     config.put("tasknames.Partition 
1.systems.kafka-atc-repartitioned-requests.streams.ArticleRead.partitions.1", 
"2222")
@@ -128,7 +136,7 @@ class TestCheckpointTool extends AssertionsForJUnit with 
MockitoSugar {
     config.put("tasknames.Partition 
1.systems.kafka-atc-repartitioned-requests.streams.StateChange.partitions.1", 
"5555")
 
     val ccl = new CheckpointToolCommandLine
-    val result = ccl.parseOffsets(config)
+    val result = ccl.parseOffsets(new MapConfig(config))
 
     assert(result(new TaskName("Partition 0")).size == 2)
     assert(result(new TaskName("Partition 1")).size == 3)
diff --git a/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala 
b/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
index fd3c6ce..0853f8e 100644
--- a/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
+++ b/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
@@ -43,10 +43,10 @@ class TestJobRunner {
 
     assertEquals(0, TestJobRunner.processCount)
     JobRunner.main(Array(
-      "--config-loader-factory",
-      "org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
-      "--config-loader-properties",
-      "path=" + getClass.getResource("/test.properties").getPath))
+      "--config-factory",
+      "org.apache.samza.config.factories.PropertiesConfigFactory",
+      "--config-path",
+      getClass.getResource("/test.properties").getPath))
     assertEquals(1, TestJobRunner.processCount)
   }
 
@@ -56,10 +56,10 @@ class TestJobRunner {
 
     assertEquals(0, TestJobRunner.killCount)
     JobRunner.main(Array(
-      "--config-loader-factory",
-      "org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
-      "--config-loader-properties",
-      "path=" + getClass.getResource("/test.properties").getPath,
+      "--config-factory",
+      "org.apache.samza.config.factories.PropertiesConfigFactory",
+      "--config-path",
+      getClass.getResource("/test.properties").getPath,
       "--operation=kill"))
     assertEquals(1, TestJobRunner.killCount)
   }
@@ -70,10 +70,10 @@ class TestJobRunner {
 
     assertEquals(0, TestJobRunner.getStatusCount)
     JobRunner.main(Array(
-      "--config-loader-factory",
-      "org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
-      "--config-loader-properties",
-      "path=" + getClass.getResource("/test.properties").getPath,
+      "--config-factory",
+      "org.apache.samza.config.factories.PropertiesConfigFactory",
+      "--config-path",
+      getClass.getResource("/test.properties").getPath,
       "--operation=status"))
     assertEquals(1, TestJobRunner.getStatusCount)
   }
diff --git 
a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbReadingTool.java
 
b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbReadingTool.java
index d5f0f8b..02f1616 100644
--- 
a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbReadingTool.java
+++ 
b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbReadingTool.java
@@ -20,9 +20,11 @@
 package org.apache.samza.storage.kv;
 
 import java.util.List;
+
 import joptsimple.ArgumentAcceptingOptionSpec;
 import joptsimple.OptionSet;
-import org.apache.samza.config.Config;
+
+import org.apache.samza.config.MapConfig;
 import org.apache.samza.util.CommandLine;
 import org.rocksdb.RocksDBException;
 import org.slf4j.Logger;
@@ -71,8 +73,8 @@ public class RocksDbReadingTool extends CommandLine {
   private Logger log = LoggerFactory.getLogger(RocksDbReadingTool.class);
 
   @Override
-  public Config loadConfig(OptionSet options) {
-    Config config = super.loadConfig(options);
+  public MapConfig loadConfig(OptionSet options) {
+    MapConfig config = super.loadConfig(options);
     // get the db name
     if (options.has(dbNameArgument)) {
       dbName = options.valueOf(dbNameArgument);
@@ -136,7 +138,7 @@ public class RocksDbReadingTool extends CommandLine {
   public static void main(String[] args) throws RocksDBException {
     RocksDbReadingTool tool = new RocksDbReadingTool();
     OptionSet options = tool.parser().parse(args);
-    Config config = tool.loadConfig(options);
+    MapConfig config = tool.loadConfig(options);
     String path = tool.getDbPath();
     String dbName = tool.getDbName();
     RocksDbKeyValueReader kvReader = new RocksDbKeyValueReader(dbName, path, 
config);
diff --git 
a/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java 
b/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java
index a2db84b..e35f628 100644
--- a/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java
+++ b/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java
@@ -22,7 +22,6 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.util.Map;
 import java.util.concurrent.ThreadFactory;
 import joptsimple.OptionSet;
-import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.MetricsConfig;
 import org.apache.samza.metrics.MetricsRegistryMap;
@@ -133,7 +132,7 @@ public class SamzaRestService {
   private static SamzaRestConfig parseConfig(String[] args) {
     CommandLine cmd = new CommandLine();
     OptionSet options = cmd.parser().parse(args);
-    Config cfg = cmd.loadConfig(options);
+    MapConfig cfg = cmd.loadConfig(options);
     return new SamzaRestConfig(new MapConfig(cfg));
   }
 
diff --git a/samza-test/src/main/config/join/checker.samza 
b/samza-test/src/main/config/join/checker.samza
index d2fcb69..faef65e 100644
--- a/samza-test/src/main/config/join/checker.samza
+++ b/samza-test/src/main/config/join/checker.samza
@@ -16,33 +16,6 @@
 # specific language governing permissions and limitations
 # under the License.
 
-####################
-### UPDATE THIS! ###
-####################
-yarn.package.path=<YARN.PACKAGE.PATH>
-
-job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
-task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
-task.checkpoint.system=kafka
-task.checkpoint.replication.factor=1
-
-serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
-
- # Kafka System
-systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
-systems.kafka.consumer.zookeeper.connect=localhost:2181
-systems.kafka.consumer.auto.offset.reset=smallest
-systems.kafka.producer.bootstrap.servers=localhost:9092
-systems.kafka.samza.key.serde=string
-systems.kafka.samza.msg.serde=string
-
-yarn.container.retry.count=-1
-yarn.container.retry.window.ms=60000
-
-#Coordinator replication factor
-job.coordinator.system=kafka
-job.coordinator.replication.factor=1
-
 # App
 app.name=checker
 
diff --git a/samza-test/src/main/config/join/watcher.samza b/samza-test/src/main/config/join/common.properties
similarity index 90%
copy from samza-test/src/main/config/join/watcher.samza
copy to samza-test/src/main/config/join/common.properties
index 2ff06d5..7f88b38 100644
--- a/samza-test/src/main/config/join/watcher.samza
+++ b/samza-test/src/main/config/join/common.properties
@@ -42,14 +42,3 @@ yarn.container.retry.window.ms=60000
 #Coordinator replication factor
 job.coordinator.system=kafka
 job.coordinator.replication.factor=1
-
-# App
-app.name=watcher
-
-# Task
-task.class=org.apache.samza.test.integration.join.Watcher
-task.inputs=kafka.epoch
-
-task.window.ms=40000
-
-max.time.between.epochs.ms=60000
diff --git a/samza-test/src/main/config/join/emitter.samza b/samza-test/src/main/config/join/emitter.samza
index 9b6954c..ab71849 100644
--- a/samza-test/src/main/config/join/emitter.samza
+++ b/samza-test/src/main/config/join/emitter.samza
@@ -16,33 +16,6 @@
 # specific language governing permissions and limitations
 # under the License.
 
-####################
-### UPDATE THIS! ###
-####################
-yarn.package.path=<YARN.PACKAGE.PATH>
-
-job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
-task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
-task.checkpoint.system=kafka
-task.checkpoint.replication.factor=1
-
-serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
-
- # Kafka System
-systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
-systems.kafka.consumer.zookeeper.connect=localhost:2181
-systems.kafka.consumer.auto.offset.reset=smallest
-systems.kafka.producer.bootstrap.servers=localhost:9092
-systems.kafka.samza.key.serde=string
-systems.kafka.samza.msg.serde=string
-
-yarn.container.retry.count=-1
-yarn.container.retry.window.ms=60000
-
-#Coordinator replication factor
-job.coordinator.system=kafka
-job.coordinator.replication.factor=1
-
 # App
 app.name=emitter
 
diff --git a/samza-test/src/main/config/join/joiner.samza b/samza-test/src/main/config/join/joiner.samza
index d61a3c9..3e15f07 100644
--- a/samza-test/src/main/config/join/joiner.samza
+++ b/samza-test/src/main/config/join/joiner.samza
@@ -16,33 +16,6 @@
 # specific language governing permissions and limitations
 # under the License.
 
-####################
-### UPDATE THIS! ###
-####################
-yarn.package.path=<YARN.PACKAGE.PATH>
-
-job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
-task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
-task.checkpoint.system=kafka
-task.checkpoint.replication.factor=1
-
-serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
-
- # Kafka System
-systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
-systems.kafka.consumer.zookeeper.connect=localhost:2181
-systems.kafka.consumer.auto.offset.reset=smallest
-systems.kafka.producer.bootstrap.servers=localhost:9092
-systems.kafka.samza.key.serde=string
-systems.kafka.samza.msg.serde=string
-
-yarn.container.retry.count=-1
-yarn.container.retry.window.ms=60000
-
-#Coordinator replication factor
-job.coordinator.system=kafka
-job.coordinator.replication.factor=1
-
 # App
 app.name=joiner
 
diff --git a/samza-test/src/main/config/join/watcher.samza b/samza-test/src/main/config/join/watcher.samza
index 2ff06d5..05e31d1 100644
--- a/samza-test/src/main/config/join/watcher.samza
+++ b/samza-test/src/main/config/join/watcher.samza
@@ -16,33 +16,6 @@
 # specific language governing permissions and limitations
 # under the License.
 
-####################
-### UPDATE THIS! ###
-####################
-yarn.package.path=<YARN.PACKAGE.PATH>
-
-job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
-task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
-task.checkpoint.system=kafka
-task.checkpoint.replication.factor=1
-
-serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
-
- # Kafka System
-systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
-systems.kafka.consumer.zookeeper.connect=localhost:2181
-systems.kafka.consumer.auto.offset.reset=smallest
-systems.kafka.producer.bootstrap.servers=localhost:9092
-systems.kafka.samza.key.serde=string
-systems.kafka.samza.msg.serde=string
-
-yarn.container.retry.count=-1
-yarn.container.retry.window.ms=60000
-
-#Coordinator replication factor
-job.coordinator.system=kafka
-job.coordinator.replication.factor=1
-
 # App
 app.name=watcher
 
diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/LocalApplicationRunnerMain.java b/samza-test/src/main/java/org/apache/samza/test/integration/LocalApplicationRunnerMain.java
index f490132..8edfa27 100644
--- a/samza-test/src/main/java/org/apache/samza/test/integration/LocalApplicationRunnerMain.java
+++ b/samza-test/src/main/java/org/apache/samza/test/integration/LocalApplicationRunnerMain.java
@@ -42,7 +42,7 @@ public class LocalApplicationRunnerMain {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(LocalApplicationRunnerMain.class);
 
-  public static void main(String[] args) {
+  public static void main(String[] args) throws Exception {
     ApplicationRunnerMain.ApplicationRunnerCommandLine cmdLine = new ApplicationRunnerMain.ApplicationRunnerCommandLine();
     OptionSet options = cmdLine.parser().parse(args);
     Config orgConfig = cmdLine.loadConfig(options);
@@ -60,7 +60,7 @@ public class LocalApplicationRunnerMain {
     }
   }
 
-  private static Optional<ExternalContext> buildExternalContext(@SuppressWarnings("unused") Config config) {
+  private static Optional<ExternalContext> buildExternalContext(Config config) {
     /*
      * By default, use an empty ExternalContext here. In a custom fork of Samza, this can be implemented to pass
      * a non-empty ExternalContext. Only config should be used to build the external context. In the future, components

Reply via email to