Ensuring the broker service is started before LanguageInfo is requested
Project: http://git-wip-us.apache.org/repos/asf/incubator-toree/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-toree/commit/e5d8d0f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-toree/tree/e5d8d0f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-toree/diff/e5d8d0f0

Branch: refs/heads/master
Commit: e5d8d0f01565c7eea99cbacb8c86df078a50e8a0
Parents: b3ef81b
Author: Liam Fisk <[email protected]>
Authored: Fri Jul 1 10:06:11 2016 +1200
Committer: Marius van Niekerk <[email protected]>
Committed: Thu Oct 20 17:50:59 2016 -0400

----------------------------------------------------------------------
 .../pyspark/PySparkInterpreter.scala            | 26 ++++++++++++++------
 1 file changed, 18 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/e5d8d0f0/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkInterpreter.scala
----------------------------------------------------------------------
diff --git a/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkInterpreter.scala b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkInterpreter.scala
index 632f52d..72b0aa3 100644
--- a/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkInterpreter.scala
+++ b/pyspark-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/pyspark/PySparkInterpreter.scala
@@ -89,8 +89,7 @@ class PySparkInterpreter(
    * execution or the failure
    */
   override def interpret(code: String, silent: Boolean, output: Option[OutputStream]):
-    (Result, Either[ExecuteOutput, ExecuteFailure]) =
-  {
+    (Result, Either[ExecuteOutput, ExecuteFailure]) = {
     if (!pySparkService.isRunning) pySparkService.start()
 
     val futureResult = pySparkTransformer.transformToInterpreterResult(
@@ -148,11 +147,22 @@ class PySparkInterpreter(
   // Unsupported
   override def doQuietly[T](body: => T): T = ???
 
-  // TODO Identify how to plumb python version to here
-  override def languageInfo = LanguageInfo(
-    "python",
-    pySparkState.getVersion(),
-    fileExtension = Some(".py"),
-    pygmentsLexer = Some("ipython2"))
+  override def languageInfo: LanguageInfo = {
+    if (!pySparkService.isRunning) pySparkService.start()
+    import scala.util.control.Breaks._
+    val waitLimit = System.currentTimeMillis() + java.util.concurrent.TimeUnit.SECONDS.toMillis(5)
+    while (!pySparkState.isReady) {
+      if (System.currentTimeMillis > waitLimit) {
+        logger.warn("Timed out waiting for broker state to become ready")
+        break
+      }
+    }
+
+    LanguageInfo(
+      "python",
+      pySparkState.getVersion(),
+      fileExtension = Some(".py"),
+      pygmentsLexer = Some("ipython2"))
+  }
 
 }
