This is an automated email from the ASF dual-hosted git repository.

cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 93c1dd5  SAMZA-2458: Update ProcessJobFactory and ThreadJobFactory to load full job config (#1278)
93c1dd5 is described below

commit 93c1dd5d460113df66800e61642b7f34da2bae59
Author: Ke Wu <[email protected]>
AuthorDate: Tue Feb 18 14:06:00 2020 -0800

    SAMZA-2458: Update ProcessJobFactory and ThreadJobFactory to load full job config (#1278)
    
    Design:
    
https://cwiki.apache.org/confluence/display/SAMZA/SEP-23%3A+Simplify+Job+Runner
    
    Changes:
    1. Update ProcessJobFactory to load the full job config, execute planning, and
write the full job config back to the coordinator stream, which was previously done by
RemoteApplicationRunner
    2. Update ThreadJobFactory to load the full job config, execute planning, and
write the full job config back to the coordinator stream, which was previously done by
RemoteApplicationRunner
---
 .../apache/samza/job/local/ProcessJobFactory.scala | 47 +++++++++++++++-------
 .../apache/samza/job/local/ThreadJobFactory.scala  | 38 ++++++++++++-----
 2 files changed, 61 insertions(+), 24 deletions(-)

diff --git 
a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala 
b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
index 36f1457..ca82892 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
@@ -22,25 +22,48 @@ package org.apache.samza.job.local
 import java.util
 
 import org.apache.samza.SamzaException
+import org.apache.samza.application.ApplicationUtil
+import org.apache.samza.application.descriptors.ApplicationDescriptorUtil
 import org.apache.samza.config.{Config, JobConfig, TaskConfig}
 import org.apache.samza.container.TaskName
 import org.apache.samza.coordinator.metadatastore.{CoordinatorStreamStore, 
NamespaceAwareCoordinatorStreamStore}
 import org.apache.samza.coordinator.stream.messages.SetChangelogMapping
 import org.apache.samza.coordinator.{JobModelManager, MetadataResourceUtil}
+import org.apache.samza.execution.RemoteJobPlanner
 import org.apache.samza.job.model.JobModelUtil
 import org.apache.samza.job.{CommandBuilder, ShellCommandBuilder, StreamJob, 
StreamJobFactory}
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.startpoint.StartpointManager
 import org.apache.samza.storage.ChangelogStreamManager
-import org.apache.samza.util.{CoordinatorStreamUtil, Logging, ReflectionUtil}
+import org.apache.samza.util.{ConfigUtil, CoordinatorStreamUtil, 
DiagnosticsUtil, Logging, ReflectionUtil}
 
 import scala.collection.JavaConversions._
 
 /**
- * Creates a stand alone ProcessJob with the specified config.
+ * Creates a ProcessJob with the specified config.
  */
 class ProcessJobFactory extends StreamJobFactory with Logging {
-  def getJob(config: Config): StreamJob = {
+  def getJob(submissionConfig: Config): StreamJob = {
+    var config = submissionConfig
+
+    if (new JobConfig(submissionConfig).getConfigLoaderFactory.isPresent) {
+      val originalConfig = ConfigUtil.loadConfig(submissionConfig)
+
+      // Execute planning
+      val planner = new 
RemoteJobPlanner(ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(originalConfig),
 originalConfig))
+      val jobConfigs = planner.prepareJobs
+
+      if (jobConfigs.size != 1) {
+        throw new SamzaException("Only single process job is supported.")
+      }
+
+      // This is the full job config
+      config = jobConfigs.get(0)
+      // This needs to be consistent with RemoteApplicationRunner#run where 
JobRunner#submit to be called instead of JobRunner#run
+      CoordinatorStreamUtil.writeConfigToCoordinatorStream(config)
+      DiagnosticsUtil.createDiagnosticsStream(config)
+    }
+
     val containerCount = new JobConfig(config).getContainerCount
 
     if (containerCount > 1) {
@@ -51,15 +74,11 @@ class ProcessJobFactory extends StreamJobFactory with 
Logging {
     val coordinatorStreamStore: CoordinatorStreamStore = new 
CoordinatorStreamStore(config, new MetricsRegistryMap())
     coordinatorStreamStore.init()
 
-    val configFromCoordinatorStream: Config = 
CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore)
-
     val changelogStreamManager = new ChangelogStreamManager(new 
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, 
SetChangelogMapping.TYPE))
-
-    val coordinator = JobModelManager(configFromCoordinatorStream, 
changelogStreamManager.readPartitionMapping(),
-      coordinatorStreamStore, metricsRegistry)
-    val jobModel = coordinator.jobModel
-
+    val jobModelManager = JobModelManager(config, 
changelogStreamManager.readPartitionMapping(), coordinatorStreamStore, 
metricsRegistry)
+    val jobModel = jobModelManager.jobModel
     val taskPartitionMappings: util.Map[TaskName, Integer] = new 
util.HashMap[TaskName, Integer]
+
     for (containerModel <- jobModel.getContainers.values) {
       for (taskModel <- containerModel.getTasks.values) {
         taskPartitionMappings.put(taskModel.getTaskName, 
taskModel.getChangelogPartition.getPartitionId)
@@ -86,14 +105,14 @@ class ProcessJobFactory extends StreamJobFactory with 
Logging {
     info("Using command builder class %s" format commandBuilderClass)
     val commandBuilder = ReflectionUtil.getObj(commandBuilderClass, 
classOf[CommandBuilder])
 
-    // JobCoordinator is stopped by ProcessJob when it exits
-    coordinator.start
+    // Start JobModelManager which will be stopped by ProcessJob when it exits
+    jobModelManager.start
 
     commandBuilder
       .setConfig(config)
       .setId("0")
-      .setUrl(coordinator.server.getUrl)
+      .setUrl(jobModelManager.server.getUrl)
 
-    new ProcessJob(commandBuilder, coordinator)
+    new ProcessJob(commandBuilder, jobModelManager)
   }
 }
diff --git 
a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 
b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index 068499c..deea95a 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -19,16 +19,18 @@
 
 package org.apache.samza.job.local
 
+import org.apache.samza.SamzaException
 import org.apache.samza.application.ApplicationUtil
 import org.apache.samza.application.descriptors.ApplicationDescriptorUtil
 import org.apache.samza.config.JobConfig._
 import org.apache.samza.config.ShellCommandConfig._
-import org.apache.samza.config.{Config, JobConfig, TaskConfig}
+import org.apache.samza.config.{Config, JobConfig}
 import org.apache.samza.container.{SamzaContainer, SamzaContainerListener, 
TaskName}
 import org.apache.samza.context.{ExternalContext, JobContextImpl}
-import org.apache.samza.coordinator.{JobModelManager, MetadataResourceUtil}
 import org.apache.samza.coordinator.metadatastore.{CoordinatorStreamStore, 
NamespaceAwareCoordinatorStreamStore}
 import org.apache.samza.coordinator.stream.messages.SetChangelogMapping
+import org.apache.samza.coordinator.{JobModelManager, MetadataResourceUtil}
+import org.apache.samza.execution.RemoteJobPlanner
 import org.apache.samza.job.model.JobModelUtil
 import org.apache.samza.job.{StreamJob, StreamJobFactory}
 import org.apache.samza.metrics.{JmxServer, MetricsRegistryMap, 
MetricsReporter}
@@ -36,7 +38,7 @@ import org.apache.samza.runtime.ProcessorContext
 import org.apache.samza.startpoint.StartpointManager
 import org.apache.samza.storage.ChangelogStreamManager
 import org.apache.samza.task.{TaskFactory, TaskFactoryUtil}
-import org.apache.samza.util.{CoordinatorStreamUtil, Logging}
+import org.apache.samza.util.{ConfigUtil, CoordinatorStreamUtil, 
DiagnosticsUtil, Logging}
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable
@@ -45,20 +47,36 @@ import scala.collection.mutable
   * Creates a new Thread job with the given config
   */
 class ThreadJobFactory extends StreamJobFactory with Logging {
-  def getJob(config: Config): StreamJob = {
+  def getJob(submissionConfig: Config): StreamJob = {
     info("Creating a ThreadJob, which is only meant for debugging.")
+    var config = submissionConfig
+    if (new JobConfig(submissionConfig).getConfigLoaderFactory.isPresent) {
+      val originalConfig = ConfigUtil.loadConfig(submissionConfig)
+
+      // Execute planning
+      val planner = new 
RemoteJobPlanner(ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(originalConfig),
 originalConfig))
+      val jobConfigs = planner.prepareJobs
+
+      if (jobConfigs.size != 1) {
+        throw new SamzaException("Only single stage job is supported.")
+      }
+
+      // This is the full job config
+      config = jobConfigs.get(0)
+      // This needs to be consistent with RemoteApplicationRunner#run where 
JobRunner#submit to be called instead of JobRunner#run
+      CoordinatorStreamUtil.writeConfigToCoordinatorStream(config)
+      DiagnosticsUtil.createDiagnosticsStream(config)
+    }
 
     val metricsRegistry = new MetricsRegistryMap()
     val coordinatorStreamStore: CoordinatorStreamStore = new 
CoordinatorStreamStore(config, new MetricsRegistryMap())
     coordinatorStreamStore.init()
 
-    val configFromCoordinatorStream: Config = 
CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore)
-
     val changelogStreamManager = new ChangelogStreamManager(new 
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, 
SetChangelogMapping.TYPE))
 
-    val coordinator = JobModelManager(configFromCoordinatorStream, 
changelogStreamManager.readPartitionMapping(),
+    val jobModelManager = JobModelManager(config, 
changelogStreamManager.readPartitionMapping(),
       coordinatorStreamStore, metricsRegistry)
-    val jobModel = coordinator.jobModel
+    val jobModel = jobModelManager.jobModel
 
     val taskPartitionMappings: mutable.Map[TaskName, Integer] = 
mutable.Map[TaskName, Integer]()
     for (containerModel <- jobModel.getContainers.values) {
@@ -122,7 +140,7 @@ class ThreadJobFactory extends StreamJobFactory with 
Logging {
     }
 
     try {
-      coordinator.start
+      jobModelManager.start
       val container = SamzaContainer(
         containerId,
         jobModel,
@@ -138,7 +156,7 @@ class ThreadJobFactory extends StreamJobFactory with 
Logging {
       val threadJob = new ThreadJob(container)
       threadJob
     } finally {
-      coordinator.stop
+      jobModelManager.stop
       if (jmxServer != null) {
         jmxServer.stop
       }

Reply via email to