This is an automated email from the ASF dual-hosted git repository.
xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new d788447 SAMZA-2167: Should not close the MetadataStore after
generating JobModel in ProcessJobFactory. (#998)
d788447 is described below
commit d78844753c4aeb37883fa37a57351787d50f1fe3
Author: shanthoosh <[email protected]>
AuthorDate: Wed Apr 17 13:26:04 2019 -0700
SAMZA-2167: Should not close the MetadataStore after generating JobModel in
ProcessJobFactory. (#998)
---
.../apache/samza/job/local/ProcessJobFactory.scala | 88 +++++++++++-----------
1 file changed, 42 insertions(+), 46 deletions(-)
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
index 40884d6..995022b 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
@@ -50,60 +50,56 @@ class ProcessJobFactory extends StreamJobFactory with Logging {
val coordinatorStreamStore: CoordinatorStreamStore = new
CoordinatorStreamStore(config, new MetricsRegistryMap())
coordinatorStreamStore.init()
- try {
- val configFromCoordinatorStream: Config =
CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore)
+ val configFromCoordinatorStream: Config =
CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore)
- val changelogStreamManager = new ChangelogStreamManager(new
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore,
SetChangelogMapping.TYPE))
+ val changelogStreamManager = new ChangelogStreamManager(new
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore,
SetChangelogMapping.TYPE))
- val coordinator = JobModelManager(configFromCoordinatorStream,
changelogStreamManager.readPartitionMapping(), coordinatorStreamStore,
metricsRegistry)
- val jobModel = coordinator.jobModel
+ val coordinator = JobModelManager(configFromCoordinatorStream,
changelogStreamManager.readPartitionMapping(), coordinatorStreamStore,
metricsRegistry)
+ val jobModel = coordinator.jobModel
- val taskPartitionMappings: util.Map[TaskName, Integer] = new
util.HashMap[TaskName, Integer]
- for (containerModel <- jobModel.getContainers.values) {
- for (taskModel <- containerModel.getTasks.values) {
- taskPartitionMappings.put(taskModel.getTaskName,
taskModel.getChangelogPartition.getPartitionId)
- }
+ val taskPartitionMappings: util.Map[TaskName, Integer] = new
util.HashMap[TaskName, Integer]
+ for (containerModel <- jobModel.getContainers.values) {
+ for (taskModel <- containerModel.getTasks.values) {
+ taskPartitionMappings.put(taskModel.getTaskName,
taskModel.getChangelogPartition.getPartitionId)
}
+ }
- changelogStreamManager.writePartitionMapping(taskPartitionMappings)
+ changelogStreamManager.writePartitionMapping(taskPartitionMappings)
- //create necessary checkpoint and changelog streams
- val checkpointManager = new
TaskConfigJava(jobModel.getConfig).getCheckpointManager(metricsRegistry)
- if (checkpointManager != null) {
- checkpointManager.createResources()
- }
- ChangelogStreamManager.createChangelogStreams(jobModel.getConfig,
jobModel.maxChangeLogStreamPartitions)
-
- val containerModel = coordinator.jobModel.getContainers.get(0)
-
- val fwkPath = JobConfig.getFwkPath(config) // see if split deployment is configured
- info("Process job. using fwkPath = " + fwkPath)
-
- val commandBuilder = {
- config.getCommandClass match {
- case Some(cmdBuilderClassName) => {
- // A command class was specified, so we need to use a process job to
- // execute the command in its own process.
- Util.getObj(cmdBuilderClassName, classOf[CommandBuilder])
- }
- case _ => {
- info("Defaulting to ShellCommandBuilder")
- new ShellCommandBuilder
- }
+ //create necessary checkpoint and changelog streams
+ val checkpointManager = new
TaskConfigJava(jobModel.getConfig).getCheckpointManager(metricsRegistry)
+ if (checkpointManager != null) {
+ checkpointManager.createResources()
+ }
+ ChangelogStreamManager.createChangelogStreams(jobModel.getConfig,
jobModel.maxChangeLogStreamPartitions)
+
+ val containerModel = coordinator.jobModel.getContainers.get(0)
+
+ val fwkPath = JobConfig.getFwkPath(config) // see if split deployment is configured
+ info("Process job. using fwkPath = " + fwkPath)
+
+ val commandBuilder = {
+ config.getCommandClass match {
+ case Some(cmdBuilderClassName) => {
+ // A command class was specified, so we need to use a process job to
+ // execute the command in its own process.
+ Util.getObj(cmdBuilderClassName, classOf[CommandBuilder])
+ }
+ case _ => {
+ info("Defaulting to ShellCommandBuilder")
+ new ShellCommandBuilder
}
}
- // JobCoordinator is stopped by ProcessJob when it exits
- coordinator.start
-
- commandBuilder
- .setConfig(config)
- .setId("0")
- .setUrl(coordinator.server.getUrl)
- .setCommandPath(fwkPath)
-
- new ProcessJob(commandBuilder, coordinator)
- } finally {
- coordinatorStreamStore.close()
}
+ // JobCoordinator is stopped by ProcessJob when it exits
+ coordinator.start
+
+ commandBuilder
+ .setConfig(config)
+ .setId("0")
+ .setUrl(coordinator.server.getUrl)
+ .setCommandPath(fwkPath)
+
+ new ProcessJob(commandBuilder, coordinator)
}
}