This is an automated email from the ASF dual-hosted git repository.

xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new d788447  SAMZA-2167: Should not close the MetadataStore after 
generating JobModel in ProcessJobFactory. (#998)
d788447 is described below

commit d78844753c4aeb37883fa37a57351787d50f1fe3
Author: shanthoosh <[email protected]>
AuthorDate: Wed Apr 17 13:26:04 2019 -0700

    SAMZA-2167: Should not close the MetadataStore after generating JobModel in 
ProcessJobFactory. (#998)
---
 .../apache/samza/job/local/ProcessJobFactory.scala | 88 +++++++++++-----------
 1 file changed, 42 insertions(+), 46 deletions(-)

diff --git 
a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala 
b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
index 40884d6..995022b 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
@@ -50,60 +50,56 @@ class ProcessJobFactory extends StreamJobFactory with 
Logging {
     val coordinatorStreamStore: CoordinatorStreamStore = new 
CoordinatorStreamStore(config, new MetricsRegistryMap())
     coordinatorStreamStore.init()
 
-    try {
-      val configFromCoordinatorStream: Config = 
CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore)
+    val configFromCoordinatorStream: Config = 
CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore)
 
-      val changelogStreamManager = new ChangelogStreamManager(new 
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, 
SetChangelogMapping.TYPE))
+    val changelogStreamManager = new ChangelogStreamManager(new 
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, 
SetChangelogMapping.TYPE))
 
-      val coordinator = JobModelManager(configFromCoordinatorStream, 
changelogStreamManager.readPartitionMapping(), coordinatorStreamStore, 
metricsRegistry)
-      val jobModel = coordinator.jobModel
+    val coordinator = JobModelManager(configFromCoordinatorStream, 
changelogStreamManager.readPartitionMapping(), coordinatorStreamStore, 
metricsRegistry)
+    val jobModel = coordinator.jobModel
 
-      val taskPartitionMappings: util.Map[TaskName, Integer] = new 
util.HashMap[TaskName, Integer]
-      for (containerModel <- jobModel.getContainers.values) {
-        for (taskModel <- containerModel.getTasks.values) {
-          taskPartitionMappings.put(taskModel.getTaskName, 
taskModel.getChangelogPartition.getPartitionId)
-        }
+    val taskPartitionMappings: util.Map[TaskName, Integer] = new 
util.HashMap[TaskName, Integer]
+    for (containerModel <- jobModel.getContainers.values) {
+      for (taskModel <- containerModel.getTasks.values) {
+        taskPartitionMappings.put(taskModel.getTaskName, 
taskModel.getChangelogPartition.getPartitionId)
       }
+    }
 
-      changelogStreamManager.writePartitionMapping(taskPartitionMappings)
+    changelogStreamManager.writePartitionMapping(taskPartitionMappings)
 
-      //create necessary checkpoint and changelog streams
-      val checkpointManager = new 
TaskConfigJava(jobModel.getConfig).getCheckpointManager(metricsRegistry)
-      if (checkpointManager != null) {
-        checkpointManager.createResources()
-      }
-      ChangelogStreamManager.createChangelogStreams(jobModel.getConfig, 
jobModel.maxChangeLogStreamPartitions)
-
-      val containerModel = coordinator.jobModel.getContainers.get(0)
-
-      val fwkPath = JobConfig.getFwkPath(config) // see if split deployment is 
configured
-      info("Process job. using fwkPath = " + fwkPath)
-
-      val commandBuilder = {
-        config.getCommandClass match {
-          case Some(cmdBuilderClassName) => {
-            // A command class was specified, so we need to use a process job 
to
-            // execute the command in its own process.
-            Util.getObj(cmdBuilderClassName, classOf[CommandBuilder])
-          }
-          case _ => {
-            info("Defaulting to ShellCommandBuilder")
-            new ShellCommandBuilder
-          }
+    //create necessary checkpoint and changelog streams
+    val checkpointManager = new 
TaskConfigJava(jobModel.getConfig).getCheckpointManager(metricsRegistry)
+    if (checkpointManager != null) {
+      checkpointManager.createResources()
+    }
+    ChangelogStreamManager.createChangelogStreams(jobModel.getConfig, 
jobModel.maxChangeLogStreamPartitions)
+
+    val containerModel = coordinator.jobModel.getContainers.get(0)
+
+    val fwkPath = JobConfig.getFwkPath(config) // see if split deployment is 
configured
+    info("Process job. using fwkPath = " + fwkPath)
+
+    val commandBuilder = {
+      config.getCommandClass match {
+        case Some(cmdBuilderClassName) => {
+          // A command class was specified, so we need to use a process job to
+          // execute the command in its own process.
+          Util.getObj(cmdBuilderClassName, classOf[CommandBuilder])
+        }
+        case _ => {
+          info("Defaulting to ShellCommandBuilder")
+          new ShellCommandBuilder
         }
       }
-      // JobCoordinator is stopped by ProcessJob when it exits
-      coordinator.start
-
-      commandBuilder
-        .setConfig(config)
-        .setId("0")
-        .setUrl(coordinator.server.getUrl)
-        .setCommandPath(fwkPath)
-
-      new ProcessJob(commandBuilder, coordinator)
-    } finally {
-      coordinatorStreamStore.close()
     }
+    // JobCoordinator is stopped by ProcessJob when it exits
+    coordinator.start
+
+    commandBuilder
+      .setConfig(config)
+      .setId("0")
+      .setUrl(coordinator.server.getUrl)
+      .setCommandPath(fwkPath)
+
+    new ProcessJob(commandBuilder, coordinator)
   }
 }

Reply via email to