[FLINK-2767] [scala shell] Add Scala 2.11 support to Scala shell. Update Scala 2.11 version and jline dependency.
This closes #1197 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8d62033c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8d62033c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8d62033c Branch: refs/heads/master Commit: 8d62033c23f50ac1c8ccca04b70c4fda1b8ba46c Parents: a437a2b Author: Chiwan Park <[email protected]> Authored: Sat Sep 26 02:47:06 2015 +0900 Committer: Fabian Hueske <[email protected]> Committed: Wed Oct 7 12:57:40 2015 +0200 ---------------------------------------------------------------------- flink-dist/pom.xml | 29 +-- flink-staging/flink-scala-shell/pom.xml | 13 +- .../apache/flink/api/scala/ILoopCompat.scala | 29 +++ .../apache/flink/api/scala/ILoopCompat.scala | 31 +++ .../org.apache.flink/api/scala/FlinkILoop.scala | 218 ------------------ .../org.apache.flink/api/scala/FlinkShell.scala | 108 --------- .../org/apache/flink/api/scala/FlinkILoop.scala | 224 +++++++++++++++++++ .../org/apache/flink/api/scala/FlinkShell.scala | 107 +++++++++ .../flink/api/scala/ScalaShellITSuite.scala | 2 +- flink-staging/pom.xml | 18 +- pom.xml | 2 +- 11 files changed, 407 insertions(+), 374 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/flink-dist/pom.xml ---------------------------------------------------------------------- diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml index 32059ea..f1745ed 100644 --- a/flink-dist/pom.xml +++ b/flink-dist/pom.xml @@ -125,34 +125,17 @@ under the License. <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-scala-shell</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> <!-- See main pom.xml for explanation of profiles --> <profiles> <profile> - <id>scala-2.10</id> - <activation> - - <property> - <!-- this is the default scala profile --> - <name>!scala-2.11</name> - </property> - </activation> - - <properties> - <scala.version>2.10.4</scala.version> - <scala.binary.version>2.10</scala.binary.version> - </properties> - - <dependencies> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-scala-shell</artifactId> - <version>${project.version}</version> - </dependency> - </dependencies> - </profile> - <profile> <id>include-yarn</id> <activation> <property> http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/flink-staging/flink-scala-shell/pom.xml ---------------------------------------------------------------------- diff --git a/flink-staging/flink-scala-shell/pom.xml b/flink-staging/flink-scala-shell/pom.xml index 94718a3..5adb8c6 100644 --- a/flink-staging/flink-scala-shell/pom.xml +++ b/flink-staging/flink-scala-shell/pom.xml @@ -76,12 +76,6 @@ under the License. <version>${scala.version}</version> </dependency> - <dependency> - <groupId>org.scala-lang</groupId> - <artifactId>jline</artifactId> - <version>2.10.4</version> - </dependency> - <!-- tests --> <dependency> <groupId>org.apache.flink</groupId> @@ -180,6 +174,7 @@ under the License. <configuration> <sources> <source>src/main/scala</source> + <source>src/main/scala-${scala.binary.version}</source> </sources> </configuration> </execution> @@ -274,6 +269,12 @@ under the License. <artifactId>quasiquotes_${scala.binary.version}</artifactId> <version>${scala.macros.version}</version> </dependency> + + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>jline</artifactId> + <version>2.10.4</version> + </dependency> </dependencies> </profile> </profiles> http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/flink-staging/flink-scala-shell/src/main/scala-2.10/org/apache/flink/api/scala/ILoopCompat.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-scala-shell/src/main/scala-2.10/org/apache/flink/api/scala/ILoopCompat.scala b/flink-staging/flink-scala-shell/src/main/scala-2.10/org/apache/flink/api/scala/ILoopCompat.scala new file mode 100644 index 0000000..797b420 --- /dev/null +++ b/flink-staging/flink-scala-shell/src/main/scala-2.10/org/apache/flink/api/scala/ILoopCompat.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala + +import java.io.BufferedReader + +import _root_.scala.tools.nsc.interpreter._ + +class ILoopCompat( + in0: Option[BufferedReader], + out0: JPrintWriter) + extends ILoop(in0, out0) { +} http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/flink-staging/flink-scala-shell/src/main/scala-2.11/org/apache/flink/api/scala/ILoopCompat.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-scala-shell/src/main/scala-2.11/org/apache/flink/api/scala/ILoopCompat.scala b/flink-staging/flink-scala-shell/src/main/scala-2.11/org/apache/flink/api/scala/ILoopCompat.scala new file mode 100644 index 0000000..c1be6db --- /dev/null +++ b/flink-staging/flink-scala-shell/src/main/scala-2.11/org/apache/flink/api/scala/ILoopCompat.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala + +import java.io.BufferedReader + +import _root_.scala.tools.nsc.interpreter._ + +class ILoopCompat( + in0: Option[BufferedReader], + out0: JPrintWriter) + extends ILoop(in0, out0) { + + protected def addThunk(f: => Unit): Unit = f +} http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala b/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala deleted file mode 100644 index cd8a846..0000000 --- a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala +++ /dev/null @@ -1,218 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala - -import java.io.{BufferedReader, File, FileOutputStream} - -import scala.tools.nsc.interpreter._ - -import org.apache.flink.api.java.{JarHelper, ScalaShellRemoteEnvironment} -import org.apache.flink.util.AbstractID - - -class FlinkILoop( - val host: String, - val port: Int, - val externalJars: Option[Array[String]], - in0: Option[BufferedReader], - out0: JPrintWriter) - extends ILoop(in0, out0) { - - - - def this(host:String, - port:Int, - externalJars : Option[Array[String]], - in0: BufferedReader, - out: JPrintWriter){ - this(host:String, port:Int, externalJars, Some(in0), out) - } - - def this(host:String, port:Int, externalJars : Option[Array[String]]){ - this(host:String,port: Int, externalJars , None, new JPrintWriter(Console.out, true)) - } - - def this(host: String, port: Int, in0: BufferedReader, out: JPrintWriter){ - this(host: String, port: Int, None, in0: BufferedReader, out: JPrintWriter) - } - // remote environment - private val remoteEnv: ScalaShellRemoteEnvironment = { - // allow creation of environments - ScalaShellRemoteEnvironment.resetContextEnvironments() - - // create our environment that submits against the cluster (local or remote) - val remoteEnv = new ScalaShellRemoteEnvironment(host, port, this) - - // prevent further instantiation of environments - ScalaShellRemoteEnvironment.disableAllContextAndOtherEnvironments() - - remoteEnv - } - - // local environment - val scalaEnv: ExecutionEnvironment = { - val scalaEnv = new ExecutionEnvironment(remoteEnv) - scalaEnv - } - - addThunk { - intp.beQuietDuring { - // automatically imports the flink scala api - intp.addImports("org.apache.flink.api.scala._") - intp.addImports("org.apache.flink.api.common.functions._") - // with this we can access this object in the scala shell - intp.bindValue("env", this.scalaEnv) - } - } - - - /** - * creates a temporary directory to store compiled console files - */ - private val tmpDirBase: File = { - // get unique temporary folder: - val abstractID: String = new AbstractID().toString - val tmpDir: File = new File( - System.getProperty("java.io.tmpdir"), - "scala_shell_tmp-" + abstractID) - if (!tmpDir.exists) { - tmpDir.mkdir - } - tmpDir - } - - // scala_shell commands - private val tmpDirShell: File = { - new File(tmpDirBase, "scala_shell_commands") - } - - // scala shell jar file name - private val tmpJarShell: File = { - new File(tmpDirBase, "scala_shell_commands.jar") - } - - - /** - * Packages the compiled classes of the current shell session into a Jar file for execution - * on a Flink cluster. - * - * @return The path of the created Jar file - */ - def writeFilesToDisk(): File = { - val vd = intp.virtualDirectory - - val vdIt = vd.iterator - - for (fi <- vdIt) { - if (fi.isDirectory) { - - val fiIt = fi.iterator - - for (f <- fiIt) { - - // directory for compiled line - val lineDir = new File(tmpDirShell.getAbsolutePath, fi.name) - lineDir.mkdirs() - - // compiled classes for commands from shell - val writeFile = new File(lineDir.getAbsolutePath, f.name) - val outputStream = new FileOutputStream(writeFile) - val inputStream = f.input - - // copy file contents - org.apache.commons.io.IOUtils.copy(inputStream, outputStream) - - inputStream.close() - outputStream.close() - } - } - } - - val compiledClasses = new File(tmpDirShell.getAbsolutePath) - - val jarFilePath = new File(tmpJarShell.getAbsolutePath) - - val jh: JarHelper = new JarHelper - jh.jarDir(compiledClasses, jarFilePath) - - jarFilePath - } - - /** - * CUSTOM START METHODS OVERRIDE: - */ - override def prompt = "Scala-Flink> " - - /** - * custom welcome message - */ - override def printWelcome() { - echo( - // scalastyle:off - """ - \u2592\u2593\u2588\u2588\u2593\u2588\u2588\u2592 - \u2593\u2588\u2588\u2588\u2588\u2592\u2592\u2588\u2593\u2592\u2593\u2588\u2588\u2588\u2593\u2592 - \u2593\u2588\u2588\u2588\u2593\u2591\u2591 \u2592\u2592\u2592\u2593\u2588\u2588\u2592 \u2592 - \u2591\u2588\u2588\u2592 \u2592\u2592\u2593\u2593\u2588\u2593\u2593\u2592\u2591 \u2592\u2588\u2588\u2588\u2588 - \u2588\u2588\u2592 \u2591\u2592\u2593\u2588\u2588\u2588\u2592 \u2592\u2588\u2592\u2588\u2592 - \u2591\u2593\u2588 \u2588\u2588\u2588 \u2593\u2591\u2592\u2588\u2588 - \u2593\u2588 \u2592\u2592\u2592\u2592\u2592\u2593\u2588\u2588\u2593\u2591\u2592\u2591\u2593\u2593\u2588 - \u2588\u2591 \u2588 \u2592\u2592\u2591 \u2588\u2588\u2588\u2593\u2593\u2588 \u2592\u2588\u2592\u2592\u2592 - \u2588\u2588\u2588\u2588\u2591 \u2592\u2593\u2588\u2593 \u2588\u2588\u2592\u2592\u2592 \u2593\u2588\u2588\u2588\u2592 - \u2591\u2592\u2588\u2593\u2593\u2588\u2588 \u2593\u2588\u2592 \u2593\u2588\u2592\u2593\u2588\u2588\u2593 \u2591\u2588\u2591 - \u2593\u2591\u2592\u2593\u2588\u2588\u2588\u2588\u2592 \u2588\u2588 \u2592\u2588 \u2588\u2593\u2591\u2592\u2588\u2592\u2591\u2592\u2588\u2592 - \u2588\u2588\u2588\u2593\u2591\u2588\u2588\u2593 \u2593\u2588 \u2588 \u2588\u2593 \u2592\u2593\u2588\u2593\u2593\u2588\u2592 - \u2591\u2588\u2588\u2593 \u2591\u2588\u2591 \u2588 \u2588\u2592 \u2592\u2588\u2588\u2588\u2588\u2588\u2593\u2592 \u2588\u2588\u2593\u2591\u2592 - \u2588\u2588\u2588\u2591 \u2591 \u2588\u2591 \u2593 \u2591\u2588 \u2588\u2588\u2588\u2588\u2588\u2592\u2591\u2591 \u2591\u2588\u2591\u2593 \u2593\u2591 - \u2588\u2588\u2593\u2588 \u2592\u2592\u2593\u2592 \u2593\u2588\u2588\u2588\u2588\u2588\u2588\u2588\u2593\u2591 \u2592\u2588\u2592 \u2592\u2593 \u2593\u2588\u2588\u2593 - \u2592\u2588\u2588\u2593 \u2593\u2588 \u2588\u2593\u2588 \u2591\u2592\u2588\u2588\u2588\u2588\u2588\u2593\u2593\u2592\u2591 \u2588\u2588\u2592\u2592 \u2588 \u2592 \u2593\u2588\u2592 - \u2593\u2588\u2593 \u2593\u2588 \u2588\u2588\u2593 \u2591\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2592 \u2592\u2588\u2588\u2593 \u2591\u2588\u2592 - \u2593\u2588 \u2588 \u2593\u2588\u2588\u2588\u2593\u2592\u2591 \u2591\u2593\u2593\u2593\u2588\u2588\u2588\u2593 \u2591\u2592\u2591 \u2593\u2588 - \u2588\u2588\u2593 \u2588\u2588\u2592 \u2591\u2592\u2593\u2593\u2588\u2588\u2588\u2593\u2593\u2593\u2593\u2593\u2588\u2588\u2588\u2588\u2588\u2588\u2593\u2592 \u2593\u2588\u2588\u2588 \u2588 -\u2593\u2588\u2588\u2588\u2592 \u2588\u2588\u2588 \u2591\u2593\u2593\u2592\u2591\u2591 \u2591\u2593\u2588\u2588\u2588\u2588\u2593\u2591 \u2591\u2592\u2593\u2592 \u2588\u2593 -\u2588\u2593\u2592\u2592\u2593\u2593\u2588\u2588 \u2591\u2592\u2592\u2591\u2591\u2591\u2592\u2592\u2592\u2592\u2593\u2588\u2588\u2593\u2591 \u2588\u2593 -\u2588\u2588 \u2593\u2591\u2592\u2588 \u2593\u2593\u2593\u2593\u2592\u2591\u2591 \u2592\u2588\u2593 \u2592\u2593\u2593\u2588\u2588\u2593 \u2593\u2592 \u2592\u2592\u2593 -\u2593\u2588\u2593 \u2593\u2592\u2588 \u2588\u2593\u2591 \u2591\u2592\u2593\u2593\u2588\u2588\u2592 \u2591\u2593\u2588\u2592 \u2592\u2592\u2592\u2591\u2592\u2592\u2593\u2588\u2588\u2588\u2588\u2588\u2592 - \u2588\u2588\u2591 \u2593\u2588\u2592\u2588\u2592 \u2592\u2593\u2593\u2592 \u2593\u2588 \u2588\u2591 \u2591\u2591\u2591\u2591 \u2591\u2588\u2592 - \u2593\u2588 \u2592\u2588\u2593 \u2591 \u2588\u2591 \u2592\u2588 \u2588\u2593 - \u2588\u2593 \u2588\u2588 \u2588\u2591 \u2593\u2593 \u2592\u2588\u2593\u2593\u2593\u2592\u2588\u2591 - \u2588\u2593 \u2591\u2593\u2588\u2588\u2591 \u2593\u2592 \u2593\u2588\u2593\u2592\u2591\u2591\u2591\u2592\u2593\u2588\u2591 \u2592\u2588 - \u2588\u2588 \u2593\u2588\u2593\u2591 \u2592 \u2591\u2592\u2588\u2592\u2588\u2588\u2592 \u2593\u2593 - \u2593\u2588\u2592 \u2592\u2588\u2593\u2592\u2591 \u2592\u2592 \u2588\u2592\u2588\u2593\u2592\u2592\u2591\u2591\u2592\u2588\u2588 - \u2591\u2588\u2588\u2592 \u2592\u2593\u2593\u2592 \u2593\u2588\u2588\u2593\u2592\u2588\u2592 \u2591\u2593\u2593\u2593\u2593\u2592\u2588\u2593 - \u2591\u2593\u2588\u2588\u2592 \u2593\u2591 \u2592\u2588\u2593\u2588 \u2591\u2591\u2592\u2592\u2592 - \u2592\u2593\u2593\u2593\u2593\u2593\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2591\u2591\u2593\u2593 \u2593\u2591\u2592\u2588\u2591 - - F L I N K - S C A L A - S H E L L - -NOTE: Use the prebound Execution Environment "env" to read data and execute your program: - * env.readTextFile("/path/to/data") - * env.execute("Program name") - -HINT: You can use print() on a DataSet to print the contents to this shell. - """ - // scalastyle:on - ) - - } - - def getExternalJars(): Array[String] = externalJars.getOrElse(Array.empty[String]) - -} - http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala b/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala deleted file mode 100644 index a4fae91..0000000 --- a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala - - -import scala.tools.nsc.Settings - -import org.apache.flink.configuration.Configuration -import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster - - -object FlinkShell { - - def main(args: Array[String]) { - - // scopt, command line arguments - case class Config( - port: Int = -1, - host: String = "none", - externalJars: Option[Array[String]] = None) - val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") { - head ("Flink Scala Shell") - - opt[Int] ('p', "port") action { - (x, c) => - c.copy (port = x) - } text("port specifies port of running JobManager") - - opt[(String)] ('h',"host") action { - case (x, c) => - c.copy (host = x) - } text("host specifies host name of running JobManager") - - opt[(String)] ('a',"addclasspath") action { - case (x,c) => - val xArray = x.split(":") - c.copy(externalJars = Option(xArray)) - } text("specifies additional jars to be used in Flink") - - help("help") text("prints this usage text") - } - - - // parse arguments - parser.parse (args, Config () ) match { - case Some(config) => - startShell(config.host,config.port,config.externalJars) - - case _ => println("Could not parse program arguments") - } - } - - - def startShell( - userHost : String, - userPort : Int, - externalJars : Option[Array[String]] = None): Unit ={ - - println("Starting Flink Shell:") - - var cluster: LocalFlinkMiniCluster = null - - // either port or userhost not specified by user, create new minicluster - val (host,port) = if (userHost == "none" || userPort == -1 ) { - println("Creating new local server") - cluster = new LocalFlinkMiniCluster(new Configuration, false) - cluster.start() - ("localhost",cluster.getLeaderRPCPort) - } else { - println(s"Connecting to remote server (host: $userHost, port: $userPort).") - (userHost, userPort) - } - - // custom shell - val repl = new FlinkILoop(host, port, externalJars) //new MyILoop(); - - repl.settings = new Settings() - - repl.settings.usejavacp.value = true - - // start scala interpreter shell - repl.process(repl.settings) - - //repl.closeInterpreter() - - if (cluster != null) { - cluster.stop() - } - - println(" good bye ..") - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala b/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala new file mode 100644 index 0000000..bcf9bc2 --- /dev/null +++ b/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala + +import java.io.{BufferedReader, File, FileOutputStream} + +import org.apache.flink.api.java.{JarHelper, ScalaShellRemoteEnvironment} +import org.apache.flink.util.AbstractID + +import scala.tools.nsc.interpreter._ + + +class FlinkILoop( + val host: String, + val port: Int, + val externalJars: Option[Array[String]], + in0: Option[BufferedReader], + out0: JPrintWriter) + extends ILoopCompat(in0, out0) { + + def this(host:String, + port:Int, + externalJars : Option[Array[String]], + in0: BufferedReader, + out: JPrintWriter){ + this(host:String, port:Int, externalJars, Some(in0), out) + } + + def this(host:String, port:Int, externalJars : Option[Array[String]]){ + this(host:String,port: Int, externalJars , None, new JPrintWriter(Console.out, true)) + } + + def this(host: String, port: Int, in0: BufferedReader, out: JPrintWriter){ + this(host: String, port: Int, None, in0: BufferedReader, out: JPrintWriter) + } + + // remote environment + private val remoteEnv: ScalaShellRemoteEnvironment = { + // allow creation of environments + ScalaShellRemoteEnvironment.resetContextEnvironments() + + // create our environment that submits against the cluster (local or remote) + val remoteEnv = new ScalaShellRemoteEnvironment(host, port, this) + + // prevent further instantiation of environments + ScalaShellRemoteEnvironment.disableAllContextAndOtherEnvironments() + + remoteEnv + } + + // local environment + val scalaEnv: ExecutionEnvironment = { + val scalaEnv = new ExecutionEnvironment(remoteEnv) + scalaEnv + } + + /** + * creates a temporary directory to store compiled console files + */ + private val tmpDirBase: File = { + // get unique temporary folder: + val abstractID: String = new AbstractID().toString + val tmpDir: File = new File( + System.getProperty("java.io.tmpdir"), + "scala_shell_tmp-" + abstractID) + if (!tmpDir.exists) { + tmpDir.mkdir + } + tmpDir + } + + // scala_shell commands + private val tmpDirShell: File = { + new File(tmpDirBase, "scala_shell_commands") + } + + // scala shell jar file name + private val tmpJarShell: File = { + new File(tmpDirBase, "scala_shell_commands.jar") + } + + private val packageImports = Seq[String]( + "org.apache.flink.api.scala._", + "org.apache.flink.api.common.functions._" + ) + + override def createInterpreter(): Unit = { + super.createInterpreter() + + addThunk { + intp.beQuietDuring { + // import dependencies + intp.interpret("import " + packageImports.mkString(", ")) + + // set execution environment + intp.bind("env", this.scalaEnv) + } + } + } + + /** + * Packages the compiled classes of the current shell session into a Jar file for execution + * on a Flink cluster. + * + * @return The path of the created Jar file + */ + def writeFilesToDisk(): File = { + val vd = intp.virtualDirectory + + val vdIt = vd.iterator + + for (fi <- vdIt) { + if (fi.isDirectory) { + + val fiIt = fi.iterator + + for (f <- fiIt) { + + // directory for compiled line + val lineDir = new File(tmpDirShell.getAbsolutePath, fi.name) + lineDir.mkdirs() + + // compiled classes for commands from shell + val writeFile = new File(lineDir.getAbsolutePath, f.name) + val outputStream = new FileOutputStream(writeFile) + val inputStream = f.input + + // copy file contents + org.apache.commons.io.IOUtils.copy(inputStream, outputStream) + + inputStream.close() + outputStream.close() + } + } + } + + val compiledClasses = new File(tmpDirShell.getAbsolutePath) + + val jarFilePath = new File(tmpJarShell.getAbsolutePath) + + val jh: JarHelper = new JarHelper + jh.jarDir(compiledClasses, jarFilePath) + + jarFilePath + } + + /** + * CUSTOM START METHODS OVERRIDE: + */ + override def prompt = "Scala-Flink> " + + /** + * custom welcome message + */ + override def printWelcome() { + echo( + // scalastyle:off + """ + \u2592\u2593\u2588\u2588\u2593\u2588\u2588\u2592 + \u2593\u2588\u2588\u2588\u2588\u2592\u2592\u2588\u2593\u2592\u2593\u2588\u2588\u2588\u2593\u2592 + \u2593\u2588\u2588\u2588\u2593\u2591\u2591 \u2592\u2592\u2592\u2593\u2588\u2588\u2592 \u2592 + \u2591\u2588\u2588\u2592 \u2592\u2592\u2593\u2593\u2588\u2593\u2593\u2592\u2591 \u2592\u2588\u2588\u2588\u2588 + \u2588\u2588\u2592 \u2591\u2592\u2593\u2588\u2588\u2588\u2592 \u2592\u2588\u2592\u2588\u2592 + \u2591\u2593\u2588 \u2588\u2588\u2588 \u2593\u2591\u2592\u2588\u2588 + \u2593\u2588 \u2592\u2592\u2592\u2592\u2592\u2593\u2588\u2588\u2593\u2591\u2592\u2591\u2593\u2593\u2588 + \u2588\u2591 \u2588 \u2592\u2592\u2591 \u2588\u2588\u2588\u2593\u2593\u2588 \u2592\u2588\u2592\u2592\u2592 + \u2588\u2588\u2588\u2588\u2591 \u2592\u2593\u2588\u2593 \u2588\u2588\u2592\u2592\u2592 \u2593\u2588\u2588\u2588\u2592 + \u2591\u2592\u2588\u2593\u2593\u2588\u2588 \u2593\u2588\u2592 \u2593\u2588\u2592\u2593\u2588\u2588\u2593 \u2591\u2588\u2591 + \u2593\u2591\u2592\u2593\u2588\u2588\u2588\u2588\u2592 \u2588\u2588 \u2592\u2588 \u2588\u2593\u2591\u2592\u2588\u2592\u2591\u2592\u2588\u2592 + \u2588\u2588\u2588\u2593\u2591\u2588\u2588\u2593 \u2593\u2588 \u2588 \u2588\u2593 \u2592\u2593\u2588\u2593\u2593\u2588\u2592 + \u2591\u2588\u2588\u2593 \u2591\u2588\u2591 \u2588 \u2588\u2592 \u2592\u2588\u2588\u2588\u2588\u2588\u2593\u2592 \u2588\u2588\u2593\u2591\u2592 + \u2588\u2588\u2588\u2591 \u2591 \u2588\u2591 \u2593 \u2591\u2588 \u2588\u2588\u2588\u2588\u2588\u2592\u2591\u2591 \u2591\u2588\u2591\u2593 \u2593\u2591 + \u2588\u2588\u2593\u2588 \u2592\u2592\u2593\u2592 \u2593\u2588\u2588\u2588\u2588\u2588\u2588\u2588\u2593\u2591 \u2592\u2588\u2592 \u2592\u2593 \u2593\u2588\u2588\u2593 + \u2592\u2588\u2588\u2593 \u2593\u2588 \u2588\u2593\u2588 \u2591\u2592\u2588\u2588\u2588\u2588\u2588\u2593\u2593\u2592\u2591 \u2588\u2588\u2592\u2592 \u2588 \u2592 \u2593\u2588\u2592 + \u2593\u2588\u2593 \u2593\u2588 \u2588\u2588\u2593 \u2591\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2592 \u2592\u2588\u2588\u2593 \u2591\u2588\u2592 + \u2593\u2588 \u2588 \u2593\u2588\u2588\u2588\u2593\u2592\u2591 \u2591\u2593\u2593\u2593\u2588\u2588\u2588\u2593 \u2591\u2592\u2591 \u2593\u2588 + \u2588\u2588\u2593 \u2588\u2588\u2592 \u2591\u2592\u2593\u2593\u2588\u2588\u2588\u2593\u2593\u2593\u2593\u2593\u2588\u2588\u2588\u2588\u2588\u2588\u2593\u2592 \u2593\u2588\u2588\u2588 \u2588 +\u2593\u2588\u2588\u2588\u2592 \u2588\u2588\u2588 \u2591\u2593\u2593\u2592\u2591\u2591 \u2591\u2593\u2588\u2588\u2588\u2588\u2593\u2591 \u2591\u2592\u2593\u2592 \u2588\u2593 +\u2588\u2593\u2592\u2592\u2593\u2593\u2588\u2588 \u2591\u2592\u2592\u2591\u2591\u2591\u2592\u2592\u2592\u2592\u2593\u2588\u2588\u2593\u2591 \u2588\u2593 +\u2588\u2588 \u2593\u2591\u2592\u2588 \u2593\u2593\u2593\u2593\u2592\u2591\u2591 \u2592\u2588\u2593 \u2592\u2593\u2593\u2588\u2588\u2593 \u2593\u2592 \u2592\u2592\u2593 +\u2593\u2588\u2593 \u2593\u2592\u2588 \u2588\u2593\u2591 \u2591\u2592\u2593\u2593\u2588\u2588\u2592 \u2591\u2593\u2588\u2592 \u2592\u2592\u2592\u2591\u2592\u2592\u2593\u2588\u2588\u2588\u2588\u2588\u2592 + \u2588\u2588\u2591 \u2593\u2588\u2592\u2588\u2592 \u2592\u2593\u2593\u2592 \u2593\u2588 \u2588\u2591 \u2591\u2591\u2591\u2591 \u2591\u2588\u2592 + \u2593\u2588 \u2592\u2588\u2593 \u2591 \u2588\u2591 \u2592\u2588 \u2588\u2593 + \u2588\u2593 \u2588\u2588 \u2588\u2591 \u2593\u2593 \u2592\u2588\u2593\u2593\u2593\u2592\u2588\u2591 + \u2588\u2593 \u2591\u2593\u2588\u2588\u2591 \u2593\u2592 \u2593\u2588\u2593\u2592\u2591\u2591\u2591\u2592\u2593\u2588\u2591 \u2592\u2588 + \u2588\u2588 \u2593\u2588\u2593\u2591 \u2592 \u2591\u2592\u2588\u2592\u2588\u2588\u2592 \u2593\u2593 + \u2593\u2588\u2592 \u2592\u2588\u2593\u2592\u2591 \u2592\u2592 \u2588\u2592\u2588\u2593\u2592\u2592\u2591\u2591\u2592\u2588\u2588 + \u2591\u2588\u2588\u2592 \u2592\u2593\u2593\u2592 \u2593\u2588\u2588\u2593\u2592\u2588\u2592 \u2591\u2593\u2593\u2593\u2593\u2592\u2588\u2593 + \u2591\u2593\u2588\u2588\u2592 \u2593\u2591 \u2592\u2588\u2593\u2588 \u2591\u2591\u2592\u2592\u2592 + \u2592\u2593\u2593\u2593\u2593\u2593\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2591\u2591\u2593\u2593 \u2593\u2591\u2592\u2588\u2591 + + F L I N K - S C A L A - S H E L L + +NOTE: Use the prebound Execution Environment "env" to read data and execute your program: + * env.readTextFile("/path/to/data") + * env.execute("Program name") + +HINT: You can use print() on a DataSet to print the contents to this shell. + """ + // scalastyle:on + ) + + } + + def getExternalJars(): Array[String] = externalJars.getOrElse(Array.empty[String]) + +} + http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala new file mode 100644 index 0000000..224983b --- /dev/null +++ b/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala + +import org.apache.flink.configuration.Configuration +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster + +import scala.tools.nsc.Settings + + +object FlinkShell { + + def main(args: Array[String]) { + + // scopt, command line arguments + case class Config( + port: Int = -1, + host: String = "none", + externalJars: Option[Array[String]] = None) + val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") { + head ("Flink Scala Shell") + + opt[Int] ('p', "port") action { + (x, c) => + c.copy (port = x) + } text("port specifies port of running JobManager") + + opt[(String)] ('h',"host") action { + case (x, c) => + c.copy (host = x) + } text("host specifies host name of running JobManager") + + opt[(String)] ('a',"addclasspath") action { + case (x,c) => + val xArray = x.split(":") + c.copy(externalJars = Option(xArray)) + } text("specifies additional jars to be used in Flink") + + help("help") text("prints this usage text") + } + + + // parse arguments + parser.parse (args, Config () ) match { + case Some(config) => + startShell(config.host,config.port,config.externalJars) + + case _ => println("Could not parse program arguments") + } + } + + + def startShell( + userHost : String, + userPort : Int, + externalJars : Option[Array[String]] = None): Unit ={ + + println("Starting Flink Shell:") + + var cluster: LocalFlinkMiniCluster = null + + // either port or userhost not specified by user, create new minicluster + val (host,port) = if (userHost == "none" || userPort == -1 ) { + println("Creating new local server") + cluster = new LocalFlinkMiniCluster(new Configuration, false) + cluster.start() + ("localhost",cluster.getLeaderRPCPort) + } else { + println(s"Connecting to remote server (host: $userHost, port: $userPort).") + (userHost, userPort) + } + + // custom shell + val repl = new FlinkILoop(host, port, externalJars) //new MyILoop(); + + repl.settings = new Settings() + + repl.settings.usejavacp.value = true + + // start scala interpreter shell + repl.process(repl.settings) + + //repl.closeInterpreter() + + if (cluster != null) { + cluster.stop() + } + + println(" good bye ..") + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala index 7648c50..0621351 100644 --- a/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala +++ b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala @@ -22,7 +22,7 @@ import java.io._ import java.util.concurrent.TimeUnit import org.apache.flink.runtime.StreamingMode -import org.apache.flink.test.util.{ForkableFlinkMiniCluster, TestBaseUtils, TestEnvironment} +import org.apache.flink.test.util.{TestEnvironment, ForkableFlinkMiniCluster, TestBaseUtils} import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers} http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/flink-staging/pom.xml ---------------------------------------------------------------------- diff --git a/flink-staging/pom.xml b/flink-staging/pom.xml index 271d26c..67aec5a 100644 --- a/flink-staging/pom.xml +++ b/flink-staging/pom.xml @@ -46,6 +46,7 @@ under the License. <module>flink-ml</module> <module>flink-language-binding</module> <module>flink-gelly-scala</module> + <module>flink-scala-shell</module> </modules> <!-- See main pom.xml for explanation of profiles --> @@ -71,22 +72,5 @@ under the License. <module>flink-tez</module> </modules> </profile> - <profile> - <id>scala-2.10</id> - <activation> - - <property> - <!-- this is the default scala profile --> - <name>!scala-2.11</name> - </property> - </activation> - <properties> - <scala.version>2.10.4</scala.version> - <scala.binary.version>2.10</scala.binary.version> - </properties> - <modules> - <module>flink-scala-shell</module> - </modules> - </profile> </profiles> </project> http://git-wip-us.apache.org/repos/asf/flink/blob/8d62033c/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 2a5ff01..7e90ad6 100644 --- a/pom.xml +++ b/pom.xml @@ -370,7 +370,7 @@ under the License. </property> </activation> <properties> - <scala.version>2.11.4</scala.version> + <scala.version>2.11.7</scala.version> <scala.binary.version>2.11</scala.binary.version> </properties> </profile>
