Repository: samza Updated Branches: refs/heads/master ad23e69b7 -> 0169912c6
SAMZA-833 - ProcessJob mishandling containers Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/0169912c Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/0169912c Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/0169912c Branch: refs/heads/master Commit: 0169912c65249ea18e1da856838866ee622006a9 Parents: ad23e69 Author: Tao Feng <[email protected]> Authored: Thu Apr 14 23:12:18 2016 -0700 Committer: Navina Ramesh <[email protected]> Committed: Thu Apr 14 23:12:18 2016 -0700 ---------------------------------------------------------------------- .../org/apache/samza/job/local/ProcessJobFactory.scala | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/0169912c/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala index 17c2e5b..81ef59a 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala @@ -22,6 +22,7 @@ package org.apache.samza.job.local import java.io.File +import org.apache.samza.SamzaException import org.apache.samza.config.{JobConfig, Config} import org.apache.samza.config.TaskConfig._ import org.apache.samza.coordinator.JobCoordinator @@ -32,7 +33,13 @@ import org.apache.samza.util.{Logging, Util} * Creates a stand alone ProcessJob with the specified config. */ class ProcessJobFactory extends StreamJobFactory with Logging { - def getJob(config: Config): StreamJob = { + def getJob(config: Config): StreamJob = { + val containerCount = JobConfig.Config2Job(config).getContainerCount + + if (containerCount > 1) { + throw new SamzaException("Container count larger than 1 is not supported for ProcessJobFactory") + } + val coordinator = JobCoordinator(config) val containerModel = coordinator.jobModel.getContainers.get(0)
