Repository: samza
Updated Branches:
  refs/heads/samza-fluent-api-v1 a9b213c13 -> 8c1f56d6d


Make execution environment use api's configs


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/8c1f56d6
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/8c1f56d6
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/8c1f56d6

Branch: refs/heads/samza-fluent-api-v1
Commit: 8c1f56d6d0b596ed455d215b00439da9098049c4
Parents: a9b213c
Author: Xinyu Liu <[email protected]>
Authored: Mon Feb 27 13:07:23 2017 -0800
Committer: Xinyu Liu <[email protected]>
Committed: Mon Feb 27 13:07:23 2017 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/samza/config/JobConfig.scala   |  4 ----
 .../src/main/scala/org/apache/samza/job/JobRunner.scala  | 11 +++--------
 2 files changed, 3 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/8c1f56d6/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index 6b1473c..1c58293 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -196,8 +196,4 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
     case Some(mode) => mode.toBoolean
     case _ => false
   }
-
-  def getExecutionEnv = getOrElse(JobConfig.EXECUTION_ENV, "")
-
-  def getStreamGraphBuilder = getOrElse(JobConfig.STREAM_GRAPH_BUILDER, "")
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/8c1f56d6/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
index a34cedb..61bfafb 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
@@ -70,16 +70,11 @@ object JobRunner extends Logging {
     val config = cmdline.loadConfig(options)
 
     // start execution env if it's defined
-    val envClass: String = config.getExecutionEnv
+    val envClass: String = config.get(ExecutionEnvironment.ENVIRONMENT_CONFIG, "")
     if (!envClass.isEmpty) {
       val env: ExecutionEnvironment = ClassLoaderHelper.fromClassName(envClass)
-      val streamGraphBuilderClass: String = config.getStreamGraphBuilder
-      if (!streamGraphBuilderClass.isEmpty) {
-        val streamGraphBuilder: StreamGraphBuilder = ClassLoaderHelper.fromClassName(streamGraphBuilderClass)
-        env.run(streamGraphBuilder, config)
-      } else {
-        throw new SamzaException("No stream graph builder defined")
-      }
+      val graphBuilder: StreamGraphBuilder = Class.forName(config.get(StreamGraphBuilder.BUILDER_CLASS_CONFIG)).newInstance.asInstanceOf[StreamGraphBuilder]
+      env.run(graphBuilder, config)
     } else {
       new JobRunner(rewriteConfig(config)).run()
     }

Reply via email to