/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.api.java;


import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.FileInputStream;
import java.io.InputStream;

import java.util.jar.JarOutputStream;
import java.util.jar.JarEntry;
import java.util.jar.JarInputStream;

/**
 * Provides utility services for jarring and unjarring files and directories.
 * Note that a given instance of JarHelper is not threadsafe with respect to
 * multiple jar operations.
 *
 * Copied from http://grepcode.com/file_/repo1.maven.org/maven2/org.apache.xmlbeans/xmlbeans/2.4.0/org/apache/xmlbeans/impl/common/JarHelper.java/?v=source
 *
 * @author Patrick Calahan <a href="mailto:p...@bea.com">p...@bea.com</a>
 */
public class JarHelper
{
    // ========================================================================
    // Constants

    private static final int BUFFER_SIZE = 2156;

    /** Separator used inside jar entry names (always '/', independent of the platform). */
    private static final char SEP = '/';

    // ========================================================================
    // Variables

    /** Copy buffer reused by the jarring code path; one reason instances are not threadsafe. */
    private final byte[] buffer = new byte[BUFFER_SIZE];

    private boolean verbose = false;

    /** Canonical path of the jar currently being written, so it is not packed into itself. */
    private String destJarName = "";

    // ========================================================================
    // Constructor

    /**
     * Instantiates a new JarHelper.
     */
    public JarHelper() {}

    // ========================================================================
    // Public methods

    /**
     * Jars a given directory or single file into the given destination jar file.
     *
     * <p>When a directory is given, its contents (not the directory itself) become the
     * top-level entries of the jar. When a single file is given, it becomes the only entry.
     *
     * @param dirOrFile2Jar the directory or file to pack
     * @param destJar the jar file to create (overwritten if it exists)
     * @throws IllegalArgumentException if either argument is {@code null}
     * @throws IOException if reading the input or writing the jar fails
     */
    public void jarDir(File dirOrFile2Jar, File destJar)
            throws IOException {

        if (dirOrFile2Jar == null || destJar == null)
        {
            throw new IllegalArgumentException("dirOrFile2Jar and destJar must not be null");
        }

        destJarName = destJar.getCanonicalPath();
        // try-with-resources closes both streams even if the JarOutputStream
        // constructor or the recursive jarring throws.
        try (FileOutputStream fout = new FileOutputStream(destJar);
                JarOutputStream jout = new JarOutputStream(fout)) {
            jarDir(dirOrFile2Jar, jout, null);
        }
    }

    /**
     * Unjars a given jar file into a given directory.
     *
     * @param jarFile the jar file to extract
     * @param destDir the directory to extract into (created if missing)
     * @throws IOException if the jar cannot be read, an entry cannot be written, or an
     *         entry would be extracted outside of {@code destDir}
     */
    public void unjarDir(File jarFile, File destDir) throws IOException {
        try (FileInputStream fis = new FileInputStream(jarFile)) {
            unjar(fis, destDir);
        }
    }

    /**
     * Given an InputStream on a jar file, unjars the contents into the given
     * directory. The stream is closed when this method returns, whether it
     * completes normally or with an exception.
     *
     * @param in stream positioned at the start of a jar file
     * @param destDir the directory to extract into (created if missing)
     * @throws IOException if the jar cannot be read, an entry cannot be written, or an
     *         entry would be extracted outside of {@code destDir}
     */
    public void unjar(InputStream in, File destDir) throws IOException {
        try (JarInputStream jis = new JarInputStream(in)) {
            byte[] data = new byte[BUFFER_SIZE];
            JarEntry entry;
            while ((entry = jis.getNextJarEntry()) != null) {
                File target = resolveInside(destDir, entry.getName());

                if (entry.isDirectory()) {
                    ensureDirectory(target);
                } else {
                    // Jars are not required to contain explicit directory entries,
                    // so make sure the parent exists before opening the file.
                    File parent = target.getParentFile();
                    if (parent != null) {
                        ensureDirectory(parent);
                    }
                    if (verbose) {
                        System.out.println("unjarring " + target +
                                " from " + entry.getName());
                    }
                    try (FileOutputStream fos = new FileOutputStream(target);
                            BufferedOutputStream dest = new BufferedOutputStream(fos, BUFFER_SIZE)) {
                        int count;
                        while ((count = jis.read(data, 0, BUFFER_SIZE)) != -1) {
                            dest.write(data, 0, count);
                        }
                    }
                }

                if (entry.getTime() != -1) {
                    target.setLastModified(entry.getTime());
                }
            }
        }
    }

    public void setVerbose(boolean b) {
        verbose = b;
    }

    // ========================================================================
    // Private methods

    /**
     * Maps a jar entry name to a file below {@code destDir}, rejecting names (for example
     * ones containing {@code ..}) that would resolve to a location outside of it.
     */
    private static File resolveInside(File destDir, String entryName) throws IOException {
        File target = new File(destDir, entryName);
        String basePath = destDir.getCanonicalPath();
        String targetPath = target.getCanonicalPath();
        String basePrefix = basePath.endsWith(File.separator) ? basePath : basePath + File.separator;
        if (!targetPath.equals(basePath) && !targetPath.startsWith(basePrefix)) {
            throw new IOException("Jar entry is outside of the target directory: " + entryName);
        }
        return target;
    }

    /**
     * Creates the directory (including missing parents) unless it already exists.
     */
    private static void ensureDirectory(File dir) throws IOException {
        // mkdirs() returns false when the directory already exists, hence the second check.
        if (!dir.mkdirs() && !dir.isDirectory()) {
            throw new IOException("Unable to create directory: " + dir);
        }
    }

    /**
     * Recursively jars up the given path under the given directory.
     *
     * @param dirOrFile2jar the directory or file to add
     * @param jos the jar stream to append entries to
     * @param path entry-name prefix of the parent directory (ending in '/'), or
     *        {@code null} for the top-level invocation
     */
    private void jarDir(File dirOrFile2jar, JarOutputStream jos, String path)
            throws IOException {
        if (verbose) { System.out.println("checking " + dirOrFile2jar);}

        if (dirOrFile2jar.isDirectory()) {
            String[] dirList = dirOrFile2jar.list();
            if (dirList == null) {
                // File.list() returns null on I/O errors (e.g. missing read permission).
                throw new IOException("Unable to list directory: " + dirOrFile2jar);
            }
            // The top-level directory itself gets no entry; only its contents are packed.
            String subPath = (path == null) ? "" : (path + dirOrFile2jar.getName() + SEP);
            if (path != null) {
                JarEntry je = new JarEntry(subPath);
                je.setTime(dirOrFile2jar.lastModified());
                jos.putNextEntry(je);
                jos.flush();
                jos.closeEntry();
            }
            for (String child : dirList) {
                jarDir(new File(dirOrFile2jar, child), jos, subPath);
            }
            return;
        }

        if (dirOrFile2jar.getCanonicalPath().equals(destJarName))
        {
            if (verbose) {System.out.println("skipping " + dirOrFile2jar.getPath());}
            return;
        }

        if (verbose) {
            System.out.println("adding " + dirOrFile2jar.getPath());
        }
        // A single file passed at the top level has no parent prefix; without this
        // the entry would be named "null<filename>".
        String prefix = (path == null) ? "" : path;
        try (FileInputStream fis = new FileInputStream(dirOrFile2jar)) {
            JarEntry entry = new JarEntry(prefix + dirOrFile2jar.getName());
            entry.setTime(dirOrFile2jar.lastModified());
            jos.putNextEntry(entry);
            int byteCount;
            while ((byteCount = fis.read(buffer)) != -1) {
                jos.write(buffer, 0, byteCount);
                if (verbose) { System.out.println("wrote " + byteCount + " bytes");}
            }
            jos.flush();
            jos.closeEntry();
        }
    }

