SAMZA-788 - coordinator stream configuration should not guess the system names
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/092e3811 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/092e3811 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/092e3811 Branch: refs/heads/samza-sql Commit: 092e381131cb7de9e9ffc0807e997418daa3498a Parents: 62254d0 Author: Navina <[email protected]> Authored: Thu Nov 19 14:21:16 2015 -0800 Committer: Navina <[email protected]> Committed: Thu Nov 19 14:21:16 2015 -0800 ---------------------------------------------------------------------- .../main/scala/org/apache/samza/config/JobConfig.scala | 13 ++----------- .../stream/TestCoordinatorStreamWriter.java | 1 + .../src/test/resources/test-migration-fail.properties | 1 + samza-core/src/test/resources/test.properties | 3 ++- 4 files changed, 6 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/092e3811/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala index 6d73bb9..85a1ca4 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala @@ -50,17 +50,8 @@ object JobConfig { class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging { def getName = getOption(JobConfig.JOB_NAME) - def getCoordinatorSystemName = getOption(JobConfig.JOB_COORDINATOR_SYSTEM).getOrElse({ - // If no coordinator system is configured, try and guess it if there's just one system configured. - val systemNames = config.getSystemNames.toSet - if (systemNames.size == 1) { - val systemName = systemNames.iterator.next - info("No coordinator system defined, so defaulting to %s" format systemName) - systemName - } else { - throw new ConfigException("Missing job.coordinator.system configuration.") - } - }) + def getCoordinatorSystemName = getOption(JobConfig.JOB_COORDINATOR_SYSTEM).getOrElse( + throw new ConfigException("Missing job.coordinator.system configuration. Cannot proceed with job execution.")) def getContainerCount = { getOption(JobConfig.JOB_CONTAINER_COUNT) match { http://git-wip-us.apache.org/repos/asf/samza/blob/092e3811/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java index f9c6304..f83487d 100644 --- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java +++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java @@ -51,6 +51,7 @@ public class TestCoordinatorStreamWriter { Map<String, String> configMap = new HashMap<>(); configMap.put("systems.coordinatorStreamWriter.samza.factory", "org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory"); configMap.put("job.name", "coordinator-stream-writer-test"); + configMap.put("job.coordinator.system", "coordinatorStreamWriter"); Config config = new MapConfig(configMap); coordinatorStreamWriter = new CoordinatorStreamWriter(config); boolean exceptionHappened = false; http://git-wip-us.apache.org/repos/asf/samza/blob/092e3811/samza-core/src/test/resources/test-migration-fail.properties ---------------------------------------------------------------------- diff --git a/samza-core/src/test/resources/test-migration-fail.properties b/samza-core/src/test/resources/test-migration-fail.properties index b0657de..3b6c1f6 100644 --- a/samza-core/src/test/resources/test-migration-fail.properties +++ b/samza-core/src/test/resources/test-migration-fail.properties @@ -24,3 +24,4 @@ job.name=test-job foo=bar systems.coordinator.samza.factory=org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory task.checkpoint.factory=org.apache.samza.checkpoint.file.FileSystemCheckpointManagerFactory +job.coordinator.system=coordinator http://git-wip-us.apache.org/repos/asf/samza/blob/092e3811/samza-core/src/test/resources/test.properties ---------------------------------------------------------------------- diff --git a/samza-core/src/test/resources/test.properties b/samza-core/src/test/resources/test.properties index 41eb82e..be16c86 100644 --- a/samza-core/src/test/resources/test.properties +++ b/samza-core/src/test/resources/test.properties @@ -22,4 +22,5 @@ job.factory.class=org.apache.samza.job.MockJobFactory job.name=test-job foo=bar -systems.coordinator.samza.factory=org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory \ No newline at end of file +systems.coordinator.samza.factory=org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory +job.coordinator.system=coordinator \ No newline at end of file
