This is an automated email from the ASF dual-hosted git repository.
cameronlee pushed a commit to branch 1.4.0
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/1.4.0 by this push:
new b6be178 Revert "SAMZA-2420: Update CommandLine to use config loader
for local config file (#1256)" (#1270)
b6be178 is described below
commit b6be1788f0cf7208296c97c810f193a1200a49fd
Author: Cameron Lee <[email protected]>
AuthorDate: Fri Feb 7 16:07:39 2020 -0800
Revert "SAMZA-2420: Update CommandLine to use config loader for local
config file (#1256)" (#1270)
This reverts commit 1a03a6a9b8ed19129305842744d6619277bb72f6.
Reverting this change on the 1.4.0 branch because it is backward-incompatible
and we do not want to include it in the 1.4 release.
---
bin/setup-int-test.sh | 2 +-
.../java/org/apache/samza/config/MapConfig.java | 1 -
.../stream/CoordinatorStreamWriter.java | 2 +-
.../org/apache/samza/storage/StateStorageTool.java | 9 ++--
.../org/apache/samza/storage/StorageRecovery.java | 26 +++++-----
.../apache/samza/checkpoint/CheckpointTool.scala | 21 +++-----
.../CoordinatorStreamWriterCommandLine.scala | 12 ++---
.../scala/org/apache/samza/util/CommandLine.scala | 57 +++++++++++-----------
.../samza/runtime/TestApplicationRunnerMain.java | 24 ++++-----
.../samza/checkpoint/TestCheckpointTool.scala | 30 +++++++-----
.../scala/org/apache/samza/job/TestJobRunner.scala | 24 ++++-----
.../samza/storage/kv/RocksDbReadingTool.java | 10 ++--
.../org/apache/samza/rest/SamzaRestService.java | 3 +-
samza-test/src/main/config/join/checker.samza | 27 ----------
.../join/{watcher.samza => common.properties} | 11 -----
samza-test/src/main/config/join/emitter.samza | 27 ----------
samza-test/src/main/config/join/joiner.samza | 27 ----------
samza-test/src/main/config/join/watcher.samza | 27 ----------
.../integration/LocalApplicationRunnerMain.java | 4 +-
19 files changed, 115 insertions(+), 229 deletions(-)
diff --git a/bin/setup-int-test.sh b/bin/setup-int-test.sh
index b33ff27..112bda6 100755
--- a/bin/setup-int-test.sh
+++ b/bin/setup-int-test.sh
@@ -43,7 +43,7 @@ $KAFKA_DIR/bin/kafka-topics.sh --zookeeper localhost:2181
--partitions 1 --repli
# Start the jobs
for job in checker joiner emitter watcher
do
- $SAMZA_DIR/bin/run-job.sh
--config-loader-factory=org.apache.samza.config.loaders.PropertiesConfigLoaderFactory
--config-loader-properties path=$SAMZA_DIR/config/join/$job.samza --config
job.foo=$job
+ $SAMZA_DIR/bin/run-job.sh
--config-factory=org.apache.samza.config.factories.PropertiesConfigFactory
--config-path=file://$SAMZA_DIR/config/join/common.properties
--config-path=file://$SAMZA_DIR/config/join/$job.samza --config job.foo=$job
done
diff --git a/samza-api/src/main/java/org/apache/samza/config/MapConfig.java
b/samza-api/src/main/java/org/apache/samza/config/MapConfig.java
index 213d453..5af2535 100644
--- a/samza-api/src/main/java/org/apache/samza/config/MapConfig.java
+++ b/samza-api/src/main/java/org/apache/samza/config/MapConfig.java
@@ -50,7 +50,6 @@ public class MapConfig extends Config {
}
}
- @SafeVarargs
public MapConfig(Map<String, String>... maps) {
this(Arrays.asList(maps));
}
diff --git
a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
index 903f99a..2e857f4 100644
---
a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
+++
b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
@@ -103,7 +103,7 @@ public class CoordinatorStreamWriter {
* Main function for using the CoordinatorStreamWriter. The main function
starts a CoordinatorStreamWriter
* and sends control messages.
* To run the code use the following command:
- * {path to samza deployment}/samza/bin/run-coordinator-stream-writer.sh
--config-loader-factory={config--loader-factory}
--config-loader-properties={properties needed for config loader to load config}
--type={type of the message} --key={[optional] key of the message}
--value={[optional] value of the message}
+ * {path to samza deployment}/samza/bin/run-coordinator-stream-writer.sh
--config-factory={config-factory} --config-path={path to config file of a job}
--type={type of the message} --key={[optional] key of the message}
--value={[optional] value of the message}
*
* @param args input arguments for running the writer. These arguments are:
* "config-factory" = The config file factory
diff --git
a/samza-core/src/main/java/org/apache/samza/storage/StateStorageTool.java
b/samza-core/src/main/java/org/apache/samza/storage/StateStorageTool.java
index 6518929..beba35c 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StateStorageTool.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StateStorageTool.java
@@ -21,7 +21,8 @@ package org.apache.samza.storage;
import joptsimple.ArgumentAcceptingOptionSpec;
import joptsimple.OptionSet;
-import org.apache.samza.config.Config;
+
+import org.apache.samza.config.MapConfig;
import org.apache.samza.util.CommandLine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,8 +36,8 @@ public class StateStorageTool extends CommandLine {
private Logger log = LoggerFactory.getLogger(StateStorageTool.class);
@Override
- public Config loadConfig(OptionSet options) {
- Config config = super.loadConfig(options);
+ public MapConfig loadConfig(OptionSet options) {
+ MapConfig config = super.loadConfig(options);
if (options.has(newPathArgu)) {
newPath = options.valueOf(newPathArgu);
log.info("new state storage is " + newPath);
@@ -51,7 +52,7 @@ public class StateStorageTool extends CommandLine {
public static void main(String[] args) {
StateStorageTool tool = new StateStorageTool();
OptionSet options = tool.parser().parse(args);
- Config config = tool.loadConfig(options);
+ MapConfig config = tool.loadConfig(options);
String path = tool.getPath();
StorageRecovery storageRecovery = new StorageRecovery(config, path);
diff --git
a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
index 5d34176..11237a8 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
@@ -53,6 +53,7 @@ import org.apache.samza.system.SystemFactory;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.Clock;
+import org.apache.samza.util.CommandLine;
import org.apache.samza.util.CoordinatorStreamUtil;
import org.apache.samza.util.ReflectionUtil;
import org.apache.samza.util.StreamUtil;
@@ -62,11 +63,11 @@ import org.slf4j.LoggerFactory;
/**
- * Recovers the state storage from the changelog streams and stores the state
+ * Recovers the state storages from the changelog streams and store the
storages
* in the directory provided by the users. The changelog streams are derived
* from the job's config file.
*/
-public class StorageRecovery {
+public class StorageRecovery extends CommandLine {
private static final Logger LOG =
LoggerFactory.getLogger(StorageRecovery.class);
private final Config jobConfig;
@@ -107,7 +108,7 @@ public class StorageRecovery {
}
/**
- * run the setup phase and restore all the task storage
+ * run the setup phase and restore all the task storages
*/
public void run() {
setup();
@@ -125,7 +126,9 @@ public class StorageRecovery {
+ " Proceeding with the next container", containerName);
}
});
- this.containerStorageManagers.forEach((containerName,
containerStorageManager) -> containerStorageManager.shutdown());
+ this.containerStorageManagers.forEach((containerName,
containerStorageManager) -> {
+ containerStorageManager.shutdown();
+ });
systemAdmins.stop();
LOG.info("successfully recovered in " + storeBaseDir.toString());
@@ -166,15 +169,13 @@ public class StorageRecovery {
LOG.info("stream name for " + storeName + " is " +
streamName.orElse(null));
- streamName.ifPresent(name -> changeLogSystemStreams.put(storeName,
StreamUtil.getSystemStreamFromNames(name)));
+ if (streamName.isPresent()) {
+ changeLogSystemStreams.put(storeName,
StreamUtil.getSystemStreamFromNames(streamName.get()));
+ }
Optional<String> factoryClass =
config.getStorageFactoryClassName(storeName);
if (factoryClass.isPresent()) {
- @SuppressWarnings("unchecked")
- StorageEngineFactory<Object, Object> factory =
- (StorageEngineFactory<Object, Object>)
ReflectionUtil.getObj(factoryClass.get(), StorageEngineFactory.class);
-
- storageEngineFactories.put(storeName, factory);
+ storageEngineFactories.put(storeName,
ReflectionUtil.getObj(factoryClass.get(), StorageEngineFactory.class));
} else {
throw new SamzaException("Missing storage factory for " + storeName +
".");
}
@@ -203,8 +204,7 @@ public class StorageRecovery {
.forEach(serdeName -> {
String serdeClassName =
serializerConfig.getSerdeFactoryClass(serdeName)
.orElseGet(() ->
SerializerConfig.getPredefinedSerdeFactoryName(serdeName));
- @SuppressWarnings("unchecked")
- Serde<Object> serde =
+ Serde serde =
ReflectionUtil.getObj(serdeClassName,
SerdeFactory.class).getSerde(serdeName, serializerConfig);
serdeMap.put(serdeName, serde);
});
@@ -216,7 +216,7 @@ public class StorageRecovery {
* create one TaskStorageManager for each task. Add all of them to the
* List<TaskStorageManager>
*/
- @SuppressWarnings("rawtypes")
+ @SuppressWarnings({"unchecked", "rawtypes"})
private void getContainerStorageManagers() {
Clock clock = SystemClock.instance();
StreamMetadataCache streamMetadataCache = new
StreamMetadataCache(systemAdmins, 5000, clock);
diff --git
a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
index 4db807e..eff1e73 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
@@ -19,10 +19,9 @@
package org.apache.samza.checkpoint
-import java.io.FileInputStream
import java.net.URI
import java.util
-import java.util.Properties
+import java.util.function.Supplier
import java.util.regex.Pattern
import joptsimple.ArgumentAcceptingOptionSpec
@@ -33,7 +32,7 @@ import org.apache.samza.container.TaskName
import org.apache.samza.job.JobRunner.info
import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.util.{CommandLine, ConfigUtil, CoordinatorStreamUtil,
Logging}
+import org.apache.samza.util.{CommandLine, ConfigUtil, CoordinatorStreamUtil,
Logging, ReflectionUtil, Util}
import org.apache.samza.Partition
import org.apache.samza.SamzaException
@@ -92,7 +91,7 @@ object CheckpointTool {
var newOffsets: TaskNameToCheckpointMap = _
- def parseOffsets(propertiesFile: Properties): TaskNameToCheckpointMap = {
+ def parseOffsets(propertiesFile: Config): TaskNameToCheckpointMap = {
var checkpoints : ListBuffer[(TaskName, Map[SystemStreamPartition,
String])] = ListBuffer()
propertiesFile.asScala.foreach { case (key, value) =>
val matcher = SSP_REGEX.matcher(key)
@@ -118,15 +117,10 @@ object CheckpointTool {
.mapValues(m => m.reduce( _ ++ _)) // Merge all the maps of
SSPs->Offset into one for the whole taskname
}
- override def loadConfig(options: OptionSet): Config = {
+ override def loadConfig(options: OptionSet): MapConfig = {
val config = super.loadConfig(options)
if (options.has(newOffsetsOpt)) {
- val newOffsetsInputStream = new
FileInputStream(options.valueOf(newOffsetsOpt).getPath)
- val properties = new Properties()
-
- properties.load(newOffsetsInputStream)
- newOffsetsInputStream.close()
-
+ val properties =
configFactory.getConfig(options.valueOf(newOffsetsOpt))
newOffsets = parseOffsets(properties)
}
config
@@ -198,11 +192,12 @@ class CheckpointTool(newOffsets: TaskNameToCheckpointMap,
coordinatorStreamStore
if (newOffsets != null) {
newOffsets.foreach {
- case (taskName: TaskName, offsets: Map[SystemStreamPartition,
String]) =>
+ case (taskName: TaskName, offsets: Map[SystemStreamPartition,
String]) => {
logCheckpoint(taskName, offsets, "New offset to be written for
task: " + taskName)
val checkpoint = new Checkpoint(offsets.asJava)
checkpointManager.writeCheckpoint(taskName, checkpoint)
info(s"Updated the checkpoint of the task: $taskName to: $offsets")
+ }
}
}
} finally {
@@ -212,7 +207,7 @@ class CheckpointTool(newOffsets: TaskNameToCheckpointMap,
coordinatorStreamStore
}
def getConfigFromCoordinatorStream(coordinatorStreamStore:
CoordinatorStreamStore): Config = {
-
CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore)
+ return
CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore)
}
def logCheckpoint(tn: TaskName, checkpoint: Map[SystemStreamPartition,
String], prefix: String) {
diff --git
a/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamWriterCommandLine.scala
b/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamWriterCommandLine.scala
index dbeeffa..0c17800 100644
---
a/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamWriterCommandLine.scala
+++
b/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamWriterCommandLine.scala
@@ -20,11 +20,11 @@
package org.apache.samza.coordinator.stream
import org.apache.samza.util.CommandLine
-import joptsimple.{ArgumentAcceptingOptionSpec, OptionSet}
+import joptsimple.OptionSet
class CoordinatorStreamWriterCommandLine extends CommandLine {
- val messageType: ArgumentAcceptingOptionSpec[String] =
+ val messageType =
parser.accepts("type", "the type of the message being sent.")
.withRequiredArg
.ofType(classOf[java.lang.String])
@@ -32,19 +32,19 @@ class CoordinatorStreamWriterCommandLine extends
CommandLine {
" The possible values are {\"set-config\"}")
- val messageKey: ArgumentAcceptingOptionSpec[String] =
+ val messageKey =
parser.accepts("key", "the type of the message being sent")
.withRequiredArg
.ofType(classOf[java.lang.String])
.describedAs("key of the message")
- val messageValue: ArgumentAcceptingOptionSpec[String] =
+ val messageValue =
parser.accepts("value", "the type of the message being sent")
.withRequiredArg
.ofType(classOf[java.lang.String])
.describedAs("value of the message")
- def loadType(options: OptionSet): String = {
+ def loadType(options: OptionSet) = {
if (!options.has(messageType)) {
parser.printHelpOn(System.err)
System.exit(-1)
@@ -60,7 +60,7 @@ class CoordinatorStreamWriterCommandLine extends CommandLine {
}
}
- def loadValue(options: OptionSet): String = {
+ def loadValue(options: OptionSet) = {
var value: java.lang.String = null
if (options.has(messageValue)) {
value = options.valueOf(messageValue)
diff --git a/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala
b/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala
index 5ab5e99..b97afad 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala
@@ -19,13 +19,13 @@
package org.apache.samza.util
-import joptsimple.{ArgumentAcceptingOptionSpec, OptionParser, OptionSet}
+import java.net.URI
+import joptsimple.{OptionParser, OptionSet}
import joptsimple.util.KeyValuePair
-import org.apache.samza.config.{Config, ConfigLoaderFactory, JobConfig,
MapConfig}
-import org.apache.samza.config.loaders.PropertiesConfigLoaderFactory
-
+import org.apache.samza.config.{ConfigFactory, MapConfig}
+import org.apache.samza.config.factories.PropertiesConfigFactory
+import scala.collection.mutable.Buffer
import scala.collection.JavaConverters._
-import scala.collection.mutable
/**
* Defines a basic set of command-line options for Samza tasks. Tools can use
this
@@ -33,40 +33,41 @@ import scala.collection.mutable
*/
class CommandLine {
val parser = new OptionParser()
- val configLoaderFactoryOpt: ArgumentAcceptingOptionSpec[String] =
- parser.accepts("config-loader-factory", "The config loader factory to use
to read full job config file.")
+ val configFactoryOpt =
+ parser.accepts("config-factory", "The config factory to use to read your
config file.")
.withRequiredArg
.ofType(classOf[java.lang.String])
.describedAs("com.foo.bar.ClassName")
- .defaultsTo(classOf[PropertiesConfigLoaderFactory].getName)
- val configLoaderPropertiesOpt: ArgumentAcceptingOptionSpec[KeyValuePair] =
- parser.accepts("config-loader-properties", "A config loader property in
the form key=value. Config loader properties will be passed to " +
- "designated config loader
factory to load full job config.")
+ .defaultsTo(classOf[PropertiesConfigFactory].getName)
+ val configPathOpt =
+ parser.accepts("config-path", "URI location to a config file (e.g.
file:///some/local/path.properties). " +
+ "If multiple files are given they are all
used with later files overriding any values that appear in earlier files.")
.withRequiredArg
- .ofType(classOf[KeyValuePair])
- .describedAs("key=value")
- val configOverrideOpt: ArgumentAcceptingOptionSpec[KeyValuePair] =
+ .ofType(classOf[URI])
+ .describedAs("path")
+ val configOverrideOpt =
parser.accepts("config", "A configuration value in the form key=value.
Command line properties override any configuration values given.")
.withRequiredArg
.ofType(classOf[KeyValuePair])
.describedAs("key=value")
- var configLoaderFactory: ConfigLoaderFactory = _
+ var configFactory: ConfigFactory = null
+
+ def loadConfig(options: OptionSet) = {
+ // Verify legitimate parameters.
+ if (!options.has(configPathOpt)) {
+ parser.printHelpOn(System.err)
+ System.exit(-1)
+ }
- def loadConfig(options: OptionSet): Config = {
// Set up the job parameters.
- val configLoaderFactoryClassName = options.valueOf(configLoaderFactoryOpt)
- val configLoaderProperties =
options.valuesOf(configLoaderPropertiesOpt).asScala
- .map(kv => (ConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX +
kv.key, kv.value))
- .toMap
- val configOverrides = options.valuesOf(configOverrideOpt).asScala
- .map(kv => (kv.key, kv.value))
- .toMap
- val original = mutable.HashMap[String, String]()
- original += JobConfig.CONFIG_LOADER_FACTORY -> configLoaderFactoryClassName
- original ++= configLoaderProperties
- original ++= configOverrides
+ val configFactoryClassName = options.valueOf(configFactoryOpt)
+ val configPaths = options.valuesOf(configPathOpt)
+ configFactory = ReflectionUtil.getObj(configFactoryClassName,
classOf[ConfigFactory])
+ val configOverrides = options.valuesOf(configOverrideOpt).asScala.map(kv
=> (kv.key, kv.value)).toMap
- ConfigUtil.loadConfig(new MapConfig(original.asJava))
+ val configs: Buffer[java.util.Map[String, String]] =
configPaths.asScala.map(configFactory.getConfig)
+ configs += configOverrides.asJava
+ new MapConfig(configs.asJava)
}
}
diff --git
a/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
b/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
index 2b13409..e3e8eea 100644
---
a/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
+++
b/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
@@ -36,10 +36,10 @@ public class TestApplicationRunnerMain {
public void TestRunOperation() throws Exception {
assertEquals(0, TestApplicationRunnerInvocationCounts.runCount);
ApplicationRunnerMain.main(new String[]{
- "--config-loader-factory",
- "org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
- "--config-loader-properties",
- "path=" + getClass().getResource("/test.properties").getPath(),
+ "--config-factory",
+ "org.apache.samza.config.factories.PropertiesConfigFactory",
+ "--config-path",
+ getClass().getResource("/test.properties").getPath(),
"-config", String.format("%s=%s", ApplicationConfig.APP_CLASS,
MockStreamApplication.class.getName()),
"-config", String.format("app.runner.class=%s",
TestApplicationRunnerInvocationCounts.class.getName()),
});
@@ -51,10 +51,10 @@ public class TestApplicationRunnerMain {
public void TestKillOperation() throws Exception {
assertEquals(0, TestApplicationRunnerInvocationCounts.killCount);
ApplicationRunnerMain.main(new String[]{
- "--config-loader-factory",
- "org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
- "--config-loader-properties",
- "path=" + getClass().getResource("/test.properties").getPath(),
+ "--config-factory",
+ "org.apache.samza.config.factories.PropertiesConfigFactory",
+ "--config-path",
+ getClass().getResource("/test.properties").getPath(),
"-config", String.format("%s=%s", ApplicationConfig.APP_CLASS,
MockStreamApplication.class.getName()),
"-config", String.format("app.runner.class=%s",
TestApplicationRunnerInvocationCounts.class.getName()),
"--operation=kill"
@@ -67,10 +67,10 @@ public class TestApplicationRunnerMain {
public void TestStatusOperation() throws Exception {
assertEquals(0, TestApplicationRunnerInvocationCounts.statusCount);
ApplicationRunnerMain.main(new String[]{
- "--config-loader-factory",
- "org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
- "--config-loader-properties",
- "path=" + getClass().getResource("/test.properties").getPath(),
+ "--config-factory",
+ "org.apache.samza.config.factories.PropertiesConfigFactory",
+ "--config-path",
+ getClass().getResource("/test.properties").getPath(),
"-config", String.format("%s=%s", ApplicationConfig.APP_CLASS,
MockStreamApplication.class.getName()),
"-config", String.format("app.runner.class=%s",
TestApplicationRunnerInvocationCounts.class.getName()),
"--operation=status"
diff --git
a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
index 777a6a0..971e55f 100644
---
a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
+++
b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
@@ -19,27 +19,35 @@
package org.apache.samza.checkpoint
-import java.util.Properties
+import java.util
import org.apache.samza.Partition
-import org.apache.samza.checkpoint.CheckpointTool.{CheckpointToolCommandLine,
TaskNameToCheckpointMap}
-import
org.apache.samza.checkpoint.TestCheckpointTool.{MockCheckpointManagerFactory,
MockSystemFactory}
-import org.apache.samza.config._
+import org.apache.samza.checkpoint.CheckpointTool.CheckpointToolCommandLine
+import org.apache.samza.checkpoint.CheckpointTool.TaskNameToCheckpointMap
import org.apache.samza.container.TaskName
+import
org.apache.samza.checkpoint.TestCheckpointTool.MockCheckpointManagerFactory
+import org.apache.samza.checkpoint.TestCheckpointTool.MockSystemFactory
+import org.apache.samza.config._
import org.apache.samza.coordinator.metadatastore.{CoordinatorStreamStore,
CoordinatorStreamStoreTestUtil}
-import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
-import org.apache.samza.execution.JobPlanner
import org.apache.samza.metrics.MetricsRegistry
import
org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
-import org.apache.samza.system._
-import org.junit.{Before, Test}
+import org.apache.samza.system.SystemAdmin
+import org.apache.samza.system.SystemConsumer
+import org.apache.samza.system.SystemFactory
+import org.apache.samza.system.SystemProducer
+import org.apache.samza.system.SystemStreamMetadata
+import org.apache.samza.system.SystemStreamPartition
+import org.junit.Before
+import org.junit.Test
import org.mockito.Matchers._
-import org.mockito.Mockito
import org.mockito.Mockito._
import org.scalatest.junit.AssertionsForJUnit
import org.scalatest.mockito.MockitoSugar
import scala.collection.JavaConverters._
+import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
+import org.apache.samza.execution.JobPlanner
+import org.mockito.Mockito
object TestCheckpointTool {
var checkpointManager: CheckpointManager = _
@@ -120,7 +128,7 @@ class TestCheckpointTool extends AssertionsForJUnit with
MockitoSugar {
@Test
def testGrouping(): Unit = {
- val config : java.util.Properties = new Properties()
+ val config : java.util.Map[String, String] = new util.HashMap()
config.put("tasknames.Partition
0.systems.kafka-atc-repartitioned-requests.streams.ArticleRead.partitions.0",
"0000")
config.put("tasknames.Partition
0.systems.kafka-atc-repartitioned-requests.streams.CommunicationRequest.partitions.0",
"1111")
config.put("tasknames.Partition
1.systems.kafka-atc-repartitioned-requests.streams.ArticleRead.partitions.1",
"2222")
@@ -128,7 +136,7 @@ class TestCheckpointTool extends AssertionsForJUnit with
MockitoSugar {
config.put("tasknames.Partition
1.systems.kafka-atc-repartitioned-requests.streams.StateChange.partitions.1",
"5555")
val ccl = new CheckpointToolCommandLine
- val result = ccl.parseOffsets(config)
+ val result = ccl.parseOffsets(new MapConfig(config))
assert(result(new TaskName("Partition 0")).size == 2)
assert(result(new TaskName("Partition 1")).size == 3)
diff --git a/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
b/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
index fd3c6ce..0853f8e 100644
--- a/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
+++ b/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
@@ -43,10 +43,10 @@ class TestJobRunner {
assertEquals(0, TestJobRunner.processCount)
JobRunner.main(Array(
- "--config-loader-factory",
- "org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
- "--config-loader-properties",
- "path=" + getClass.getResource("/test.properties").getPath))
+ "--config-factory",
+ "org.apache.samza.config.factories.PropertiesConfigFactory",
+ "--config-path",
+ getClass.getResource("/test.properties").getPath))
assertEquals(1, TestJobRunner.processCount)
}
@@ -56,10 +56,10 @@ class TestJobRunner {
assertEquals(0, TestJobRunner.killCount)
JobRunner.main(Array(
- "--config-loader-factory",
- "org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
- "--config-loader-properties",
- "path=" + getClass.getResource("/test.properties").getPath,
+ "--config-factory",
+ "org.apache.samza.config.factories.PropertiesConfigFactory",
+ "--config-path",
+ getClass.getResource("/test.properties").getPath,
"--operation=kill"))
assertEquals(1, TestJobRunner.killCount)
}
@@ -70,10 +70,10 @@ class TestJobRunner {
assertEquals(0, TestJobRunner.getStatusCount)
JobRunner.main(Array(
- "--config-loader-factory",
- "org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
- "--config-loader-properties",
- "path=" + getClass.getResource("/test.properties").getPath,
+ "--config-factory",
+ "org.apache.samza.config.factories.PropertiesConfigFactory",
+ "--config-path",
+ getClass.getResource("/test.properties").getPath,
"--operation=status"))
assertEquals(1, TestJobRunner.getStatusCount)
}
diff --git
a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbReadingTool.java
b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbReadingTool.java
index d5f0f8b..02f1616 100644
---
a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbReadingTool.java
+++
b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbReadingTool.java
@@ -20,9 +20,11 @@
package org.apache.samza.storage.kv;
import java.util.List;
+
import joptsimple.ArgumentAcceptingOptionSpec;
import joptsimple.OptionSet;
-import org.apache.samza.config.Config;
+
+import org.apache.samza.config.MapConfig;
import org.apache.samza.util.CommandLine;
import org.rocksdb.RocksDBException;
import org.slf4j.Logger;
@@ -71,8 +73,8 @@ public class RocksDbReadingTool extends CommandLine {
private Logger log = LoggerFactory.getLogger(RocksDbReadingTool.class);
@Override
- public Config loadConfig(OptionSet options) {
- Config config = super.loadConfig(options);
+ public MapConfig loadConfig(OptionSet options) {
+ MapConfig config = super.loadConfig(options);
// get the db name
if (options.has(dbNameArgument)) {
dbName = options.valueOf(dbNameArgument);
@@ -136,7 +138,7 @@ public class RocksDbReadingTool extends CommandLine {
public static void main(String[] args) throws RocksDBException {
RocksDbReadingTool tool = new RocksDbReadingTool();
OptionSet options = tool.parser().parse(args);
- Config config = tool.loadConfig(options);
+ MapConfig config = tool.loadConfig(options);
String path = tool.getDbPath();
String dbName = tool.getDbName();
RocksDbKeyValueReader kvReader = new RocksDbKeyValueReader(dbName, path,
config);
diff --git
a/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java
b/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java
index a2db84b..e35f628 100644
--- a/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java
+++ b/samza-rest/src/main/java/org/apache/samza/rest/SamzaRestService.java
@@ -22,7 +22,6 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Map;
import java.util.concurrent.ThreadFactory;
import joptsimple.OptionSet;
-import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.MetricsConfig;
import org.apache.samza.metrics.MetricsRegistryMap;
@@ -133,7 +132,7 @@ public class SamzaRestService {
private static SamzaRestConfig parseConfig(String[] args) {
CommandLine cmd = new CommandLine();
OptionSet options = cmd.parser().parse(args);
- Config cfg = cmd.loadConfig(options);
+ MapConfig cfg = cmd.loadConfig(options);
return new SamzaRestConfig(new MapConfig(cfg));
}
diff --git a/samza-test/src/main/config/join/checker.samza
b/samza-test/src/main/config/join/checker.samza
index d2fcb69..faef65e 100644
--- a/samza-test/src/main/config/join/checker.samza
+++ b/samza-test/src/main/config/join/checker.samza
@@ -16,33 +16,6 @@
# specific language governing permissions and limitations
# under the License.
-####################
-### UPDATE THIS! ###
-####################
-yarn.package.path=<YARN.PACKAGE.PATH>
-
-job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
-task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
-task.checkpoint.system=kafka
-task.checkpoint.replication.factor=1
-
-serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
-
- # Kafka System
-systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
-systems.kafka.consumer.zookeeper.connect=localhost:2181
-systems.kafka.consumer.auto.offset.reset=smallest
-systems.kafka.producer.bootstrap.servers=localhost:9092
-systems.kafka.samza.key.serde=string
-systems.kafka.samza.msg.serde=string
-
-yarn.container.retry.count=-1
-yarn.container.retry.window.ms=60000
-
-#Coordinator replication factor
-job.coordinator.system=kafka
-job.coordinator.replication.factor=1
-
# App
app.name=checker
diff --git a/samza-test/src/main/config/join/watcher.samza
b/samza-test/src/main/config/join/common.properties
similarity index 90%
copy from samza-test/src/main/config/join/watcher.samza
copy to samza-test/src/main/config/join/common.properties
index 2ff06d5..7f88b38 100644
--- a/samza-test/src/main/config/join/watcher.samza
+++ b/samza-test/src/main/config/join/common.properties
@@ -42,14 +42,3 @@ yarn.container.retry.window.ms=60000
#Coordinator replication factor
job.coordinator.system=kafka
job.coordinator.replication.factor=1
-
-# App
-app.name=watcher
-
-# Task
-task.class=org.apache.samza.test.integration.join.Watcher
-task.inputs=kafka.epoch
-
-task.window.ms=40000
-
-max.time.between.epochs.ms=60000
diff --git a/samza-test/src/main/config/join/emitter.samza
b/samza-test/src/main/config/join/emitter.samza
index 9b6954c..ab71849 100644
--- a/samza-test/src/main/config/join/emitter.samza
+++ b/samza-test/src/main/config/join/emitter.samza
@@ -16,33 +16,6 @@
# specific language governing permissions and limitations
# under the License.
-####################
-### UPDATE THIS! ###
-####################
-yarn.package.path=<YARN.PACKAGE.PATH>
-
-job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
-task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
-task.checkpoint.system=kafka
-task.checkpoint.replication.factor=1
-
-serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
-
- # Kafka System
-systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
-systems.kafka.consumer.zookeeper.connect=localhost:2181
-systems.kafka.consumer.auto.offset.reset=smallest
-systems.kafka.producer.bootstrap.servers=localhost:9092
-systems.kafka.samza.key.serde=string
-systems.kafka.samza.msg.serde=string
-
-yarn.container.retry.count=-1
-yarn.container.retry.window.ms=60000
-
-#Coordinator replication factor
-job.coordinator.system=kafka
-job.coordinator.replication.factor=1
-
# App
app.name=emitter
diff --git a/samza-test/src/main/config/join/joiner.samza b/samza-test/src/main/config/join/joiner.samza
index d61a3c9..3e15f07 100644
--- a/samza-test/src/main/config/join/joiner.samza
+++ b/samza-test/src/main/config/join/joiner.samza
@@ -16,33 +16,6 @@
# specific language governing permissions and limitations
# under the License.
-####################
-### UPDATE THIS! ###
-####################
-yarn.package.path=<YARN.PACKAGE.PATH>
-
-job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
-task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
-task.checkpoint.system=kafka
-task.checkpoint.replication.factor=1
-
-serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
-
- # Kafka System
-systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
-systems.kafka.consumer.zookeeper.connect=localhost:2181
-systems.kafka.consumer.auto.offset.reset=smallest
-systems.kafka.producer.bootstrap.servers=localhost:9092
-systems.kafka.samza.key.serde=string
-systems.kafka.samza.msg.serde=string
-
-yarn.container.retry.count=-1
-yarn.container.retry.window.ms=60000
-
-#Coordinator replication factor
-job.coordinator.system=kafka
-job.coordinator.replication.factor=1
-
# App
app.name=joiner
diff --git a/samza-test/src/main/config/join/watcher.samza b/samza-test/src/main/config/join/watcher.samza
index 2ff06d5..05e31d1 100644
--- a/samza-test/src/main/config/join/watcher.samza
+++ b/samza-test/src/main/config/join/watcher.samza
@@ -16,33 +16,6 @@
# specific language governing permissions and limitations
# under the License.
-####################
-### UPDATE THIS! ###
-####################
-yarn.package.path=<YARN.PACKAGE.PATH>
-
-job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
-task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
-task.checkpoint.system=kafka
-task.checkpoint.replication.factor=1
-
-serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
-
- # Kafka System
-systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
-systems.kafka.consumer.zookeeper.connect=localhost:2181
-systems.kafka.consumer.auto.offset.reset=smallest
-systems.kafka.producer.bootstrap.servers=localhost:9092
-systems.kafka.samza.key.serde=string
-systems.kafka.samza.msg.serde=string
-
-yarn.container.retry.count=-1
-yarn.container.retry.window.ms=60000
-
-#Coordinator replication factor
-job.coordinator.system=kafka
-job.coordinator.replication.factor=1
-
# App
app.name=watcher
diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/LocalApplicationRunnerMain.java b/samza-test/src/main/java/org/apache/samza/test/integration/LocalApplicationRunnerMain.java
index f490132..8edfa27 100644
--- a/samza-test/src/main/java/org/apache/samza/test/integration/LocalApplicationRunnerMain.java
+++ b/samza-test/src/main/java/org/apache/samza/test/integration/LocalApplicationRunnerMain.java
@@ -42,7 +42,7 @@ public class LocalApplicationRunnerMain {
private static final Logger LOGGER =
LoggerFactory.getLogger(LocalApplicationRunnerMain.class);
- public static void main(String[] args) {
+ public static void main(String[] args) throws Exception {
ApplicationRunnerMain.ApplicationRunnerCommandLine cmdLine = new
ApplicationRunnerMain.ApplicationRunnerCommandLine();
OptionSet options = cmdLine.parser().parse(args);
Config orgConfig = cmdLine.loadConfig(options);
@@ -60,7 +60,7 @@ public class LocalApplicationRunnerMain {
}
}
- private static Optional<ExternalContext> buildExternalContext(@SuppressWarnings("unused") Config config) {
+ private static Optional<ExternalContext> buildExternalContext(Config config) {
/*
* By default, use an empty ExternalContext here. In a custom fork of
Samza, this can be implemented to pass
* a non-empty ExternalContext. Only config should be used to build the
external context. In the future, components