    // for debugging
    public static void main(String[] args)
            throws IOException
    {
        if (args.length < 2)
        {
            System.err.println("Usage: JarHelper jarname.jar directory");
            return;
        }

        JarHelper jarHelper = new JarHelper();
        jarHelper.verbose = true;

        File destJar = new File(args[0]);
        File dirOrFile2Jar = new File(args[1]);

        jarHelper.jarDir(dirOrFile2Jar, destJar);
    }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java b/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java deleted file mode 100644 index a336957..0000000 --- a/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java +++ /dev/null @@ -1,107 +0,0 @@ - -package org.apache.flink.api.java; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.common.PlanExecutor; - -import org.apache.flink.api.scala.FlinkILoop; -import org.apache.flink.configuration.Configuration; - -import java.io.File; -import java.net.URL; -import java.util.ArrayList; -import java.util.List; - -/** - * Special version of {@link org.apache.flink.api.java.RemoteEnvironment} that has a reference - * to a {@link org.apache.flink.api.scala.FlinkILoop}. When execute is called this will - * use the reference of the ILoop to write the compiled classes of the current session to - * a Jar file and submit these with the program. - */ -public class ScalaShellRemoteEnvironment extends RemoteEnvironment { - - // reference to Scala Shell, for access to virtual directory - private FlinkILoop flinkILoop; - - /** - * Creates new ScalaShellRemoteEnvironment that has a reference to the FlinkILoop - * - * @param host The host name or address of the master (JobManager), where the program should be executed. - * @param port The port of the master (JobManager), where the program should be executed. - * @param flinkILoop The flink Iloop instance from which the ScalaShellRemoteEnvironment is called. - */ - public ScalaShellRemoteEnvironment(String host, int port, FlinkILoop flinkILoop, String... 
jarFiles) { - super(host, port, null, jarFiles, null); - this.flinkILoop = flinkILoop; - } - - /** - * compiles jars from files in the shell virtual directory on the fly, sends and executes it in the remote environment - * - * @param jobName name of the job as string - * @return Result of the computation - * @throws Exception - */ - @Override - public JobExecutionResult execute(String jobName) throws Exception { - Plan p = createProgramPlan(jobName); - - URL jarUrl = flinkILoop.writeFilesToDisk().getAbsoluteFile().toURI().toURL(); - - // get "external jars, and add the shell command jar, pass to executor - List<URL> alljars = new ArrayList<>(); - // get external (library) jars - String[] extJars = this.flinkILoop.getExternalJars(); - - for (String extJar : extJars) { - URL extJarUrl = new File(extJar).getAbsoluteFile().toURI().toURL(); - alljars.add(extJarUrl); - } - - // add shell commands - alljars.add(jarUrl); - PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, new Configuration(), - alljars.toArray(new URL[alljars.size()]), null); - - executor.setPrintStatusDuringExecution(p.getExecutionConfig().isSysoutLoggingEnabled()); - return executor.executePlan(p); - } - - public static void disableAllContextAndOtherEnvironments() { - - // we create a context environment that prevents the instantiation of further - // context environments. 
at the same time, setting the context environment prevents manual - // creation of local and remote environments - ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() { - @Override - public ExecutionEnvironment createExecutionEnvironment() { - throw new UnsupportedOperationException("Execution Environment is already defined" + - " for this shell."); - } - }; - initializeContextEnvironment(factory); - } - - public static void resetContextEnvironments() { - ExecutionEnvironment.resetContextEnvironment(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-scala-shell/src/main/scala-2.10/org/apache/flink/api/scala/ILoopCompat.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-scala-shell/src/main/scala-2.10/org/apache/flink/api/scala/ILoopCompat.scala b/flink-staging/flink-scala-shell/src/main/scala-2.10/org/apache/flink/api/scala/ILoopCompat.scala deleted file mode 100644 index 7751751..0000000 --- a/flink-staging/flink-scala-shell/src/main/scala-2.10/org/apache/flink/api/scala/ILoopCompat.scala +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.scala - -import java.io.BufferedReader - -import _root_.scala.tools.nsc.interpreter._ - -class ILoopCompat( - in0: Option[BufferedReader], - out0: JPrintWriter) - extends ILoop(in0, out0) { - - override def prompt = "Scala-Flink> " -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-scala-shell/src/main/scala-2.11/org/apache/flink/api/scala/ILoopCompat.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-scala-shell/src/main/scala-2.11/org/apache/flink/api/scala/ILoopCompat.scala b/flink-staging/flink-scala-shell/src/main/scala-2.11/org/apache/flink/api/scala/ILoopCompat.scala deleted file mode 100644 index 1c395bb..0000000 --- a/flink-staging/flink-scala-shell/src/main/scala-2.11/org/apache/flink/api/scala/ILoopCompat.scala +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.scala - -import java.io.BufferedReader - -import _root_.scala.tools.nsc.interpreter._ -import _root_.scala.io.AnsiColor.{MAGENTA, RESET} - -class ILoopCompat( - in0: Option[BufferedReader], - out0: JPrintWriter) - extends ILoop(in0, out0) { - - override def prompt = { - val promptStr = "Scala-Flink> " - s"$MAGENTA$promptStr$RESET" - } - - protected def addThunk(f: => Unit): Unit = f -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala b/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala deleted file mode 100644 index bcc9ef3..0000000 --- a/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala +++ /dev/null @@ -1,232 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.scala - -import java.io.{BufferedReader, File, FileOutputStream} - -import org.apache.flink.api.java.{JarHelper, ScalaShellRemoteEnvironment} -import org.apache.flink.util.AbstractID - -import scala.tools.nsc.interpreter._ - - -class FlinkILoop( - val host: String, - val port: Int, - val externalJars: Option[Array[String]], - in0: Option[BufferedReader], - out0: JPrintWriter) - extends ILoopCompat(in0, out0) { - - def this(host: String, - port: Int, - externalJars: Option[Array[String]], - in0: BufferedReader, - out: JPrintWriter){ - this(host: String, port: Int, externalJars, Some(in0), out) - } - - def this(host: String, port: Int, externalJars: Option[Array[String]]){ - this(host: String, port: Int, externalJars, None, new JPrintWriter(Console.out, true)) - } - - def this(host: String, port: Int, in0: BufferedReader, out: JPrintWriter){ - this(host: String, port: Int, None, in0: BufferedReader, out: JPrintWriter) - } - - // remote environment - private val remoteEnv: ScalaShellRemoteEnvironment = { - // allow creation of environments - ScalaShellRemoteEnvironment.resetContextEnvironments() - - // create our environment that submits against the cluster (local or remote) - val remoteEnv = new ScalaShellRemoteEnvironment(host, port, this) - - // prevent further instantiation of environments - ScalaShellRemoteEnvironment.disableAllContextAndOtherEnvironments() - - remoteEnv - } - - // local environment - val scalaEnv: ExecutionEnvironment = { - val scalaEnv = new ExecutionEnvironment(remoteEnv) - scalaEnv - } - - /** - * creates a temporary directory to store compiled console files - */ - private val tmpDirBase: File = { - // get unique temporary folder: - val abstractID: String = new AbstractID().toString - val tmpDir: File = new File( - System.getProperty("java.io.tmpdir"), - "scala_shell_tmp-" + abstractID) - if (!tmpDir.exists) { - tmpDir.mkdir - } - tmpDir - } - - // scala_shell commands - private val tmpDirShell: File = { - 
new File(tmpDirBase, "scala_shell_commands") - } - - // scala shell jar file name - private val tmpJarShell: File = { - new File(tmpDirBase, "scala_shell_commands.jar") - } - - private val packageImports = Seq[String]( - "org.apache.flink.core.fs._", - "org.apache.flink.core.fs.local._", - "org.apache.flink.api.common.io._", - "org.apache.flink.api.common.aggregators._", - "org.apache.flink.api.common.accumulators._", - "org.apache.flink.api.common.distributions._", - "org.apache.flink.api.common.operators._", - "org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint", - "org.apache.flink.api.common.functions._", - "org.apache.flink.api.java.io._", - "org.apache.flink.api.java.aggregation._", - "org.apache.flink.api.java.functions._", - "org.apache.flink.api.java.operators._", - "org.apache.flink.api.java.sampling._", - "org.apache.flink.api.scala._", - "org.apache.flink.api.scala.utils._" - ) - - override def createInterpreter(): Unit = { - super.createInterpreter() - - addThunk { - intp.beQuietDuring { - // import dependencies - intp.interpret("import " + packageImports.mkString(", ")) - - // set execution environment - intp.bind("env", this.scalaEnv) - } - } - } - - /** - * Packages the compiled classes of the current shell session into a Jar file for execution - * on a Flink cluster. 
- * - * @return The path of the created Jar file - */ - def writeFilesToDisk(): File = { - val vd = intp.virtualDirectory - - val vdIt = vd.iterator - - for (fi <- vdIt) { - if (fi.isDirectory) { - - val fiIt = fi.iterator - - for (f <- fiIt) { - - // directory for compiled line - val lineDir = new File(tmpDirShell.getAbsolutePath, fi.name) - lineDir.mkdirs() - - // compiled classes for commands from shell - val writeFile = new File(lineDir.getAbsolutePath, f.name) - val outputStream = new FileOutputStream(writeFile) - val inputStream = f.input - - // copy file contents - org.apache.commons.io.IOUtils.copy(inputStream, outputStream) - - inputStream.close() - outputStream.close() - } - } - } - - val compiledClasses = new File(tmpDirShell.getAbsolutePath) - - val jarFilePath = new File(tmpJarShell.getAbsolutePath) - - val jh: JarHelper = new JarHelper - jh.jarDir(compiledClasses, jarFilePath) - - jarFilePath - } - - /** - * custom welcome message - */ - override def printWelcome() { - echo( - // scalastyle:off - """ - \u2592\u2593\u2588\u2588\u2593\u2588\u2588\u2592 - \u2593\u2588\u2588\u2588\u2588\u2592\u2592\u2588\u2593\u2592\u2593\u2588\u2588\u2588\u2593\u2592 - \u2593\u2588\u2588\u2588\u2593\u2591\u2591 \u2592\u2592\u2592\u2593\u2588\u2588\u2592 \u2592 - \u2591\u2588\u2588\u2592 \u2592\u2592\u2593\u2593\u2588\u2593\u2593\u2592\u2591 \u2592\u2588\u2588\u2588\u2588 - \u2588\u2588\u2592 \u2591\u2592\u2593\u2588\u2588\u2588\u2592 \u2592\u2588\u2592\u2588\u2592 - \u2591\u2593\u2588 \u2588\u2588\u2588 \u2593\u2591\u2592\u2588\u2588 - \u2593\u2588 \u2592\u2592\u2592\u2592\u2592\u2593\u2588\u2588\u2593\u2591\u2592\u2591\u2593\u2593\u2588 - \u2588\u2591 \u2588 \u2592\u2592\u2591 \u2588\u2588\u2588\u2593\u2593\u2588 \u2592\u2588\u2592\u2592\u2592 - \u2588\u2588\u2588\u2588\u2591 \u2592\u2593\u2588\u2593 \u2588\u2588\u2592\u2592\u2592 \u2593\u2588\u2588\u2588\u2592 - \u2591\u2592\u2588\u2593\u2593\u2588\u2588 \u2593\u2588\u2592 \u2593\u2588\u2592\u2593\u2588\u2588\u2593 
\u2591\u2588\u2591 - \u2593\u2591\u2592\u2593\u2588\u2588\u2588\u2588\u2592 \u2588\u2588 \u2592\u2588 \u2588\u2593\u2591\u2592\u2588\u2592\u2591\u2592\u2588\u2592 - \u2588\u2588\u2588\u2593\u2591\u2588\u2588\u2593 \u2593\u2588 \u2588 \u2588\u2593 \u2592\u2593\u2588\u2593\u2593\u2588\u2592 - \u2591\u2588\u2588\u2593 \u2591\u2588\u2591 \u2588 \u2588\u2592 \u2592\u2588\u2588\u2588\u2588\u2588\u2593\u2592 \u2588\u2588\u2593\u2591\u2592 - \u2588\u2588\u2588\u2591 \u2591 \u2588\u2591 \u2593 \u2591\u2588 \u2588\u2588\u2588\u2588\u2588\u2592\u2591\u2591 \u2591\u2588\u2591\u2593 \u2593\u2591 - \u2588\u2588\u2593\u2588 \u2592\u2592\u2593\u2592 \u2593\u2588\u2588\u2588\u2588\u2588\u2588\u2588\u2593\u2591 \u2592\u2588\u2592 \u2592\u2593 \u2593\u2588\u2588\u2593 - \u2592\u2588\u2588\u2593 \u2593\u2588 \u2588\u2593\u2588 \u2591\u2592\u2588\u2588\u2588\u2588\u2588\u2593\u2593\u2592\u2591 \u2588\u2588\u2592\u2592 \u2588 \u2592 \u2593\u2588\u2592 - \u2593\u2588\u2593 \u2593\u2588 \u2588\u2588\u2593 \u2591\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2592 \u2592\u2588\u2588\u2593 \u2591\u2588\u2592 - \u2593\u2588 \u2588 \u2593\u2588\u2588\u2588\u2593\u2592\u2591 \u2591\u2593\u2593\u2593\u2588\u2588\u2588\u2593 \u2591\u2592\u2591 \u2593\u2588 - \u2588\u2588\u2593 \u2588\u2588\u2592 \u2591\u2592\u2593\u2593\u2588\u2588\u2588\u2593\u2593\u2593\u2593\u2593\u2588\u2588\u2588\u2588\u2588\u2588\u2593\u2592 \u2593\u2588\u2588\u2588 \u2588 -\u2593\u2588\u2588\u2588\u2592 \u2588\u2588\u2588 \u2591\u2593\u2593\u2592\u2591\u2591 \u2591\u2593\u2588\u2588\u2588\u2588\u2593\u2591 \u2591\u2592\u2593\u2592 \u2588\u2593 -\u2588\u2593\u2592\u2592\u2593\u2593\u2588\u2588 \u2591\u2592\u2592\u2591\u2591\u2591\u2592\u2592\u2592\u2592\u2593\u2588\u2588\u2593\u2591 \u2588\u2593 -\u2588\u2588 \u2593\u2591\u2592\u2588 \u2593\u2593\u2593\u2593\u2592\u2591\u2591 \u2592\u2588\u2593 \u2592\u2593\u2593\u2588\u2588\u2593 \u2593\u2592 \u2592\u2592\u2593 -\u2593\u2588\u2593 \u2593\u2592\u2588 \u2588\u2593\u2591 
\u2591\u2592\u2593\u2593\u2588\u2588\u2592 \u2591\u2593\u2588\u2592 \u2592\u2592\u2592\u2591\u2592\u2592\u2593\u2588\u2588\u2588\u2588\u2588\u2592 - \u2588\u2588\u2591 \u2593\u2588\u2592\u2588\u2592 \u2592\u2593\u2593\u2592 \u2593\u2588 \u2588\u2591 \u2591\u2591\u2591\u2591 \u2591\u2588\u2592 - \u2593\u2588 \u2592\u2588\u2593 \u2591 \u2588\u2591 \u2592\u2588 \u2588\u2593 - \u2588\u2593 \u2588\u2588 \u2588\u2591 \u2593\u2593 \u2592\u2588\u2593\u2593\u2593\u2592\u2588\u2591 - \u2588\u2593 \u2591\u2593\u2588\u2588\u2591 \u2593\u2592 \u2593\u2588\u2593\u2592\u2591\u2591\u2591\u2592\u2593\u2588\u2591 \u2592\u2588 - \u2588\u2588 \u2593\u2588\u2593\u2591 \u2592 \u2591\u2592\u2588\u2592\u2588\u2588\u2592 \u2593\u2593 - \u2593\u2588\u2592 \u2592\u2588\u2593\u2592\u2591 \u2592\u2592 \u2588\u2592\u2588\u2593\u2592\u2592\u2591\u2591\u2592\u2588\u2588 - \u2591\u2588\u2588\u2592 \u2592\u2593\u2593\u2592 \u2593\u2588\u2588\u2593\u2592\u2588\u2592 \u2591\u2593\u2593\u2593\u2593\u2592\u2588\u2593 - \u2591\u2593\u2588\u2588\u2592 \u2593\u2591 \u2592\u2588\u2593\u2588 \u2591\u2591\u2592\u2592\u2592 - \u2592\u2593\u2593\u2593\u2593\u2593\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2591\u2591\u2593\u2593 \u2593\u2591\u2592\u2588\u2591 - - F L I N K - S C A L A - S H E L L - -NOTE: Use the prebound Execution Environment "env" to read data and execute your program: - * env.readTextFile("/path/to/data") - * env.execute("Program name") - -HINT: You can use print() on a DataSet to print the contents to this shell. 
- """ - // scalastyle:on - ) - - } - - def getExternalJars(): Array[String] = externalJars.getOrElse(Array.empty[String]) -} - http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala deleted file mode 100644 index eb7f816..0000000 --- a/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala +++ /dev/null @@ -1,159 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.scala - -import java.io.{StringWriter, BufferedReader} - -import org.apache.flink.api.common.ExecutionMode - -import org.apache.flink.configuration.{ConfigConstants, Configuration} -import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster - -import scala.tools.nsc.Settings - -import scala.tools.nsc.interpreter._ - - -object FlinkShell { - - object ExecutionMode extends Enumeration { - val UNDEFINED, LOCAL, REMOTE = Value - } - - var bufferedReader: Option[BufferedReader] = None - - def main(args: Array[String]) { - - // scopt, command line arguments - case class Config( - port: Int = -1, - host: String = "none", - externalJars: Option[Array[String]] = None, - flinkShellExecutionMode: ExecutionMode.Value = ExecutionMode.UNDEFINED) - - val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") { - head ("Flink Scala Shell") - - cmd("local") action { - (_, c) => c.copy(host = "none", port = -1, flinkShellExecutionMode = ExecutionMode.LOCAL) - } text("starts Flink scala shell with a local Flink cluster\n") children( - opt[(String)] ("addclasspath") abbr("a") valueName("<path/to/jar>") action { - case (x, c) => - val xArray = x.split(":") - c.copy(externalJars = Option(xArray)) - } text("specifies additional jars to be used in Flink\n") - ) - - cmd("remote") action { (_, c) => - c.copy(flinkShellExecutionMode = ExecutionMode.REMOTE) - } text("starts Flink scala shell connecting to a remote cluster\n") children( - arg[String]("<host>") action { (h, c) => - c.copy(host = h) } - text("remote host name as string"), - arg[Int]("<port>") action { (p, c) => - c.copy(port = p) } - text("remote port as integer\n"), - opt[(String)]("addclasspath") abbr("a") valueName("<path/to/jar>") action { - case (x, c) => - val xArray = x.split(":") - c.copy(externalJars = Option(xArray)) - } text("specifies additional jars to be used in Flink") - ) - help("help") abbr("h") text("prints this usage text\n") - } - - // parse arguments - 
parser.parse (args, Config()) match { - case Some(config) => - startShell(config.host, - config.port, - config.flinkShellExecutionMode, - config.externalJars) - - case _ => System.out.println("Could not parse program arguments") - } - } - - - def startShell( - userHost: String, - userPort: Int, - executionMode: ExecutionMode.Value, - externalJars: Option[Array[String]] = None): Unit ={ - - System.out.println("Starting Flink Shell:") - - // either port or userhost not specified by user, create new minicluster - val (host: String, port: Int, cluster: Option[LocalFlinkMiniCluster]) = - executionMode match { - case ExecutionMode.LOCAL => - val config = new Configuration() - config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0) - val miniCluster = new LocalFlinkMiniCluster(config, false) - miniCluster.start() - val port = miniCluster.getLeaderRPCPort - System.out.println(s"\nStarting local Flink cluster (host: localhost, port: $port).\n") - ("localhost", port, Some(miniCluster)) - - case ExecutionMode.REMOTE => - if (userHost == "none" || userPort == -1) { - System.out.println("Error: <host> or <port> not specified!") - return - } else { - System.out.println( - s"\nConnecting to Flink cluster (host: $userHost, port: $userPort).\n") - (userHost, userPort, None) - } - - case ExecutionMode.UNDEFINED => - System.out.println("Error: please specify execution mode:") - System.out.println("[local | remote <host> <port>]") - return - } - - var repl: Option[FlinkILoop] = None - - try { - // custom shell - repl = Some( - bufferedReader match { - - case Some(br) => - val out = new StringWriter() - new FlinkILoop(host, port, externalJars, bufferedReader, new JPrintWriter(out)) - - case None => - new FlinkILoop(host, port, externalJars) - }) - - val settings = new Settings() - - settings.usejavacp.value = true - settings.Yreplsync.value = true - - // start scala interpreter shell - repl.foreach(_.process(settings)) - } finally { - repl.foreach(_.closeInterpreter()) - 
cluster.foreach(_.stop()) - } - - System.out.println(" good bye ..") - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-scala-shell/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-staging/flink-scala-shell/src/test/resources/log4j-test.properties b/flink-staging/flink-scala-shell/src/test/resources/log4j-test.properties deleted file mode 100644 index 9912b19..0000000 --- a/flink-staging/flink-scala-shell/src/test/resources/log4j-test.properties +++ /dev/null @@ -1,24 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - - -# Convenience file for local debugging of the JobManager/TaskManager. 
-log4j.rootLogger=OFF, console -log4j.appender.console=org.apache.log4j.ConsoleAppender -log4j.appender.console.layout=org.apache.log4j.PatternLayout -log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-scala-shell/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/flink-staging/flink-scala-shell/src/test/resources/logback-test.xml b/flink-staging/flink-scala-shell/src/test/resources/logback-test.xml deleted file mode 100644 index 8b3bb27..0000000 --- a/flink-staging/flink-scala-shell/src/test/resources/logback-test.xml +++ /dev/null @@ -1,29 +0,0 @@ -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one - ~ or more contributor license agreements. See the NOTICE file - ~ distributed with this work for additional information - ~ regarding copyright ownership. The ASF licenses this file - ~ to you under the Apache License, Version 2.0 (the - ~ "License"); you may not use this file except in compliance - ~ with the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. 
- --> - -<configuration> - <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> - <encoder> - <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern> - </encoder> - </appender> - - <root level="WARN"> - <appender-ref ref="STDOUT"/> - </root> -</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala deleted file mode 100644 index 00e36ab..0000000 --- a/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala +++ /dev/null @@ -1,330 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
/**
 * Integration tests that feed Scala snippets to a [[FlinkILoop]] (or to [[FlinkShell.main]])
 * connected to the test cluster started in the companion object, and then inspect the text
 * the shell produced.
 */
class ScalaShellITCase extends TestLogger {

  import ScalaShellITCase._

  /** Fails if the shell output contains any of the markers the batch tests treat as errors. */
  private def assertNoFailure(output: String): Unit = {
    Assert.assertFalse(output.contains("failed"))
    Assert.assertFalse(output.contains("error"))
    Assert.assertFalse(output.contains("Exception"))
  }

  /** Prevent re-creation of environment */
  @Test
  def testPreventRecreation(): Unit = {

    val input: String =
      """
        val env = ExecutionEnvironment.getExecutionEnvironment
      """.stripMargin

    val output: String = processInShell(input)

    Assert.assertTrue(output.contains(
      "UnsupportedOperationException: Execution Environment is already " +
      "defined for this shell"))
  }

  /** Iteration test with iterative Pi example */
  @Test
  def testIterativePI(): Unit = {

    val input: String =
      """
        val initial = env.fromElements(0)
        val count = initial.iterate(10000) { iterationInput: DataSet[Int] =>
          val result = iterationInput.map { i =>
            val x = Math.random()
            val y = Math.random()
            i + (if (x * x + y * y < 1) 1 else 0)
          }
          result
        }
        val result = count map { c => c / 10000.0 * 4 }
        result.collect()
      """.stripMargin

    val output: String = processInShell(input)

    assertNoFailure(output)
  }

  /** WordCount in Shell */
  @Test
  def testWordCount(): Unit = {
    val input =
      """
        val text = env.fromElements("To be, or not to be,--that is the question:--",
        "Whether 'tis nobler in the mind to suffer",
        "The slings and arrows of outrageous fortune",
        "Or to take arms against a sea of troubles,")
        val counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.groupBy(0).sum(1)
        val result = counts.print()
      """.stripMargin

    val output = processInShell(input)

    assertNoFailure(output)

    // some of the words that should be included
    Assert.assertTrue(output.contains("(a,1)"))
    Assert.assertTrue(output.contains("(whether,1)"))
    Assert.assertTrue(output.contains("(to,4)"))
    Assert.assertTrue(output.contains("(arrows,1)"))
  }

  /** Sum 1..10, should be 55 */
  @Test
  def testSum(): Unit = {
    val input =
      """
        val input: DataSet[Int] = env.fromElements(0,1,2,3,4,5,6,7,8,9,10)
        val reduced = input.reduce(_+_)
        reduced.print
      """.stripMargin

    val output = processInShell(input)

    assertNoFailure(output)

    Assert.assertTrue(output.contains("55"))
  }

  /** WordCount in Shell with custom case class */
  @Test
  def testWordCountWithCustomCaseClass(): Unit = {
    val input =
      """
        case class WC(word: String, count: Int)
        val wordCounts = env.fromElements(
          new WC("hello", 1),
          new WC("world", 2),
          new WC("world", 8))
        val reduced = wordCounts.groupBy(0).sum(1)
        reduced.print()
      """.stripMargin

    val output = processInShell(input)

    assertNoFailure(output)

    Assert.assertTrue(output.contains("WC(hello,1)"))
    Assert.assertTrue(output.contains("WC(world,10)"))
  }

  /** Submit external library */
  @Test
  def testSubmissionOfExternalLibrary(): Unit = {
    val input =
      """
        import org.apache.flink.ml.math._
        val denseVectors = env.fromElements(DenseVector(1.0, 2.0, 3.0))
        denseVectors.print()
      """.stripMargin

    // find jar file that contains the ml code
    val folder = new File("../flink-ml/target/")

    // listFiles() returns null when the directory is missing or unreadable; treat that like
    // an empty directory so the test fails with the assertion below instead of an NPE
    val candidates = Option(folder.listFiles()).getOrElse(Array.empty[File])

    // as before, the last matching file wins when several jars qualify
    val externalJar: Option[String] = candidates
      .filter { file =>
        val filename = file.getName
        !filename.contains("test") && !filename.contains("original") && filename.contains(".jar")
      }
      .lastOption
      .map(_.getAbsolutePath)

    // JUnit assertion rather than Scala's assert, which can be elided at compile time
    Assert.assertTrue(
      "No flink-ml jar found in " + folder.getAbsolutePath,
      externalJar.isDefined)

    val output: String = processInShell(input, externalJar)

    assertNoFailure(output)

    Assert.assertTrue(output.contains("\nDenseVector(1.0, 2.0, 3.0)"))
  }


  /**
   * tests flink shell startup with remote cluster (starts cluster internally)
   */
  @Test
  def testRemoteCluster(): Unit = {

    val input: String =
      """
        |import org.apache.flink.api.common.functions.RichMapFunction
        |import org.apache.flink.api.java.io.PrintingOutputFormat
        |import org.apache.flink.api.common.accumulators.IntCounter
        |import org.apache.flink.configuration.Configuration
        |
        |val els = env.fromElements("foobar","barfoo")
        |val mapped = els.map{
        | new RichMapFunction[String, String]() {
        |   var intCounter: IntCounter = _
        |   override def open(conf: Configuration): Unit = {
        |     intCounter = getRuntimeContext.getIntCounter("intCounter")
        |   }
        |
        |   def map(element: String): String = {
        |     intCounter.add(1)
        |     element
        |   }
        | }
        |}
        |mapped.output(new PrintingOutputFormat())
        |val executionResult = env.execute("Test Job")
        |System.out.println("IntCounter: " + executionResult.getIntCounterResult("intCounter"))
        |
        |:q
      """.stripMargin

    val in: BufferedReader = new BufferedReader(
      new StringReader(
        input + "\n"))

    val baos: ByteArrayOutputStream = new ByteArrayOutputStream
    val oldOut: PrintStream = System.out
    System.setOut(new PrintStream(baos))

    // Restore stdout even when the cluster is missing or the shell throws, so that
    // subsequent tests do not keep writing into this test's buffer.
    val output: String =
      try {
        val args = cluster match {
          case Some(cl) =>
            Array("remote",
              cl.hostname,
              Integer.toString(cl.getLeaderRPCPort))
          case None =>
            throw new AssertionError("Cluster creation failed.")
        }

        //start scala shell with initialized
        // buffered reader for testing
        FlinkShell.bufferedReader = Some(in)
        FlinkShell.main(args)
        baos.flush()

        baos.toString
      } finally {
        System.setOut(oldOut)
      }

    Assert.assertTrue(output.contains("IntCounter: 2"))
    Assert.assertTrue(output.contains("foobar"))
    Assert.assertTrue(output.contains("barfoo"))

    Assert.assertFalse(output.contains("failed"))
    Assert.assertFalse(output.contains("Error"))
    Assert.assertFalse(output.contains("ERROR"))
    Assert.assertFalse(output.contains("Exception"))
  }
}

/** Holds the cluster shared by all tests of [[ScalaShellITCase]] and the shell driver. */
object ScalaShellITCase {
  var cluster: Option[ForkableFlinkMiniCluster] = None
  val parallelism = 4

  /** Starts the test cluster once before any test of the class runs. */
  @BeforeClass
  def beforeAll(): Unit = {
    val cl = TestBaseUtils.startCluster(
      1,
      parallelism,
      false,
      false,
      false)

    cluster = Some(cl)
  }

  /** Resets the context class loader and stops the test cluster, if one was started. */
  @AfterClass
  def afterAll(): Unit = {
    // The Scala interpreter somehow changes the class loader. Therefore, we have to reset it
    Thread.currentThread().setContextClassLoader(classOf[ScalaShellITCase].getClassLoader)
    cluster.foreach(c => TestBaseUtils.stopCluster(c, new FiniteDuration(1000, TimeUnit.SECONDS)))
  }

  /**
   * Run the input using a Scala Shell and return the output of the shell.
   *
   * `System.out` is redirected into a buffer while the shell runs and is always restored
   * afterwards, also when the shell fails.
   *
   * @param input commands to be processed in the shell
   * @param externalJars optional jar that is handed to the shell and put on its classpath
   * @return text written to the shell's own writer followed by the captured `System.out`
   */
  def processInShell(input: String, externalJars: Option[String] = None): String = {
    val in = new BufferedReader(new StringReader(input + "\n"))
    val out = new StringWriter()
    val baos = new ByteArrayOutputStream()

    val oldOut = System.out
    System.setOut(new PrintStream(baos))

    try {
      // connect to the cluster started in beforeAll()
      val host = "localhost"
      val port = cluster match {
        case Some(c) => c.getLeaderRPCPort
        case _ => throw new RuntimeException("Test cluster not initialized.")
      }

      val repl = externalJars match {
        case Some(ej) => new FlinkILoop(
          host, port,
          Option(Array(ej)),
          in, new PrintWriter(out))

        case None => new FlinkILoop(
          host, port,
          in, new PrintWriter(out))
      }

      repl.settings = new Settings()

      // enable this line to use scala in intellij
      repl.settings.usejavacp.value = true

      externalJars.foreach(ej => repl.settings.classpath.value = ej)

      try {
        repl.process(repl.settings)
      } finally {
        repl.closeInterpreter()
      }
    } finally {
      System.setOut(oldOut)
    }

    baos.flush()

    val stdout = baos.toString

    out.toString + stdout
  }
}
/**
 * Start-up test for the Scala shell in `local` execution mode.
 */
class ScalaShellLocalStartupITCase extends TestLogger {

  /**
   * Runs [[FlinkShell.main]] with the single argument `local`, feeds it a small job through
   * `FlinkShell.bufferedReader` and checks what the shell printed to `System.out`.
   */
  @Test
  def testLocalCluster(): Unit = {
    val input: String =
      """
        |import org.apache.flink.api.common.functions.RichMapFunction
        |import org.apache.flink.api.java.io.PrintingOutputFormat
        |import org.apache.flink.api.common.accumulators.IntCounter
        |import org.apache.flink.configuration.Configuration
        |
        |val els = env.fromElements("foobar","barfoo")
        |val mapped = els.map{
        | new RichMapFunction[String, String]() {
        |   var intCounter: IntCounter = _
        |   override def open(conf: Configuration): Unit = {
        |     intCounter = getRuntimeContext.getIntCounter("intCounter")
        |   }
        |
        |   def map(element: String): String = {
        |     intCounter.add(1)
        |     element
        |   }
        | }
        |}
        |mapped.output(new PrintingOutputFormat())
        |val executionResult = env.execute("Test Job")
        |System.out.println("IntCounter: " + executionResult.getIntCounterResult("intCounter"))
        |
        |:q
      """.stripMargin
    val in: BufferedReader = new BufferedReader(new StringReader(input + "\n"))
    val baos: ByteArrayOutputStream = new ByteArrayOutputStream
    val oldOut: PrintStream = System.out
    val args: Array[String] = Array("local")

    System.setOut(new PrintStream(baos))

    // Always hand the real stdout back, even if the shell throws; otherwise every later
    // test in the same JVM would keep printing into this buffer.
    val output: String =
      try {
        //start flink scala shell
        FlinkShell.bufferedReader = Some(in)
        FlinkShell.main(args)

        baos.flush()
        baos.toString
      } finally {
        System.setOut(oldOut)
      }

    Assert.assertTrue(output.contains("IntCounter: 2"))
    Assert.assertTrue(output.contains("foobar"))
    Assert.assertTrue(output.contains("barfoo"))

    Assert.assertFalse(output.contains("failed"))
    Assert.assertFalse(output.contains("Error"))
    Assert.assertFalse(output.contains("ERROR"))
    Assert.assertFalse(output.contains("Exception"))
  }
}
# from scala-lang 2.10.4

# Exit status reported by onExit. It starts at 130 (128 + SIGINT) so that an interrupt
# arriving before the JVM's status has been recorded still produces a failure code; it is
# overwritten with the JVM's real status once java returns.
scala_exit_status=130

# restore stty settings (echo in particular)
function restoreSttySettings() {
  if [[ -n $SCALA_RUNNER_DEBUG ]]; then
    echo "restoring stty:"
    echo "$saved_stty"
  fi
  stty $saved_stty
  saved_stty=""
}

# Restores the terminal (only if settings were saved) and exits with the recorded status.
function onExit() {
  # a plain 'if' instead of '[[ ... ]] && ...': the short-circuit form leaves status 1
  # behind when nothing was saved
  if [[ -n "$saved_stty" ]]; then
    restoreSttySettings
  fi
  exit "$scala_exit_status"
}


# to reenable echo if we are interrupted before completing.
trap onExit INT
# save terminal settings; clear on error so we don't later try to restore them
# (the former check '[[ ! $? ]]' tested a non-empty string and therefore never fired)
if ! saved_stty=$(stty -g 2>/dev/null); then
  saved_stty=""
fi


bin=$(dirname "$0")
bin=$(cd "$bin"; pwd)

. "$bin"/config.sh

FLINK_CLASSPATH=$(constructFlinkClassPath)

# https://issues.scala-lang.org/browse/SI-6502, cant load external jars interactively
# in scala shell since 2.10, has to be done at startup
# checks arguments for additional classpath and adds it to the "standard classpath"

# NOTE(review): after a flag/value pair is removed the loop still advances i, so a second
# -a/--addclasspath directly following the first one is not recognised here and is passed
# through to FlinkShell unchanged. Left as is; confirm whether repeated flags should be
# supported.
EXTERNAL_LIB_FOUND=false
for ((i=1;i<=$#;i++))
do
    if [[ ${!i} = "-a" || ${!i} = "--addclasspath" ]]
    then
        EXTERNAL_LIB_FOUND=true

        #adding to classpath
        k=$((i+1))
        j=$((k+1))
        echo " "
        echo "Additional classpath:${!k}"
        echo " "
        EXT_CLASSPATH="${!k}"
        FLINK_CLASSPATH="$FLINK_CLASSPATH:${!k}"
        # drop the flag (position i) and its value (position k) from the positional parameters
        set -- "${@:1:$((i-1))}" "${@:j}"
    fi
done

# Remaining user arguments are forwarded verbatim; "$@" is quoted so that arguments
# containing whitespace are not split again.
shell_args=("$@")
if ${EXTERNAL_LIB_FOUND}
then
    shell_args+=(--addclasspath "$EXT_CLASSPATH")
fi

java -Dscala.color -cp "$FLINK_CLASSPATH" org.apache.flink.api.scala.FlinkShell "${shell_args[@]}"
# record the JVM's status before anything else can overwrite $?
scala_exit_status=$?

#restore echo
onExit
See the License for the -specific language governing permissions and limitations -under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.flink</groupId> - <artifactId>flink-staging</artifactId> - <version>1.0-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - - <artifactId>flink-table</artifactId> - <name>flink-table</name> - - <packaging>jar</packaging> - - <dependencies> - - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - <version>${guava.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-scala</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-scala</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-examples-batch</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.scala-lang</groupId> - <artifactId>scala-reflect</artifactId> - </dependency> - - <dependency> - <groupId>org.scala-lang</groupId> - <artifactId>scala-library</artifactId> - </dependency> - - <dependency> - <groupId>org.scala-lang</groupId> - <artifactId>scala-compiler</artifactId> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-test-utils</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.codehaus.janino</groupId> - <artifactId>janino</artifactId> - <version>2.7.5</version> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-tests</artifactId> - 
<version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-databind</artifactId> - <version>${jackson.version}</version> - </dependency> - - </dependencies> - - <build> - <plugins> - <!-- Scala Compiler --> - <plugin> - <groupId>net.alchim31.maven</groupId> - <artifactId>scala-maven-plugin</artifactId> - <version>3.1.4</version> - <executions> - <!-- Run scala compiler in the process-resources phase, so that dependencies on - scala classes can be resolved later in the (Java) compile phase --> - <execution> - <id>scala-compile-first</id> - <phase>process-resources</phase> - <goals> - <goal>compile</goal> - </goals> - </execution> - - <!-- Run scala compiler in the process-test-resources phase, so that dependencies on - scala classes can be resolved later in the (Java) test-compile phase --> - <execution> - <id>scala-test-compile</id> - <phase>process-test-resources</phase> - <goals> - <goal>testCompile</goal> - </goals> - </execution> - </executions> - <configuration> - <jvmArgs> - <jvmArg>-Xms128m</jvmArg> - <jvmArg>-Xmx512m</jvmArg> - </jvmArgs> - <compilerPlugins combine.children="append"> - <compilerPlugin> - <groupId>org.scalamacros</groupId> - <artifactId>paradise_${scala.version}</artifactId> - <version>${scala.macros.version}</version> - </compilerPlugin> - </compilerPlugins> - </configuration> - </plugin> - - <!-- Eclipse Integration --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-eclipse-plugin</artifactId> - <version>2.8</version> - <configuration> - <downloadSources>true</downloadSources> - <projectnatures> - <projectnature>org.scala-ide.sdt.core.scalanature</projectnature> - <projectnature>org.eclipse.jdt.core.javanature</projectnature> - </projectnatures> - <buildcommands> - <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand> - </buildcommands> - <classpathContainers> - 
<classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer> - <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer> - </classpathContainers> - <excludes> - <exclude>org.scala-lang:scala-library</exclude> - <exclude>org.scala-lang:scala-compiler</exclude> - </excludes> - <sourceIncludes> - <sourceInclude>**/*.scala</sourceInclude> - <sourceInclude>**/*.java</sourceInclude> - </sourceIncludes> - </configuration> - </plugin> - - <!-- Adding scala source directories to build path --> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>build-helper-maven-plugin</artifactId> - <version>1.7</version> - <executions> - <!-- Add src/main/scala to eclipse build path --> - <execution> - <id>add-source</id> - <phase>generate-sources</phase> - <goals> - <goal>add-source</goal> - </goals> - <configuration> - <sources> - <source>src/main/scala</source> - </sources> - </configuration> - </execution> - <!-- Add src/test/scala to eclipse build path --> - <execution> - <id>add-test-source</id> - <phase>generate-test-sources</phase> - <goals> - <goal>add-test-source</goal> - </goals> - <configuration> - <sources> - <source>src/test/scala</source> - </sources> - </configuration> - </execution> - </executions> - </plugin> - - <plugin> - <groupId>org.scalastyle</groupId> - <artifactId>scalastyle-maven-plugin</artifactId> - <version>0.5.0</version> - <executions> - <execution> - <goals> - <goal>check</goal> - </goals> - </execution> - </executions> - <configuration> - <verbose>false</verbose> - <failOnViolation>true</failOnViolation> - <includeTestSourceDirectory>true</includeTestSourceDirectory> - <failOnWarning>false</failOnWarning> - <sourceDirectory>${basedir}/src/main/scala</sourceDirectory> - <testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory> - <configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation> - <outputFile>${project.basedir}/scalastyle-output.xml</outputFile> - 
<outputEncoding>UTF-8</outputEncoding> - </configuration> - </plugin> - - </plugins> - </build> - - <profiles> - <profile> - <id>scala-2.10</id> - <activation> - <property> - <!-- this is the default scala profile --> - <name>!scala-2.11</name> - </property> - </activation> - <dependencies> - <dependency> - <groupId>org.scalamacros</groupId> - <artifactId>quasiquotes_${scala.binary.version}</artifactId> - <version>${scala.macros.version}</version> - </dependency> - </dependencies> - </profile> - </profiles> - -</project> http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/java/org/apache/flink/api/java/table/package-info.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/java/org/apache/flink/api/java/table/package-info.java b/flink-staging/flink-table/src/main/java/org/apache/flink/api/java/table/package-info.java deleted file mode 100644 index 97113bb..0000000 --- a/flink-staging/flink-table/src/main/java/org/apache/flink/api/java/table/package-info.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -/** - * <strong>Table API (Java)</strong><br> - * - * {@link org.apache.flink.api.java.table.TableEnvironment} can be used to create a - * {@link org.apache.flink.api.table.Table} from a {@link org.apache.flink.api.java.DataSet} - * or {@link org.apache.flink.streaming.api.datastream.DataStream}. - * - * <p> - * This can be used to perform SQL-like queries on data. Please have - * a look at {@link org.apache.flink.api.table.Table} to see which operations are supported and - * how query strings are written. - * - * <p> - * Example: - * - * <pre>{@code - * ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment(); - * TableEnvironment tableEnv = new TableEnvironment(); - * - * DataSet<WC> input = env.fromElements( - * new WC("Hello", 1), - * new WC("Ciao", 1), - * new WC("Hello", 1)); - * - * Table table = tableEnv.fromDataSet(input); - * - * Table filtered = table - * .groupBy("word") - * .select("word.count as count, word") - * .filter("count = 2"); - * - * DataSet<WC> result = tableEnv.toDataSet(filtered, WC.class); - * - * result.print(); - * env.execute(); - * }</pre> - * - * <p> - * As seen above, a {@link org.apache.flink.api.table.Table} can be converted back to the - * underlying API representation using {@link org.apache.flink.api.java.table.TableEnvironment#toDataSet(org.apache.flink.api.table.Table, java.lang.Class)} - * or {@link org.apache.flink.api.java.table.TableEnvironment#toDataStream(org.apache.flink.api.table.Table, java.lang.Class)}. 
- */ -package org.apache.flink.api.java.table; http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/java/org/apache/flink/api/table/package-info.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/java/org/apache/flink/api/table/package-info.java b/flink-staging/flink-table/src/main/java/org/apache/flink/api/table/package-info.java deleted file mode 100644 index d7fbc8e..0000000 --- a/flink-staging/flink-table/src/main/java/org/apache/flink/api/table/package-info.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * <strong>Table API</strong><br> - * - * This package contains the generic part of the Table API. It can be used with Flink Streaming - * and Flink Batch, from Scala as well as from Java. - * - * When using the Table API, a user creates a {@link org.apache.flink.api.table.Table} from - * a DataSet or DataStream. Relational operations can then be performed on it. A table can also - * be converted back to a DataSet or DataStream. 
- * - * Packages {@code org.apache.flink.api.scala.table} and {@code org.apache.flink.api.java.table} contain - * the language-specific part of the API. Refer to these packages for documentation on how - * the Table API can be used in Java and Scala. - */ -package org.apache.flink.api.table; http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/java/org/apache/flink/examples/java/JavaTableExample.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/java/org/apache/flink/examples/java/JavaTableExample.java b/flink-staging/flink-table/src/main/java/org/apache/flink/examples/java/JavaTableExample.java deleted file mode 100644 index c043508..0000000 --- a/flink-staging/flink-table/src/main/java/org/apache/flink/examples/java/JavaTableExample.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.examples.java; - - -import org.apache.flink.api.table.Table; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.table.TableEnvironment; - -/** - * Very simple example that shows how the Java Table API can be used. - */ -public class JavaTableExample { - - public static class WC { - public String word; - public int count; - - // Public constructor to make it a Flink POJO - public WC() { - - } - - public WC(String word, int count) { - this.word = word; - this.count = count; - } - - @Override - public String toString() { - return "WC " + word + " " + count; - } - } - - public static void main(String[] args) throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); - - DataSet<WC> input = env.fromElements( - new WC("Hello", 1), - new WC("Ciao", 1), - new WC("Hello", 1)); - - Table table = tableEnv.fromDataSet(input); - - Table filtered = table - .groupBy("word") - .select("word.count as count, word") - .filter("count = 2"); - - DataSet<WC> result = tableEnv.toDataSet(filtered, WC.class); - - result.print(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala deleted file mode 100644 index 9dc9297..0000000 --- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala +++ /dev/null @@ -1,346 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.table - -import java.lang.reflect.Modifier - -import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.java.aggregation.AggregationFunction -import org.apache.flink.api.java.operators.JoinOperator.EquiJoin -import org.apache.flink.api.java.operators.Keys.ExpressionKeys -import org.apache.flink.api.java.operators.{GroupReduceOperator, Keys, MapOperator, UnsortedGrouping} -import org.apache.flink.api.java.{DataSet => JavaDataSet} -import org.apache.flink.api.table.expressions.analysis.ExtractEquiJoinFields -import org.apache.flink.api.table.plan._ -import org.apache.flink.api.table.runtime._ -import org.apache.flink.api.table.expressions._ -import org.apache.flink.api.table.typeinfo.{RenameOperator, RenamingProxyTypeInfo, RowTypeInfo} -import org.apache.flink.api.table.{ExpressionException, Row, Table} - -/** - * [[PlanTranslator]] for creating [[Table]]s from Java [[org.apache.flink.api.java.DataSet]]s and - * translating them back to Java [[org.apache.flink.api.java.DataSet]]s. 
- */
-class JavaBatchTranslator extends PlanTranslator {
-
-  // The data set representation this translator produces and consumes: Java DataSets.
-  type Representation[A] = JavaDataSet[A]
-
-  /**
-   * Wraps a Java DataSet into a [[Table]].
-   *
-   * The given expressions are evaluated on every input element by a select (map) operator that
-   * produces [[Row]]s; the resulting data set becomes the [[Root]] of the table's plan.
-   *
-   * @param repr the input data set
-   * @param inputType composite type information of the input elements
-   * @param expressions expressions evaluated on the input to build the rows
-   * @param resultFields names and types of the fields of the resulting table
-   * @return a table whose plan consists of a single root node
-   */
-  override def createTable[A](
-      repr: Representation[A],
-      inputType: CompositeType[A],
-      expressions: Array[Expression],
-      resultFields: Seq[(String, TypeInformation[_])]): Table = {
-
-    val rowDataSet = createSelect(expressions, repr, inputType)
-
-    Table(Root(rowDataSet, resultFields))
-  }
-
-  /**
-   * Translates the plan rooted at `op` into a Java DataSet of the requested type.
-   *
-   * If [[Row]] is requested, the internally translated data set is returned directly. Otherwise
-   * the requested type must be a composite type whose field names and field types match those
-   * of the translated result; a final map operator then converts the rows into that type.
-   *
-   * @param op root of the plan to translate
-   * @param tpe type information of the requested element type
-   * @return data set producing elements of type `A`
-   * @throws ExpressionException if the requested class is a non-static member class, if the
-   *                             requested type is not a composite type, or if field names or
-   *                             field types differ from those of the translated result
-   */
-  override def translate[A](op: PlanNode)(implicit tpe: TypeInformation[A]): JavaDataSet[A] = {
-
-    if (tpe.getTypeClass == classOf[Row]) {
-      // shortcut for DataSet[Row]
-      return translateInternal(op).asInstanceOf[JavaDataSet[A]]
-    }
-
-    val clazz = tpe.getTypeClass
-    if (clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) {
-      throw new ExpressionException("Cannot create DataSet of type " +
-        clazz.getName + ". Only top-level classes or static member classes are supported.")
-    }
-
-
-    if (!implicitly[TypeInformation[A]].isInstanceOf[CompositeType[A]]) {
-      throw new ExpressionException(
-        "A Table can only be converted to composite types, type is: " +
-          implicitly[TypeInformation[A]] +
-          ". Composite types would be tuples, case classes and POJOs.")
-    }
-
-    val resultSet = translateInternal(op)
-
-    val resultType = resultSet.getType.asInstanceOf[RowTypeInfo]
-
-    val outputType = implicitly[TypeInformation[A]].asInstanceOf[CompositeType[A]]
-
-    val resultNames = resultType.getFieldNames
-    val outputNames = outputType.getFieldNames.toSeq
-
-    // Field order may differ between result and output type, hence the set comparison;
-    // the fields are matched by name below.
-    if (resultNames.toSet != outputNames.toSet) {
-      throw new ExpressionException(s"Expression result type $resultType does not have the same " +
-        s"fields as output type $outputType")
-    }
-
-    for (f <- outputNames) {
-      val in = resultType.getTypeAt(resultType.getFieldIndex(f))
-      val out = outputType.getTypeAt(outputType.getFieldIndex(f))
-      if (!in.equals(out)) {
-        throw new ExpressionException(s"Types for field $f differ on input $resultType and " +
-          s"output $outputType.")
-      }
-    }
-
-    // Select the result fields in the order in which the output type declares them.
-    val outputFields = outputNames map {
-      f => ResolvedFieldReference(f, resultType.getTypeAt(f))
-    }
-
-    // resultType is reused here instead of fetching and casting resultSet.getType again.
-    val function = new ExpressionSelectFunction(
-      resultType,
-      outputType,
-      outputFields)
-
-    val opName = s"select(${outputFields.mkString(",")})"
-    val operator = new MapOperator(resultSet, outputType, function, opName)
-
-    operator
-  }
-
-  /**
-   * Recursively translates a plan node into a data set of [[Row]]s.
-   *
-   * The order of the cases is significant: patterns that match a specific nesting (a select on
-   * top of a filtered join, a filtered join, an aggregate on top of a group-by) come before the
-   * general patterns for the same node type and must stay in front of them.
-   *
-   * @param op the plan node to translate
-   * @return the data set computing the rows described by `op`
-   * @throws ExpressionException for a root that does not hold a Java DataSet, a group-by that
-   *                             is not consumed by a select or aggregate, a join without
-   *                             filter, or an invalid grouping key expression
-   */
-  private def translateInternal(op: PlanNode): JavaDataSet[Row] = {
-    op match {
-      case Root(dataSet: JavaDataSet[Row], _) =>
-        dataSet
-
-      case Root(_, _) =>
-        throw new ExpressionException("Invalid Root for JavaBatchTranslator: " + op + ". " +
-          "Did you try converting a Table based on a DataSet to a DataStream or vice-versa?")
-
-      // A GroupBy is only translated as the input of a Select or Aggregate (see below).
-      case GroupBy(_, _) =>
-        throw new ExpressionException("Dangling GroupBy operation. Did you forget a " +
-          "SELECT statement?")
-
-      case As(input, newNames) =>
-        val translatedInput = translateInternal(input)
-        val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
-        val proxyType = new RenamingProxyTypeInfo[Row](inType, newNames.toArray)
-        new RenameOperator(translatedInput, proxyType)
-
-      // Select on top of a filtered join: the selection is evaluated inside the join function.
-      case sel@Select(Filter(Join(leftInput, rightInput), predicate), selection) =>
-
-        val expandedInput = ExpandAggregations(sel)
-
-        // ExpandAggregations handing back the very same node means there is nothing to
-        // rewrite; otherwise the rewritten plan is translated instead.
-        if (expandedInput.eq(sel)) {
-          val translatedLeftInput = translateInternal(leftInput)
-          val translatedRightInput = translateInternal(rightInput)
-          val leftInType = translatedLeftInput.getType.asInstanceOf[CompositeType[Row]]
-          val rightInType = translatedRightInput.getType.asInstanceOf[CompositeType[Row]]
-
-          createJoin(
-            predicate,
-            selection,
-            translatedLeftInput,
-            translatedRightInput,
-            leftInType,
-            rightInType,
-            JoinHint.OPTIMIZER_CHOOSES)
-        } else {
-          translateInternal(expandedInput)
-        }
-
-      // Filtered join without explicit selection: emit all fields of the left input followed
-      // by all fields of the right input.
-      case Filter(Join(leftInput, rightInput), predicate) =>
-        val translatedLeftInput = translateInternal(leftInput)
-        val translatedRightInput = translateInternal(rightInput)
-        val leftInType = translatedLeftInput.getType.asInstanceOf[CompositeType[Row]]
-        val rightInType = translatedRightInput.getType.asInstanceOf[CompositeType[Row]]
-
-        createJoin(
-          predicate,
-          leftInput.outputFields.map( f => ResolvedFieldReference(f._1, f._2)) ++
-            rightInput.outputFields.map( f => ResolvedFieldReference(f._1, f._2)),
-          translatedLeftInput,
-          translatedRightInput,
-          leftInType,
-          rightInType,
-          JoinHint.OPTIMIZER_CHOOSES)
-
-      case Join(_, _) =>
-        throw new ExpressionException("Join without filter condition encountered. " +
-          "Did you forget to add .where(...) ?")
-
-      case sel@Select(input, selection) =>
-
-        val expandedInput = ExpandAggregations(sel)
-
-        if (expandedInput.eq(sel)) {
-          val translatedInput = input match {
-            // Select directly on a group-by: group the input and run the
-            // NoExpressionAggregateFunction on each group.
-            case GroupBy(groupByInput, groupExpressions) =>
-              val translatedGroupByInput = translateInternal(groupByInput)
-              val inType = translatedGroupByInput.getType.asInstanceOf[CompositeType[Row]]
-
-              val keyIndices = groupExpressions map {
-                case fe: ResolvedFieldReference => inType.getFieldIndex(fe.name)
-                case e =>
-                  throw new ExpressionException(s"Expression $e is not a valid key expression.")
-              }
-
-              val keys = new Keys.ExpressionKeys(keyIndices.toArray, inType, false)
-              val grouping = new UnsortedGrouping(translatedGroupByInput, keys)
-
-              new GroupReduceOperator(
-                grouping,
-                inType,
-                new NoExpressionAggregateFunction(),
-                "Nop Expression Aggregation")
-
-            case _ => translateInternal(input)
-          }
-
-          val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
-          createSelect(
-            selection,
-            translatedInput,
-            inType)
-        } else {
-          translateInternal(expandedInput)
-        }
-
-      // Grouped aggregation: only resolved field references are accepted as grouping keys.
-      case agg@Aggregate(GroupBy(input, groupExpressions), aggregations) =>
-        val translatedInput = translateInternal(input)
-        val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
-
-        val keyIndices = groupExpressions map {
-          case fe: ResolvedFieldReference => inType.getFieldIndex(fe.name)
-          case e => throw new ExpressionException(s"Expression $e is not a valid key expression.")
-        }
-
-        val keys = new Keys.ExpressionKeys(keyIndices.toArray, inType, false)
-
-        val grouping = new UnsortedGrouping(translatedInput, keys)
-
-        // One aggregation function per aggregated field, created for the class of that field.
-        val aggFunctions: Seq[AggregationFunction[Any]] = aggregations map {
-          case (fieldName, fun) =>
-            fun.getFactory.createAggregationFunction[Any](
-              inType.getTypeAt[Any](inType.getFieldIndex(fieldName)).getTypeClass)
-        }
-
-        // Positions of the aggregated fields, in the same order as aggFunctions.
-        val aggIndices = aggregations map {
-          case (fieldName, _) =>
-            inType.getFieldIndex(fieldName)
-        }
-
-        val result = new GroupReduceOperator(
-          grouping,
-          inType,
-          new ExpressionAggregateFunction(aggIndices, aggFunctions),
-          "Expression Aggregation: " + agg)
-
-        result
-
-      // Aggregation without grouping: the group reduce runs on the ungrouped input.
-      case agg@Aggregate(input, aggregations) =>
-        val translatedInput = translateInternal(input)
-        val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
-
-        val aggFunctions: Seq[AggregationFunction[Any]] = aggregations map {
-          case (fieldName, fun) =>
-            fun.getFactory.createAggregationFunction[Any](
-              inType.getTypeAt[Any](inType.getFieldIndex(fieldName)).getTypeClass)
-        }
-
-        val aggIndices = aggregations map {
-          case (fieldName, _) =>
-            inType.getFieldIndex(fieldName)
-        }
-
-        val result = new GroupReduceOperator(
-          translatedInput,
-          inType,
-          new ExpressionAggregateFunction(aggIndices, aggFunctions),
-          "Expression Aggregation: " + agg)
-
-        result
-
-
-      case Filter(input, predicate) =>
-        val translatedInput = translateInternal(input)
-        val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
-        val filter = new ExpressionFilterFunction[Row](predicate, inType)
-        translatedInput.filter(filter).name(predicate.toString)
-
-      case uni@UnionAll(left, right) =>
-        val translatedLeft = translateInternal(left)
-        val translatedRight = translateInternal(right)
-        translatedLeft.union(translatedRight).name("Union: " + uni)
-    }
-  }
-
-  /**
-   * Creates a map operator that evaluates the given expressions on every input element and
-   * emits the results as a [[Row]].
-   *
-   * @param fields the expressions to evaluate; none of them may contain an aggregation
-   * @param input the data set to select from
-   * @param inputType composite type information of the input elements
-   * @return the data set of rows, named `select(...)` after the expressions
-   * @throws ExpressionException if any expression contains an [[Aggregation]]
-   */
-  private def createSelect[I](
-      fields: Seq[Expression],
-      input: JavaDataSet[I],
-      inputType: CompositeType[I]): JavaDataSet[Row] = {
-
-    fields foreach {
-      f =>
-        if (f.exists(_.isInstanceOf[Aggregation])) {
-          throw new ExpressionException("Found aggregate in " + fields.mkString(", ") + ".")
-        }
-
-    }
-
-    val resultType = new RowTypeInfo(fields)
-
-    val function = new ExpressionSelectFunction(inputType, resultType, fields)
-
-    val opName = s"select(${fields.mkString(",")})"
-    val operator = new MapOperator(input, resultType, function, opName)
-
-    operator
-  }
-
-  /**
-   * Creates an equi-join of the two inputs.
-   *
-   * [[ExtractEquiJoinFields]] splits the predicate into the key fields of both sides and a
-   * remaining predicate. The key fields become the join keys; the remaining predicate and the
-   * result expressions are handed to the join function.
-   *
-   * @param predicate the join condition
-   * @param fields the expressions that make up the resulting rows
-   * @param leftInput the left data set
-   * @param rightInput the right data set
-   * @param leftType composite type information of the left input
-   * @param rightType composite type information of the right input
-   * @param joinHint the join strategy hint passed on to the join operator
-   * @return the joined data set of rows, named after the predicate
-   * @throws ExpressionException if no key fields could be extracted for either side
-   */
-  private def createJoin[L, R](
-      predicate: Expression,
-      fields: Seq[Expression],
-      leftInput: JavaDataSet[L],
-      rightInput: JavaDataSet[R],
-      leftType: CompositeType[L],
-      rightType: CompositeType[R],
-      joinHint: JoinHint): JavaDataSet[Row] = {
-
-    val resultType = new RowTypeInfo(fields)
-
-    val (reducedPredicate, leftFields, rightFields) =
-      ExtractEquiJoinFields(leftType, rightType, predicate)
-
-    if (leftFields.isEmpty || rightFields.isEmpty) {
-      throw new ExpressionException("Could not derive equi-join predicates " +
-        "for predicate " + predicate + ".")
-    }
-
-    val leftKey = new ExpressionKeys[L](leftFields, leftType)
-    val rightKey = new ExpressionKeys[R](rightFields, rightType)
-
-    val joiner = new ExpressionJoinFunction[L, R, Row](
-      reducedPredicate,
-      leftType,
-      rightType,
-      resultType,
-      fields)
-
-    new EquiJoin[L, R, Row](
-      leftInput,
-      rightInput,
-      leftKey,
-      rightKey,
-      joiner,
-      resultType,
-      joinHint,
-      predicate.toString)
-  }
-}