[FLINK-2761] [scala-shell] Prevent creation of new environment in Scala Shell
This closes #1180

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/16afb8ec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/16afb8ec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/16afb8ec

Branch: refs/heads/master
Commit: 16afb8ec66a2a07733b9090bffe96af1e913bb63
Parents: 0a8df6d
Author: Sachin Goel <[email protected]>
Authored: Fri Sep 25 13:43:45 2015 +0530
Committer: Stephan Ewen <[email protected]>
Committed: Tue Sep 29 12:24:54 2015 +0200

----------------------------------------------------------------------
 .../flink/api/java/ScalaShellRemoteEnvironment.java | 11 +++++++++++
 .../org.apache.flink/api/scala/FlinkILoop.scala | 1 +
 .../apache/flink/api/scala/ScalaShellITSuite.scala | 16 +++++++++++++---
 3 files changed, 25 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/16afb8ec/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java b/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
index a7dc708..859c686 100644
--- a/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
+++ b/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
@@ -84,4 +84,15 @@ public class ScalaShellRemoteEnvironment extends RemoteEnvironment {
         executor.setPrintStatusDuringExecution(p.getExecutionConfig().isSysoutLoggingEnabled());
         return executor.executePlan(p);
     }
+
+    public void setAsContext() {
+        ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() {
+            @Override
+            public ExecutionEnvironment createExecutionEnvironment() {
+                throw new UnsupportedOperationException("Execution Environment is already defined" +
+                        " for this shell.");
+            }
+        };
+        initializeContextEnvironment(factory);
+    }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/16afb8ec/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala b/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
index 2797e4b..1e96ba3 100644
--- a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
+++ b/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
@@ -54,6 +54,7 @@ class FlinkILoop(
   // remote environment
   private val remoteEnv: ScalaShellRemoteEnvironment = {
     val remoteEnv = new ScalaShellRemoteEnvironment(host, port, this)
+    remoteEnv.setAsContext()
     remoteEnv
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/16afb8ec/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
index e932cd2..7648c50 100644
--- a/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
+++ b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala
@@ -33,6 +33,19 @@ import scala.tools.nsc.Settings
 @RunWith(classOf[JUnitRunner])
 class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
+  test("Prevent re-creation of environment") {
+
+    val input: String =
+      """
+        val env = ExecutionEnvironment.getExecutionEnvironment
+      """.stripMargin
+
+    val output: String = processInShell(input)
+
+    output should include("UnsupportedOperationException: Execution Environment is already " +
+      "defined for this shell")
+  }
+
   test("Iteration test with iterative Pi example") {

     val input: String =
@@ -224,9 +237,6 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
       false,
       false)

-    val clusterEnvironment = new TestEnvironment(cl, parallelism)
-    clusterEnvironment.setAsContext()
-
     cluster = Some(cl)
   }
