This is an automated email from the ASF dual-hosted git repository.
cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 93c1dd5 SAMZA-2458: Update ProcessJobFactory and ThreadJobFactory to
load full job config (#1278)
93c1dd5 is described below
commit 93c1dd5d460113df66800e61642b7f34da2bae59
Author: Ke Wu <[email protected]>
AuthorDate: Tue Feb 18 14:06:00 2020 -0800
SAMZA-2458: Update ProcessJobFactory and ThreadJobFactory to load full job
config (#1278)
Design:
https://cwiki.apache.org/confluence/display/SAMZA/SEP-23%3A+Simplify+Job+Runner
Changes:
1. Update ProcessJobFactory to load full job config, execute planning and
write full job config back to coordinator stream, which was previously done by
RemoteApplicationRunner
2. Update ThreadJobFactory to load full job config, execute planning and
write full job config back to coordinator stream, which was previously done by
RemoteApplicationRunner
---
.../apache/samza/job/local/ProcessJobFactory.scala | 47 +++++++++++++++-------
.../apache/samza/job/local/ThreadJobFactory.scala | 38 ++++++++++++-----
2 files changed, 61 insertions(+), 24 deletions(-)
diff --git
a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
index 36f1457..ca82892 100644
---
a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
+++
b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
@@ -22,25 +22,48 @@ package org.apache.samza.job.local
import java.util
import org.apache.samza.SamzaException
+import org.apache.samza.application.ApplicationUtil
+import org.apache.samza.application.descriptors.ApplicationDescriptorUtil
import org.apache.samza.config.{Config, JobConfig, TaskConfig}
import org.apache.samza.container.TaskName
import org.apache.samza.coordinator.metadatastore.{CoordinatorStreamStore,
NamespaceAwareCoordinatorStreamStore}
import org.apache.samza.coordinator.stream.messages.SetChangelogMapping
import org.apache.samza.coordinator.{JobModelManager, MetadataResourceUtil}
+import org.apache.samza.execution.RemoteJobPlanner
import org.apache.samza.job.model.JobModelUtil
import org.apache.samza.job.{CommandBuilder, ShellCommandBuilder, StreamJob,
StreamJobFactory}
import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.startpoint.StartpointManager
import org.apache.samza.storage.ChangelogStreamManager
-import org.apache.samza.util.{CoordinatorStreamUtil, Logging, ReflectionUtil}
+import org.apache.samza.util.{ConfigUtil, CoordinatorStreamUtil,
DiagnosticsUtil, Logging, ReflectionUtil}
import scala.collection.JavaConversions._
/**
- * Creates a stand alone ProcessJob with the specified config.
+ * Creates a ProcessJob with the specified config.
*/
class ProcessJobFactory extends StreamJobFactory with Logging {
- def getJob(config: Config): StreamJob = {
+ def getJob(submissionConfig: Config): StreamJob = {
+ var config = submissionConfig
+
+ if (new JobConfig(submissionConfig).getConfigLoaderFactory.isPresent) {
+ val originalConfig = ConfigUtil.loadConfig(submissionConfig)
+
+ // Execute planning
+ val planner = new
RemoteJobPlanner(ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(originalConfig),
originalConfig))
+ val jobConfigs = planner.prepareJobs
+
+ if (jobConfigs.size != 1) {
+ throw new SamzaException("Only single process job is supported.")
+ }
+
+ // This is the full job config
+ config = jobConfigs.get(0)
+ // This needs to be consistent with RemoteApplicationRunner#run where
JobRunner#submit to be called instead of JobRunner#run
+ CoordinatorStreamUtil.writeConfigToCoordinatorStream(config)
+ DiagnosticsUtil.createDiagnosticsStream(config)
+ }
+
val containerCount = new JobConfig(config).getContainerCount
if (containerCount > 1) {
@@ -51,15 +74,11 @@ class ProcessJobFactory extends StreamJobFactory with
Logging {
val coordinatorStreamStore: CoordinatorStreamStore = new
CoordinatorStreamStore(config, new MetricsRegistryMap())
coordinatorStreamStore.init()
- val configFromCoordinatorStream: Config =
CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore)
-
val changelogStreamManager = new ChangelogStreamManager(new
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore,
SetChangelogMapping.TYPE))
-
- val coordinator = JobModelManager(configFromCoordinatorStream,
changelogStreamManager.readPartitionMapping(),
- coordinatorStreamStore, metricsRegistry)
- val jobModel = coordinator.jobModel
-
+ val jobModelManager = JobModelManager(config,
changelogStreamManager.readPartitionMapping(), coordinatorStreamStore,
metricsRegistry)
+ val jobModel = jobModelManager.jobModel
val taskPartitionMappings: util.Map[TaskName, Integer] = new
util.HashMap[TaskName, Integer]
+
for (containerModel <- jobModel.getContainers.values) {
for (taskModel <- containerModel.getTasks.values) {
taskPartitionMappings.put(taskModel.getTaskName,
taskModel.getChangelogPartition.getPartitionId)
@@ -86,14 +105,14 @@ class ProcessJobFactory extends StreamJobFactory with
Logging {
info("Using command builder class %s" format commandBuilderClass)
val commandBuilder = ReflectionUtil.getObj(commandBuilderClass,
classOf[CommandBuilder])
- // JobCoordinator is stopped by ProcessJob when it exits
- coordinator.start
+ // Start JobModelManager which will be stopped by ProcessJob when it exits
+ jobModelManager.start
commandBuilder
.setConfig(config)
.setId("0")
- .setUrl(coordinator.server.getUrl)
+ .setUrl(jobModelManager.server.getUrl)
- new ProcessJob(commandBuilder, coordinator)
+ new ProcessJob(commandBuilder, jobModelManager)
}
}
diff --git
a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index 068499c..deea95a 100644
---
a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++
b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -19,16 +19,18 @@
package org.apache.samza.job.local
+import org.apache.samza.SamzaException
import org.apache.samza.application.ApplicationUtil
import org.apache.samza.application.descriptors.ApplicationDescriptorUtil
import org.apache.samza.config.JobConfig._
import org.apache.samza.config.ShellCommandConfig._
-import org.apache.samza.config.{Config, JobConfig, TaskConfig}
+import org.apache.samza.config.{Config, JobConfig}
import org.apache.samza.container.{SamzaContainer, SamzaContainerListener,
TaskName}
import org.apache.samza.context.{ExternalContext, JobContextImpl}
-import org.apache.samza.coordinator.{JobModelManager, MetadataResourceUtil}
import org.apache.samza.coordinator.metadatastore.{CoordinatorStreamStore,
NamespaceAwareCoordinatorStreamStore}
import org.apache.samza.coordinator.stream.messages.SetChangelogMapping
+import org.apache.samza.coordinator.{JobModelManager, MetadataResourceUtil}
+import org.apache.samza.execution.RemoteJobPlanner
import org.apache.samza.job.model.JobModelUtil
import org.apache.samza.job.{StreamJob, StreamJobFactory}
import org.apache.samza.metrics.{JmxServer, MetricsRegistryMap,
MetricsReporter}
@@ -36,7 +38,7 @@ import org.apache.samza.runtime.ProcessorContext
import org.apache.samza.startpoint.StartpointManager
import org.apache.samza.storage.ChangelogStreamManager
import org.apache.samza.task.{TaskFactory, TaskFactoryUtil}
-import org.apache.samza.util.{CoordinatorStreamUtil, Logging}
+import org.apache.samza.util.{ConfigUtil, CoordinatorStreamUtil,
DiagnosticsUtil, Logging}
import scala.collection.JavaConversions._
import scala.collection.mutable
@@ -45,20 +47,36 @@ import scala.collection.mutable
* Creates a new Thread job with the given config
*/
class ThreadJobFactory extends StreamJobFactory with Logging {
- def getJob(config: Config): StreamJob = {
+ def getJob(submissionConfig: Config): StreamJob = {
info("Creating a ThreadJob, which is only meant for debugging.")
+ var config = submissionConfig
+ if (new JobConfig(submissionConfig).getConfigLoaderFactory.isPresent) {
+ val originalConfig = ConfigUtil.loadConfig(submissionConfig)
+
+ // Execute planning
+ val planner = new
RemoteJobPlanner(ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(originalConfig),
originalConfig))
+ val jobConfigs = planner.prepareJobs
+
+ if (jobConfigs.size != 1) {
+ throw new SamzaException("Only single stage job is supported.")
+ }
+
+ // This is the full job config
+ config = jobConfigs.get(0)
+ // This needs to be consistent with RemoteApplicationRunner#run where
JobRunner#submit to be called instead of JobRunner#run
+ CoordinatorStreamUtil.writeConfigToCoordinatorStream(config)
+ DiagnosticsUtil.createDiagnosticsStream(config)
+ }
val metricsRegistry = new MetricsRegistryMap()
val coordinatorStreamStore: CoordinatorStreamStore = new
CoordinatorStreamStore(config, new MetricsRegistryMap())
coordinatorStreamStore.init()
- val configFromCoordinatorStream: Config =
CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore)
-
val changelogStreamManager = new ChangelogStreamManager(new
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore,
SetChangelogMapping.TYPE))
- val coordinator = JobModelManager(configFromCoordinatorStream,
changelogStreamManager.readPartitionMapping(),
+ val jobModelManager = JobModelManager(config,
changelogStreamManager.readPartitionMapping(),
coordinatorStreamStore, metricsRegistry)
- val jobModel = coordinator.jobModel
+ val jobModel = jobModelManager.jobModel
val taskPartitionMappings: mutable.Map[TaskName, Integer] =
mutable.Map[TaskName, Integer]()
for (containerModel <- jobModel.getContainers.values) {
@@ -122,7 +140,7 @@ class ThreadJobFactory extends StreamJobFactory with
Logging {
}
try {
- coordinator.start
+ jobModelManager.start
val container = SamzaContainer(
containerId,
jobModel,
@@ -138,7 +156,7 @@ class ThreadJobFactory extends StreamJobFactory with
Logging {
val threadJob = new ThreadJob(container)
threadJob
} finally {
- coordinator.stop
+ jobModelManager.stop
if (jmxServer != null) {
jmxServer.stop
}