[PIO-120] ensures the train process exits gracefully after an Elasticsearch connection error
Closes #432


Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/34518365
Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/34518365
Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/34518365

Branch: refs/heads/master
Commit: 3451836584e16f324dc90f973846618e6a5bc3c9
Parents: a0a2c12
Author: Mars Hall <[email protected]>
Authored: Mon Sep 11 17:13:26 2017 -0700
Committer: Mars Hall <[email protected]>
Committed: Mon Sep 11 17:13:26 2017 -0700

----------------------------------------------------------------------
 .../predictionio/workflow/CreateWorkflow.scala | 238 ++++++++++---------
 1 file changed, 121 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/34518365/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala b/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala
index 303ed06..c5f32a6 100644
--- a/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala
+++ b/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala
@@ -134,144 +134,148 @@ object CreateWorkflow extends Logging {
   }

   def main(args: Array[String]): Unit = {
-    val wfcOpt = parser.parse(args, WorkflowConfig())
-    if (wfcOpt.isEmpty) {
-      logger.error("WorkflowConfig is empty. Quitting")
-      return
-    }
+    try {
+      val wfcOpt = parser.parse(args, WorkflowConfig())
+      if (wfcOpt.isEmpty) {
+        logger.error("WorkflowConfig is empty. Quitting")
+        return
+      }

-    val wfc = wfcOpt.get
+      val wfc = wfcOpt.get

-    WorkflowUtils.modifyLogging(wfc.verbose)
+      WorkflowUtils.modifyLogging(wfc.verbose)

-    val evaluation = wfc.evaluationClass.map { ec =>
-      try {
-        WorkflowUtils.getEvaluation(ec, getClass.getClassLoader)._2
-      } catch {
-        case e @ (_: ClassNotFoundException | _: NoSuchMethodException) =>
-          error(s"Unable to obtain evaluation $ec. Aborting workflow.", e)
-          sys.exit(1)
+      val evaluation = wfc.evaluationClass.map { ec =>
+        try {
+          WorkflowUtils.getEvaluation(ec, getClass.getClassLoader)._2
+        } catch {
+          case e @ (_: ClassNotFoundException | _: NoSuchMethodException) =>
+            error(s"Unable to obtain evaluation $ec. Aborting workflow.", e)
+            sys.exit(1)
+        }
       }
-    }

-    val engineParamsGenerator = wfc.engineParamsGeneratorClass.map { epg =>
-      try {
-        WorkflowUtils.getEngineParamsGenerator(epg, getClass.getClassLoader)._2
-      } catch {
-        case e @ (_: ClassNotFoundException | _: NoSuchMethodException) =>
-          error(s"Unable to obtain engine parameters generator $epg. " +
-            "Aborting workflow.", e)
-          sys.exit(1)
+      val engineParamsGenerator = wfc.engineParamsGeneratorClass.map { epg =>
+        try {
+          WorkflowUtils.getEngineParamsGenerator(epg, getClass.getClassLoader)._2
+        } catch {
+          case e @ (_: ClassNotFoundException | _: NoSuchMethodException) =>
+            error(s"Unable to obtain engine parameters generator $epg. " +
+              "Aborting workflow.", e)
+            sys.exit(1)
+        }
       }
-    }

-    val pioEnvVars = wfc.env.map { e =>
-      e.split(',').flatMap { p =>
-        p.split('=') match {
-          case Array(k, v) => List(k -> v)
-          case _ => Nil
-        }
-      }.toMap
-    }.getOrElse(Map.empty)
+      val pioEnvVars = wfc.env.map { e =>
+        e.split(',').flatMap { p =>
+          p.split('=') match {
+            case Array(k, v) => List(k -> v)
+            case _ => Nil
+          }
+        }.toMap
+      }.getOrElse(Map.empty)

-    if (evaluation.isEmpty) {
-      val variantJson = parse(stringFromFile(wfc.engineVariant))
-      val engineFactory = if (wfc.engineFactory == "") {
-        variantJson \ "engineFactory" match {
+      if (evaluation.isEmpty) {
+        val variantJson = parse(stringFromFile(wfc.engineVariant))
+        val engineFactory = if (wfc.engineFactory == "") {
+          variantJson \ "engineFactory" match {
+            case JString(s) => s
+            case _ =>
+              error("Unable to read engine factory class name from " +
+                s"${wfc.engineVariant}. Aborting.")
+              sys.exit(1)
+          }
+        } else wfc.engineFactory
+        val variantId = variantJson \ "id" match {
           case JString(s) => s
           case _ =>
-            error("Unable to read engine factory class name from " +
+            error("Unable to read engine variant ID from " +
               s"${wfc.engineVariant}. Aborting.")
             sys.exit(1)
         }
-      } else wfc.engineFactory
-      val variantId = variantJson \ "id" match {
-        case JString(s) => s
-        case _ =>
-          error("Unable to read engine variant ID from " +
-            s"${wfc.engineVariant}. Aborting.")
-          sys.exit(1)
-      }

-      val (engineLanguage, engineFactoryObj) = try {
-        WorkflowUtils.getEngine(engineFactory, getClass.getClassLoader)
-      } catch {
-        case e @ (_: ClassNotFoundException | _: NoSuchMethodException) =>
-          error(s"Unable to obtain engine: ${e.getMessage}. Aborting workflow.")
-          sys.exit(1)
-      }
+        val (engineLanguage, engineFactoryObj) = try {
+          WorkflowUtils.getEngine(engineFactory, getClass.getClassLoader)
+        } catch {
+          case e @ (_: ClassNotFoundException | _: NoSuchMethodException) =>
+            error(s"Unable to obtain engine: ${e.getMessage}. Aborting workflow.")
+            sys.exit(1)
+        }

-      val engine: BaseEngine[_, _, _, _] = engineFactoryObj()
+        val engine: BaseEngine[_, _, _, _] = engineFactoryObj()

-      val customSparkConf = WorkflowUtils.extractSparkConf(variantJson)
-      val workflowParams = WorkflowParams(
-        verbose = wfc.verbosity,
-        skipSanityCheck = wfc.skipSanityCheck,
-        stopAfterRead = wfc.stopAfterRead,
-        stopAfterPrepare = wfc.stopAfterPrepare,
-        sparkEnv = WorkflowParams().sparkEnv ++ customSparkConf)
+        val customSparkConf = WorkflowUtils.extractSparkConf(variantJson)
+        val workflowParams = WorkflowParams(
+          verbose = wfc.verbosity,
+          skipSanityCheck = wfc.skipSanityCheck,
+          stopAfterRead = wfc.stopAfterRead,
+          stopAfterPrepare = wfc.stopAfterPrepare,
+          sparkEnv = WorkflowParams().sparkEnv ++ customSparkConf)

-      // Evaluator Not Specified. Do training.
-      if (!engine.isInstanceOf[Engine[_,_,_,_,_,_]]) {
-        throw new NoSuchMethodException(s"Engine $engine is not trainable")
-      }
+        // Evaluator Not Specified. Do training.
+        if (!engine.isInstanceOf[Engine[_,_,_,_,_,_]]) {
+          throw new NoSuchMethodException(s"Engine $engine is not trainable")
+        }

-      val trainableEngine = engine.asInstanceOf[Engine[_, _, _, _, _, _]]
+        val trainableEngine = engine.asInstanceOf[Engine[_, _, _, _, _, _]]

-      val engineParams = if (wfc.engineParamsKey == "") {
-        trainableEngine.jValueToEngineParams(variantJson, wfc.jsonExtractor)
-      } else {
-        engineFactoryObj.engineParams(wfc.engineParamsKey)
-      }
+        val engineParams = if (wfc.engineParamsKey == "") {
+          trainableEngine.jValueToEngineParams(variantJson, wfc.jsonExtractor)
+        } else {
+          engineFactoryObj.engineParams(wfc.engineParamsKey)
+        }

-      val engineInstance = EngineInstance(
-        id = "",
-        status = "INIT",
-        startTime = DateTime.now,
-        endTime = DateTime.now,
-        engineId = wfc.engineId,
-        engineVersion = wfc.engineVersion,
-        engineVariant = variantId,
-        engineFactory = engineFactory,
-        batch = wfc.batch,
-        env = pioEnvVars,
-        sparkConf = workflowParams.sparkEnv,
-        dataSourceParams =
-          JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.dataSourceParams),
-        preparatorParams =
-          JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.preparatorParams),
-        algorithmsParams =
-          JsonExtractor.paramsToJson(wfc.jsonExtractor, engineParams.algorithmParamsList),
-        servingParams =
-          JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.servingParams))
+        val engineInstance = EngineInstance(
+          id = "",
+          status = "INIT",
+          startTime = DateTime.now,
+          endTime = DateTime.now,
+          engineId = wfc.engineId,
+          engineVersion = wfc.engineVersion,
+          engineVariant = variantId,
+          engineFactory = engineFactory,
+          batch = wfc.batch,
+          env = pioEnvVars,
+          sparkConf = workflowParams.sparkEnv,
+          dataSourceParams =
+            JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.dataSourceParams),
+          preparatorParams =
+            JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.preparatorParams),
+          algorithmsParams =
+            JsonExtractor.paramsToJson(wfc.jsonExtractor, engineParams.algorithmParamsList),
+          servingParams =
+            JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.servingParams))

-      val engineInstanceId = Storage.getMetaDataEngineInstances.insert(
-        engineInstance)
+        val engineInstanceId = Storage.getMetaDataEngineInstances.insert(
+          engineInstance)

-      CoreWorkflow.runTrain(
-        env = pioEnvVars,
-        params = workflowParams,
-        engine = trainableEngine,
-        engineParams = engineParams,
-        engineInstance = engineInstance.copy(id = engineInstanceId))
-    } else {
-      val workflowParams = WorkflowParams(
-        verbose = wfc.verbosity,
-        skipSanityCheck = wfc.skipSanityCheck,
-        stopAfterRead = wfc.stopAfterRead,
-        stopAfterPrepare = wfc.stopAfterPrepare,
-        sparkEnv = WorkflowParams().sparkEnv)
-      val evaluationInstance = EvaluationInstance(
-        evaluationClass = wfc.evaluationClass.get,
-        engineParamsGeneratorClass = wfc.engineParamsGeneratorClass.get,
-        batch = wfc.batch,
-        env = pioEnvVars,
-        sparkConf = workflowParams.sparkEnv
-      )
-      Workflow.runEvaluation(
-        evaluation = evaluation.get,
-        engineParamsGenerator = engineParamsGenerator.get,
-        evaluationInstance = evaluationInstance,
-        params = workflowParams)
+        CoreWorkflow.runTrain(
+          env = pioEnvVars,
+          params = workflowParams,
+          engine = trainableEngine,
+          engineParams = engineParams,
+          engineInstance = engineInstance.copy(id = engineInstanceId))
+      } else {
+        val workflowParams = WorkflowParams(
+          verbose = wfc.verbosity,
+          skipSanityCheck = wfc.skipSanityCheck,
+          stopAfterRead = wfc.stopAfterRead,
+          stopAfterPrepare = wfc.stopAfterPrepare,
+          sparkEnv = WorkflowParams().sparkEnv)
+        val evaluationInstance = EvaluationInstance(
+          evaluationClass = wfc.evaluationClass.get,
+          engineParamsGeneratorClass = wfc.engineParamsGeneratorClass.get,
+          batch = wfc.batch,
+          env = pioEnvVars,
+          sparkConf = workflowParams.sparkEnv
+        )
+        Workflow.runEvaluation(
+          evaluation = evaluation.get,
+          engineParamsGenerator = engineParamsGenerator.get,
+          evaluationInstance = evaluationInstance,
+          params = workflowParams)
+      }
+    } finally {
+      CleanupFunctions.run()
     }
   }
 }
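
The substance of the patch is the try/finally wrapper around the body of main: CleanupFunctions.run() now executes on the way out of the workflow, so resources held by storage clients are released even when training aborts with an exception such as an Elasticsearch connection error, and the process can terminate instead of lingering. Below is a minimal, self-contained Scala sketch of that shutdown pattern; CleanupRegistry and TrainDemo are hypothetical stand-ins used only for illustration, since the diff shows nothing of PredictionIO's CleanupFunctions API beyond run().

import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for a cleanup registry like the CleanupFunctions
// object invoked in the patch above; names and signatures are illustrative.
object CleanupRegistry {
  private val functions = ListBuffer.empty[() => Unit]

  // A storage client (for example an Elasticsearch REST client) could
  // register a callback here when it opens a long-lived resource.
  def add(f: () => Unit): Unit = functions += f

  // Invoke every registered callback once, tolerating individual failures,
  // then forget them so repeated runs are harmless.
  def run(): Unit = {
    functions.foreach { f =>
      try f() catch { case e: Exception => println(s"cleanup failed: ${e.getMessage}") }
    }
    functions.clear()
  }
}

object TrainDemo {
  def main(args: Array[String]): Unit = {
    // Pretend a storage client was opened earlier and registered its cleanup.
    CleanupRegistry.add(() => println("closing Elasticsearch client"))
    try {
      // Simulate the training workflow failing with a connection error.
      throw new RuntimeException("Connection refused")
    } finally {
      // Mirrors the patch: the finally block runs the cleanups before the
      // exception propagates, so the process can exit instead of lingering.
      CleanupRegistry.run()
    }
  }
}

Run as a plain Scala program, the simulated connection error still propagates after the finally block, but the registered callback has already fired; the same ordering is presumably what lets the train process release its Elasticsearch client and exit gracefully, as the commit subject describes.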
