SAMZA-788 - coordinator stream configuration should not guess the system names


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/092e3811
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/092e3811
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/092e3811

Branch: refs/heads/samza-sql
Commit: 092e381131cb7de9e9ffc0807e997418daa3498a
Parents: 62254d0
Author: Navina <[email protected]>
Authored: Thu Nov 19 14:21:16 2015 -0800
Committer: Navina <[email protected]>
Committed: Thu Nov 19 14:21:16 2015 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/samza/config/JobConfig.scala | 13 ++-----------
 .../stream/TestCoordinatorStreamWriter.java            |  1 +
 .../src/test/resources/test-migration-fail.properties  |  1 +
 samza-core/src/test/resources/test.properties          |  3 ++-
 4 files changed, 6 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/092e3811/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index 6d73bb9..85a1ca4 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -50,17 +50,8 @@ object JobConfig {
 class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
   def getName = getOption(JobConfig.JOB_NAME)
 
-  def getCoordinatorSystemName = getOption(JobConfig.JOB_COORDINATOR_SYSTEM).getOrElse({
-    // If no coordinator system is configured, try and guess it if there's just one system configured.
-    val systemNames = config.getSystemNames.toSet
-    if (systemNames.size == 1) {
-      val systemName = systemNames.iterator.next
-      info("No coordinator system defined, so defaulting to %s" format systemName)
-      systemName
-    } else {
-      throw new ConfigException("Missing job.coordinator.system configuration.")
-    }
-  })
+  def getCoordinatorSystemName = getOption(JobConfig.JOB_COORDINATOR_SYSTEM).getOrElse(
+      throw new ConfigException("Missing job.coordinator.system configuration. Cannot proceed with job execution."))
 
   def getContainerCount = {
     getOption(JobConfig.JOB_CONTAINER_COUNT) match {

http://git-wip-us.apache.org/repos/asf/samza/blob/092e3811/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java
index f9c6304..f83487d 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java
@@ -51,6 +51,7 @@ public class TestCoordinatorStreamWriter {
     Map<String, String> configMap = new HashMap<>();
     configMap.put("systems.coordinatorStreamWriter.samza.factory", "org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory");
     configMap.put("job.name", "coordinator-stream-writer-test");
+    configMap.put("job.coordinator.system", "coordinatorStreamWriter");
     Config config = new MapConfig(configMap);
     coordinatorStreamWriter = new CoordinatorStreamWriter(config);
     boolean exceptionHappened = false;

http://git-wip-us.apache.org/repos/asf/samza/blob/092e3811/samza-core/src/test/resources/test-migration-fail.properties
----------------------------------------------------------------------
diff --git a/samza-core/src/test/resources/test-migration-fail.properties b/samza-core/src/test/resources/test-migration-fail.properties
index b0657de..3b6c1f6 100644
--- a/samza-core/src/test/resources/test-migration-fail.properties
+++ b/samza-core/src/test/resources/test-migration-fail.properties
@@ -24,3 +24,4 @@ job.name=test-job
 foo=bar
 systems.coordinator.samza.factory=org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
 task.checkpoint.factory=org.apache.samza.checkpoint.file.FileSystemCheckpointManagerFactory
+job.coordinator.system=coordinator

http://git-wip-us.apache.org/repos/asf/samza/blob/092e3811/samza-core/src/test/resources/test.properties
----------------------------------------------------------------------
diff --git a/samza-core/src/test/resources/test.properties b/samza-core/src/test/resources/test.properties
index 41eb82e..be16c86 100644
--- a/samza-core/src/test/resources/test.properties
+++ b/samza-core/src/test/resources/test.properties
@@ -22,4 +22,5 @@
 job.factory.class=org.apache.samza.job.MockJobFactory
 job.name=test-job
 foo=bar
-systems.coordinator.samza.factory=org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
\ No newline at end of file
+systems.coordinator.samza.factory=org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
+job.coordinator.system=coordinator
\ No newline at end of file

Reply via email to