Repository: samza
Updated Branches:
  refs/heads/samza-fluent-api-v1 a9b213c13 -> 8c1f56d6d
Make execution environment use api's configs

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/8c1f56d6
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/8c1f56d6
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/8c1f56d6

Branch: refs/heads/samza-fluent-api-v1
Commit: 8c1f56d6d0b596ed455d215b00439da9098049c4
Parents: a9b213c
Author: Xinyu Liu <[email protected]>
Authored: Mon Feb 27 13:07:23 2017 -0800
Committer: Xinyu Liu <[email protected]>
Committed: Mon Feb 27 13:07:23 2017 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/samza/config/JobConfig.scala | 4 ----
 .../src/main/scala/org/apache/samza/job/JobRunner.scala | 11 +++--------
 2 files changed, 3 insertions(+), 12 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/samza/blob/8c1f56d6/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index 6b1473c..1c58293 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -196,8 +196,4 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
     case Some(mode) => mode.toBoolean
     case _ => false
   }
-
-  def getExecutionEnv = getOrElse(JobConfig.EXECUTION_ENV, "")
-
-  def getStreamGraphBuilder = getOrElse(JobConfig.STREAM_GRAPH_BUILDER, "")
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/8c1f56d6/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
index a34cedb..61bfafb 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
@@ -70,16 +70,11 @@ object JobRunner extends Logging {
     val config = cmdline.loadConfig(options)
 
     // start execution env if it's defined
-    val envClass: String = config.getExecutionEnv
+    val envClass: String = config.get(ExecutionEnvironment.ENVIRONMENT_CONFIG, "")
     if (!envClass.isEmpty) {
       val env: ExecutionEnvironment = ClassLoaderHelper.fromClassName(envClass)
-      val streamGraphBuilderClass: String = config.getStreamGraphBuilder
-      if (!streamGraphBuilderClass.isEmpty) {
-        val streamGraphBuilder: StreamGraphBuilder = ClassLoaderHelper.fromClassName(streamGraphBuilderClass)
-        env.run(streamGraphBuilder, config)
-      } else {
-        throw new SamzaException("No stream graph builder defined")
-      }
+      val graphBuilder: StreamGraphBuilder = Class.forName(config.get(StreamGraphBuilder.BUILDER_CLASS_CONFIG)).newInstance.asInstanceOf[StreamGraphBuilder]
+      env.run(graphBuilder, config)
     } else {
       new JobRunner(rewriteConfig(config)).run()
     }
