http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/JarHelper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/JarHelper.java b/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/JarHelper.java
deleted file mode 100644
index 83fb342..0000000
--- a/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/JarHelper.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java;
-
-
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.FileInputStream;
-import java.io.InputStream;
-
-import java.util.jar.JarOutputStream;
-import java.util.jar.JarEntry;
-import java.util.jar.JarInputStream;
-
-/**
- * Provides utility services for jarring and unjarring files and directories.
- * Note that a given instance of JarHelper is not threadsafe with respect to
- * multiple jar operations.
- *
- * Copied from http://grepcode.com/file_/repo1.maven.org/maven2/org.apache.xmlbeans/xmlbeans/2.4.0/org/apache/xmlbeans/impl/common/JarHelper.java/?v=source
- *
- * @author Patrick Calahan <a href="mailto:p...@bea.com">p...@bea.com</a>
- */
-public class JarHelper
-{
-       // ========================================================================
-       // Constants
-
-       private static final int BUFFER_SIZE = 2156;
-
-       // ========================================================================
-       // Variables
-
-       private byte[] mBuffer = new byte[BUFFER_SIZE];
-       private int mByteCount = 0;
-       private boolean mVerbose = false;
-       private String mDestJarName = "";
-
-       // ========================================================================
-       // Constructor
-
-       /**
-        * Instantiates a new JarHelper.
-        */
-       public JarHelper() {}
-
-       // ========================================================================
-       // Public methods
-
-       /**
-        * Jars a given directory or single file into a JarOutputStream.
-        */
-       public void jarDir(File dirOrFile2Jar, File destJar)
-               throws IOException {
-
-       if (dirOrFile2Jar == null || destJar == null)
-       {
-               throw new IllegalArgumentException();
-       }
-
-       mDestJarName = destJar.getCanonicalPath();
-       FileOutputStream fout = new FileOutputStream(destJar);
-       JarOutputStream jout = new JarOutputStream(fout);
-       //jout.setLevel(0);
-       try {
-               jarDir(dirOrFile2Jar, jout, null);
-       } catch(IOException ioe) {
-               throw ioe;
-       } finally {
-               jout.close();
-               fout.close();
-       }
-       }
-
-       /**
-        * Unjars a given jar file into a given directory.
-        */
-       public void unjarDir(File jarFile, File destDir) throws IOException {
-       BufferedOutputStream dest = null;
-       FileInputStream fis = new FileInputStream(jarFile);
-       unjar(fis, destDir);
-       }
-
-       /**
-        * Given an InputStream on a jar file, unjars the contents into the given
-        * directory.
-        */
-       public void unjar(InputStream in, File destDir) throws IOException {
-       BufferedOutputStream dest = null;
-       JarInputStream jis = new JarInputStream(in);
-       JarEntry entry;
-       while ((entry = jis.getNextJarEntry()) != null) {
-               if (entry.isDirectory()) {
-               File dir = new File(destDir,entry.getName());
-               dir.mkdir();
-               if (entry.getTime() != -1) {dir.setLastModified(entry.getTime());}
-               continue;
-               }
-               int count;
-               byte[] data = new byte[ BUFFER_SIZE ];
-               File destFile = new File(destDir, entry.getName());
-               if (mVerbose) {
-                       System.out.println("unjarring " + destFile +
-                                       " from " + entry.getName());
-               }
-               FileOutputStream fos = new FileOutputStream(destFile);
-               dest = new BufferedOutputStream(fos, BUFFER_SIZE);
-               try {
-                       while ((count = jis.read(data, 0, BUFFER_SIZE)) != -1) {
-                               dest.write(data, 0, count);
-                       }
-                       dest.flush();
-               } finally {
-                       dest.close();
-               }
-               if (entry.getTime() != -1) {destFile.setLastModified(entry.getTime());}
-       }
-       jis.close();
-       }
-
-       public void setVerbose(boolean b) {
-       mVerbose = b;
-       }
-
-       // ========================================================================
-       // Private methods
-
-       private static final char SEP = '/';
-       /**
-        * Recursively jars up the given path under the given directory.
-        */
-       private void jarDir(File dirOrFile2jar, JarOutputStream jos, String path)
-               throws IOException {
-       if (mVerbose) { System.out.println("checking " + dirOrFile2jar);}
-       if (dirOrFile2jar.isDirectory()) {
-               String[] dirList = dirOrFile2jar.list();
-               String subPath = (path == null) ? "" : (path+dirOrFile2jar.getName()+SEP);
-               if (path != null) {
-               JarEntry je = new JarEntry(subPath);
-               je.setTime(dirOrFile2jar.lastModified());
-               jos.putNextEntry(je);
-               jos.flush();
-               jos.closeEntry();
-               }
-               for (int i = 0; i < dirList.length; i++) {
-               File f = new File(dirOrFile2jar, dirList[i]);
-               jarDir(f,jos,subPath);
-               }
-       } else {
-               if (dirOrFile2jar.getCanonicalPath().equals(mDestJarName))
-               {
-               if (mVerbose) {System.out.println("skipping " + dirOrFile2jar.getPath());}
-               return;
-               }
-
-               if (mVerbose) {
-                       System.out.println("adding " + dirOrFile2jar.getPath());
-               }
-               FileInputStream fis = new FileInputStream(dirOrFile2jar);
-               try {
-               JarEntry entry = new JarEntry(path+dirOrFile2jar.getName());
-               entry.setTime(dirOrFile2jar.lastModified());
-               jos.putNextEntry(entry);
-               while ((mByteCount = fis.read(mBuffer)) != -1) {
-                       jos.write(mBuffer, 0, mByteCount);
-                       if (mVerbose) { System.out.println("wrote " + mByteCount + " bytes");}
-               }
-               jos.flush();
-               jos.closeEntry();
-               } catch (IOException ioe) {
-               throw ioe;
-               } finally {
-               fis.close();
-               }
-       }
-       }
-
-       // for debugging
-       public static void main(String[] args)
-               throws IOException
-       {
-       if (args.length < 2)
-       {
-               System.err.println("Usage: JarHelper jarname.jar directory");
-               return;
-       }
-
-       JarHelper jarHelper = new JarHelper();
-       jarHelper.mVerbose = true;
-
-       File destJar = new File(args[0]);
-       File dirOrFile2Jar = new File(args[1]);
-
-       jarHelper.jarDir(dirOrFile2Jar, destJar);
-       }
-}

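The JarHelper removed above was the packaging utility behind FlinkILoop.writeFilesToDisk() (deleted further down in this commit). A minimal usage sketch in Scala, with purely illustrative paths:

    import java.io.File
    import org.apache.flink.api.java.JarHelper

    val classesDir = new File("/tmp/scala_shell_commands")      // hypothetical input directory
    val destJar    = new File("/tmp/scala_shell_commands.jar")  // hypothetical output jar

    val jh = new JarHelper
    jh.setVerbose(true)                              // log each file as it is processed
    jh.jarDir(classesDir, destJar)                   // recursively package the directory
    jh.unjarDir(destJar, new File("/tmp/unpacked"))  // extract the jar again
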
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java b/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
deleted file mode 100644
index a336957..0000000
--- a/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
+++ /dev/null
@@ -1,107 +0,0 @@
-
-package org.apache.flink.api.java;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.PlanExecutor;
-
-import org.apache.flink.api.scala.FlinkILoop;
-import org.apache.flink.configuration.Configuration;
-
-import java.io.File;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Special version of {@link org.apache.flink.api.java.RemoteEnvironment} that has a reference
- * to a {@link org.apache.flink.api.scala.FlinkILoop}. When execute is called this will
- * use the reference of the ILoop to write the compiled classes of the current session to
- * a Jar file and submit these with the program.
- */
-public class ScalaShellRemoteEnvironment extends RemoteEnvironment {
-
-       // reference to Scala Shell, for access to virtual directory
-       private FlinkILoop flinkILoop;
-
-       /**
-        * Creates new ScalaShellRemoteEnvironment that has a reference to the FlinkILoop
-        *
-        * @param host     The host name or address of the master (JobManager), where the program should be executed.
-        * @param port     The port of the master (JobManager), where the program should be executed.
-        * @param flinkILoop The Flink ILoop instance from which the ScalaShellRemoteEnvironment is called.
-        */
-       public ScalaShellRemoteEnvironment(String host, int port, FlinkILoop flinkILoop, String... jarFiles) {
-               super(host, port, null, jarFiles, null);
-               this.flinkILoop = flinkILoop;
-       }
-
-       /**
-        * compiles jars from files in the shell virtual directory on the fly, sends and executes it in the remote environment
-        *
-        * @param jobName name of the job as string
-        * @return Result of the computation
-        * @throws Exception
-        */
-       @Override
-       public JobExecutionResult execute(String jobName) throws Exception {
-               Plan p = createProgramPlan(jobName);
-
-               URL jarUrl = flinkILoop.writeFilesToDisk().getAbsoluteFile().toURI().toURL();
-
-               // get external jars, and add the shell command jar, pass to executor
-               List<URL> alljars = new ArrayList<>();
-               // get external (library) jars
-               String[] extJars = this.flinkILoop.getExternalJars();
-
-               for (String extJar : extJars) {
-                       URL extJarUrl = new File(extJar).getAbsoluteFile().toURI().toURL();
-                       alljars.add(extJarUrl);
-               }
-
-               // add shell commands
-               alljars.add(jarUrl);
-               PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, new Configuration(),
-                               alljars.toArray(new URL[alljars.size()]), null);
-
-               executor.setPrintStatusDuringExecution(p.getExecutionConfig().isSysoutLoggingEnabled());
-               return executor.executePlan(p);
-       }
-
-       public static void disableAllContextAndOtherEnvironments() {
-               
-               // we create a context environment that prevents the instantiation of further
-               // context environments. at the same time, setting the context environment prevents manual
-               // creation of local and remote environments
-               ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() {
-                       @Override
-                       public ExecutionEnvironment createExecutionEnvironment() {
-                               throw new UnsupportedOperationException("Execution Environment is already defined" +
-                                               " for this shell.");
-                       }
-               };
-               initializeContextEnvironment(factory);
-       }
-       
-       public static void resetContextEnvironments() {
-               ExecutionEnvironment.resetContextEnvironment();
-       }
-}

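The disableAllContextAndOtherEnvironments()/resetContextEnvironments() pair above installs (and removes) a context-environment factory that rejects any further environment creation. A sketch of the effect, assuming the 0.10-era API shown in this diff:

    // Once the shell's environment exists, asking for another one fails fast:
    ScalaShellRemoteEnvironment.disableAllContextAndOtherEnvironments()
    val env = ExecutionEnvironment.getExecutionEnvironment
    // => UnsupportedOperationException: Execution Environment is already defined for this shell.

This is exactly what the testPreventRecreation case in ScalaShellITCase (also deleted below) asserts.
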
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-scala-shell/src/main/scala-2.10/org/apache/flink/api/scala/ILoopCompat.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/scala-2.10/org/apache/flink/api/scala/ILoopCompat.scala b/flink-staging/flink-scala-shell/src/main/scala-2.10/org/apache/flink/api/scala/ILoopCompat.scala
deleted file mode 100644
index 7751751..0000000
--- a/flink-staging/flink-scala-shell/src/main/scala-2.10/org/apache/flink/api/scala/ILoopCompat.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala
-
-import java.io.BufferedReader
-
-import _root_.scala.tools.nsc.interpreter._
-
-class ILoopCompat(
-    in0: Option[BufferedReader],
-    out0: JPrintWriter)
-    extends ILoop(in0, out0) {
-
-  override def prompt = "Scala-Flink> "
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-scala-shell/src/main/scala-2.11/org/apache/flink/api/scala/ILoopCompat.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/scala-2.11/org/apache/flink/api/scala/ILoopCompat.scala b/flink-staging/flink-scala-shell/src/main/scala-2.11/org/apache/flink/api/scala/ILoopCompat.scala
deleted file mode 100644
index 1c395bb..0000000
--- a/flink-staging/flink-scala-shell/src/main/scala-2.11/org/apache/flink/api/scala/ILoopCompat.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala
-
-import java.io.BufferedReader
-
-import _root_.scala.tools.nsc.interpreter._
-import _root_.scala.io.AnsiColor.{MAGENTA, RESET}
-
-class ILoopCompat(
-    in0: Option[BufferedReader],
-    out0: JPrintWriter)
-    extends ILoop(in0, out0) {
-
-  override def prompt = {
-    val promptStr = "Scala-Flink> "
-    s"$MAGENTA$promptStr$RESET"
-  }
-
-  protected def addThunk(f: => Unit): Unit = f
-}

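The two ILoopCompat variants above absorb the ILoop API differences between Scala 2.10 and 2.11: 2.10's ILoop still ships addThunk, so only the 2.11 variant defines it (as immediate evaluation), and only the 2.11 build colors the prompt. For illustration, the interpolated prompt expands to plain ANSI escapes:

    import scala.io.AnsiColor.{MAGENTA, RESET}

    val promptStr = "Scala-Flink> "
    // MAGENTA is "\u001b[35m" and RESET is "\u001b[0m", so the 2.11 prompt is
    // the same text wrapped in color escape codes:
    val colored = s"$MAGENTA$promptStr$RESET"
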
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala b/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
deleted file mode 100644
index bcc9ef3..0000000
--- a/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala
-
-import java.io.{BufferedReader, File, FileOutputStream}
-
-import org.apache.flink.api.java.{JarHelper, ScalaShellRemoteEnvironment}
-import org.apache.flink.util.AbstractID
-
-import scala.tools.nsc.interpreter._
-
-
-class FlinkILoop(
-    val host: String,
-    val port: Int,
-    val externalJars: Option[Array[String]],
-    in0: Option[BufferedReader],
-    out0: JPrintWriter)
-  extends ILoopCompat(in0, out0) {
-
-  def this(host: String,
-           port: Int,
-           externalJars: Option[Array[String]],
-           in0: BufferedReader, 
-           out: JPrintWriter){
-    this(host: String, port: Int, externalJars, Some(in0), out)
-  }
-
-  def this(host: String, port: Int, externalJars: Option[Array[String]]){
-    this(host: String, port: Int, externalJars, None, new JPrintWriter(Console.out, true))
-  }
-  
-  def this(host: String, port: Int, in0: BufferedReader, out: JPrintWriter){
-    this(host: String, port: Int, None, in0: BufferedReader, out: JPrintWriter)
-  }
-
-  // remote environment
-  private val remoteEnv: ScalaShellRemoteEnvironment = {
-    // allow creation of environments
-    ScalaShellRemoteEnvironment.resetContextEnvironments()
-    
-    // create our environment that submits against the cluster (local or remote)
-    val remoteEnv = new ScalaShellRemoteEnvironment(host, port, this)
-    
-    // prevent further instantiation of environments
-    ScalaShellRemoteEnvironment.disableAllContextAndOtherEnvironments()
-    
-    remoteEnv
-  }
-
-  // local environment
-  val scalaEnv: ExecutionEnvironment = {
-    val scalaEnv = new ExecutionEnvironment(remoteEnv)
-    scalaEnv
-  }
-
-  /**
-   * creates a temporary directory to store compiled console files
-   */
-  private val tmpDirBase: File = {
-    // get unique temporary folder:
-    val abstractID: String = new AbstractID().toString
-    val tmpDir: File = new File(
-      System.getProperty("java.io.tmpdir"),
-      "scala_shell_tmp-" + abstractID)
-    if (!tmpDir.exists) {
-      tmpDir.mkdir
-    }
-    tmpDir
-  }
-
-  // scala_shell commands
-  private val tmpDirShell: File = {
-    new File(tmpDirBase, "scala_shell_commands")
-  }
-
-  // scala shell jar file name
-  private val tmpJarShell: File = {
-    new File(tmpDirBase, "scala_shell_commands.jar")
-  }
-
-  private val packageImports = Seq[String](
-    "org.apache.flink.core.fs._",
-    "org.apache.flink.core.fs.local._",
-    "org.apache.flink.api.common.io._",
-    "org.apache.flink.api.common.aggregators._",
-    "org.apache.flink.api.common.accumulators._",
-    "org.apache.flink.api.common.distributions._",
-    "org.apache.flink.api.common.operators._",
-    "org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint",
-    "org.apache.flink.api.common.functions._",
-    "org.apache.flink.api.java.io._",
-    "org.apache.flink.api.java.aggregation._",
-    "org.apache.flink.api.java.functions._",
-    "org.apache.flink.api.java.operators._",
-    "org.apache.flink.api.java.sampling._",
-    "org.apache.flink.api.scala._",
-    "org.apache.flink.api.scala.utils._"
-  )
-
-  override def createInterpreter(): Unit = {
-    super.createInterpreter()
-
-    addThunk {
-      intp.beQuietDuring {
-        // import dependencies
-        intp.interpret("import " + packageImports.mkString(", "))
-
-        // set execution environment
-        intp.bind("env", this.scalaEnv)
-      }
-    }
-  }
-
-  /**
-   * Packages the compiled classes of the current shell session into a Jar file for execution
-   * on a Flink cluster.
-   *
-   * @return The path of the created Jar file
-   */
-  def writeFilesToDisk(): File = {
-    val vd = intp.virtualDirectory
-
-    val vdIt = vd.iterator
-
-    for (fi <- vdIt) {
-      if (fi.isDirectory) {
-
-        val fiIt = fi.iterator
-
-        for (f <- fiIt) {
-
-          // directory for compiled line
-          val lineDir = new File(tmpDirShell.getAbsolutePath, fi.name)
-          lineDir.mkdirs()
-
-          // compiled classes for commands from shell
-          val writeFile = new File(lineDir.getAbsolutePath, f.name)
-          val outputStream = new FileOutputStream(writeFile)
-          val inputStream = f.input
-
-          // copy file contents
-          org.apache.commons.io.IOUtils.copy(inputStream, outputStream)
-
-          inputStream.close()
-          outputStream.close()
-        }
-      }
-    }
-
-    val compiledClasses = new File(tmpDirShell.getAbsolutePath)
-
-    val jarFilePath = new File(tmpJarShell.getAbsolutePath)
-
-    val jh: JarHelper = new JarHelper
-    jh.jarDir(compiledClasses, jarFilePath)
-
-    jarFilePath
-  }
-
-  /**
-   * custom welcome message
-   */
-  override def printWelcome() {
-    echo(
-      // scalastyle:off
-      """
-                         \u2592\u2593\u2588\u2588\u2593\u2588\u2588\u2592
-                     \u2593\u2588\u2588\u2588\u2588\u2592\u2592\u2588\u2593\u2592\u2593\u2588\u2588\u2588\u2593\u2592
-                  \u2593\u2588\u2588\u2588\u2593\u2591\u2591        \u2592\u2592\u2592\u2593\u2588\u2588\u2592  \u2592
-                \u2591\u2588\u2588\u2592   \u2592\u2592\u2593\u2593\u2588\u2593\u2593\u2592\u2591      \u2592\u2588\u2588\u2588\u2588
-                \u2588\u2588\u2592         \u2591\u2592\u2593\u2588\u2588\u2588\u2592    \u2592\u2588\u2592\u2588\u2592
-                  \u2591\u2593\u2588            \u2588\u2588\u2588   \u2593\u2591\u2592\u2588\u2588
-                    \u2593\u2588       \u2592\u2592\u2592\u2592\u2592\u2593\u2588\u2588\u2593\u2591\u2592\u2591\u2593\u2593\u2588
-                  \u2588\u2591 \u2588   \u2592\u2592\u2591       \u2588\u2588\u2588\u2593\u2593\u2588 \u2592\u2588\u2592\u2592\u2592
-                  \u2588\u2588\u2588\u2588\u2591   \u2592\u2593\u2588\u2593      \u2588\u2588\u2592\u2592\u2592 \u2593\u2588\u2588\u2588\u2592
-               \u2591\u2592\u2588\u2593\u2593\u2588\u2588       \u2593\u2588\u2592    \u2593\u2588\u2592\u2593\u2588\u2588\u2593 \u2591\u2588\u2591
-         \u2593\u2591\u2592\u2593\u2588\u2588\u2588\u2588\u2592 \u2588\u2588         \u2592\u2588    \u2588\u2593\u2591\u2592\u2588\u2592\u2591\u2592\u2588\u2592
-        \u2588\u2588\u2588\u2593\u2591\u2588\u2588\u2593  \u2593\u2588           \u2588   \u2588\u2593 \u2592\u2593\u2588\u2593\u2593\u2588\u2592
-      \u2591\u2588\u2588\u2593  \u2591\u2588\u2591            \u2588  \u2588\u2592 \u2592\u2588\u2588\u2588\u2588\u2588\u2593\u2592 \u2588\u2588\u2593\u2591\u2592
-     \u2588\u2588\u2588\u2591 \u2591 \u2588\u2591          \u2593 \u2591\u2588 \u2588\u2588\u2588\u2588\u2588\u2592\u2591\u2591    \u2591\u2588\u2591\u2593  \u2593\u2591
-    \u2588\u2588\u2593\u2588 \u2592\u2592\u2593\u2592          \u2593\u2588\u2588\u2588\u2588\u2588\u2588\u2588\u2593\u2591       \u2592\u2588\u2592 \u2592\u2593 \u2593\u2588\u2588\u2593
- \u2592\u2588\u2588\u2593 \u2593\u2588 \u2588\u2593\u2588       \u2591\u2592\u2588\u2588\u2588\u2588\u2588\u2593\u2593\u2592\u2591         \u2588\u2588\u2592\u2592  \u2588 \u2592  \u2593\u2588\u2592
- \u2593\u2588\u2593  \u2593\u2588 \u2588\u2588\u2593 \u2591\u2593\u2593\u2593\u2593\u2593\u2593\u2593\u2592              \u2592\u2588\u2588\u2593           \u2591\u2588\u2592
- \u2593\u2588    \u2588 \u2593\u2588\u2588\u2588\u2593\u2592\u2591              \u2591\u2593\u2593\u2593\u2588\u2588\u2588\u2593          \u2591\u2592\u2591 \u2593\u2588
- \u2588\u2588\u2593    \u2588\u2588\u2592    \u2591\u2592\u2593\u2593\u2588\u2588\u2588\u2593\u2593\u2593\u2593\u2593\u2588\u2588\u2588\u2588\u2588\u2588\u2593\u2592            \u2593\u2588\u2588\u2588  \u2588
-\u2593\u2588\u2588\u2588\u2592 \u2588\u2588\u2588   \u2591\u2593\u2593\u2592\u2591\u2591   \u2591\u2593\u2588\u2588\u2588\u2588\u2593\u2591                  \u2591\u2592\u2593\u2592  \u2588\u2593
-\u2588\u2593\u2592\u2592\u2593\u2593\u2588\u2588  \u2591\u2592\u2592\u2591\u2591\u2591\u2592\u2592\u2592\u2592\u2593\u2588\u2588\u2593\u2591                             \u2588\u2593
-\u2588\u2588 \u2593\u2591\u2592\u2588   \u2593\u2593\u2593\u2593\u2592\u2591\u2591  \u2592\u2588\u2593       \u2592\u2593\u2593\u2588\u2588\u2593    \u2593\u2592          \u2592\u2592\u2593
-\u2593\u2588\u2593 \u2593\u2592\u2588  \u2588\u2593\u2591  \u2591\u2592\u2593\u2593\u2588\u2588\u2592            \u2591\u2593\u2588\u2592   \u2592\u2592\u2592\u2591\u2592\u2592\u2593\u2588\u2588\u2588\u2588\u2588\u2592
- \u2588\u2588\u2591 \u2593\u2588\u2592\u2588\u2592  \u2592\u2593\u2593\u2592  \u2593\u2588                \u2588\u2591      \u2591\u2591\u2591\u2591   \u2591\u2588\u2592
- \u2593\u2588   \u2592\u2588\u2593   \u2591     \u2588\u2591                \u2592\u2588              \u2588\u2593
-  \u2588\u2593   \u2588\u2588         \u2588\u2591                 \u2593\u2593        \u2592\u2588\u2593\u2593\u2593\u2592\u2588\u2591
-   \u2588\u2593 \u2591\u2593\u2588\u2588\u2591       \u2593\u2592                  \u2593\u2588\u2593\u2592\u2591\u2591\u2591\u2592\u2593\u2588\u2591    \u2592\u2588
-    \u2588\u2588   \u2593\u2588\u2593\u2591      \u2592                    \u2591\u2592\u2588\u2592\u2588\u2588\u2592      \u2593\u2593
-     \u2593\u2588\u2592   \u2592\u2588\u2593\u2592\u2591                         \u2592\u2592 \u2588\u2592\u2588\u2593\u2592\u2592\u2591\u2591\u2592\u2588\u2588
-      \u2591\u2588\u2588\u2592    \u2592\u2593\u2593\u2592                     \u2593\u2588\u2588\u2593\u2592\u2588\u2592 \u2591\u2593\u2593\u2593\u2593\u2592\u2588\u2593
-        \u2591\u2593\u2588\u2588\u2592                          \u2593\u2591  \u2592\u2588\u2593\u2588  \u2591\u2591\u2592\u2592\u2592
-            \u2592\u2593\u2593\u2593\u2593\u2593\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2592\u2591\u2591\u2593\u2593  \u2593\u2591\u2592\u2588\u2591
-
-              F L I N K - S C A L A - S H E L L
-
-NOTE: Use the prebound Execution Environment "env" to read data and execute your program:
-  * env.readTextFile("/path/to/data")
-  * env.execute("Program name")
-
-HINT: You can use print() on a DataSet to print the contents to this shell.
-      """
-    // scalastyle:on
-    )
-
-  }
-
-  def getExternalJars(): Array[String] = externalJars.getOrElse(Array.empty[String])
-}
-

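Taken together, FlinkILoop owns the shell session's lifecycle: it binds the execution environment as "env", and writeFilesToDisk() mirrors the interpreter's virtual directory into a temporary folder and packages it with the JarHelper shown earlier. A hedged sketch of driving it programmatically (host and port are placeholders):

    import org.apache.flink.api.scala.FlinkILoop

    // Uses the three-argument constructor defined above.
    val loop = new FlinkILoop("localhost", 6123, externalJars = None)

    // Jars the compiled classes of the current session, ending up under
    // <java.io.tmpdir>/scala_shell_tmp-<AbstractID>/scala_shell_commands.jar
    val sessionJar: java.io.File = loop.writeFilesToDisk()
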
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
deleted file mode 100644
index eb7f816..0000000
--- a/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala
-
-import java.io.{StringWriter, BufferedReader}
-
-import org.apache.flink.api.common.ExecutionMode
-
-import org.apache.flink.configuration.{ConfigConstants, Configuration}
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
-
-import scala.tools.nsc.Settings
-
-import scala.tools.nsc.interpreter._
-
-
-object FlinkShell {
-
-  object ExecutionMode extends Enumeration {
-    val UNDEFINED, LOCAL, REMOTE = Value
-  }
-
-  var bufferedReader: Option[BufferedReader] = None
-
-  def main(args: Array[String]) {
-
-    // scopt, command line arguments
-    case class Config(
-        port: Int = -1,
-        host: String = "none",
-        externalJars: Option[Array[String]] = None,
-        flinkShellExecutionMode: ExecutionMode.Value = ExecutionMode.UNDEFINED)
-
-    val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") {
-      head ("Flink Scala Shell")
-
-      cmd("local") action {
-        (_, c) => c.copy(host = "none", port = -1, flinkShellExecutionMode = ExecutionMode.LOCAL)
-      } text("starts Flink scala shell with a local Flink cluster\n") children(
-        opt[(String)] ("addclasspath") abbr("a") valueName("<path/to/jar>") action {
-          case (x, c) =>
-            val xArray = x.split(":")
-            c.copy(externalJars = Option(xArray))
-          } text("specifies additional jars to be used in Flink\n")
-        )
-
-      cmd("remote") action { (_, c) =>
-        c.copy(flinkShellExecutionMode = ExecutionMode.REMOTE)
-      } text("starts Flink scala shell connecting to a remote cluster\n") children(
-        arg[String]("<host>") action { (h, c) =>
-          c.copy(host = h) }
-          text("remote host name as string"),
-        arg[Int]("<port>") action { (p, c) =>
-          c.copy(port = p) }
-          text("remote port as integer\n"),
-        opt[(String)]("addclasspath") abbr("a") valueName("<path/to/jar>") action {
-          case (x, c) =>
-            val xArray = x.split(":")
-            c.copy(externalJars = Option(xArray))
-          } text("specifies additional jars to be used in Flink")
-      )
-      help("help") abbr("h") text("prints this usage text\n")
-    }
-
-    // parse arguments
-    parser.parse (args, Config()) match {
-      case Some(config) =>
-        startShell(config.host,
-          config.port,
-          config.flinkShellExecutionMode,
-          config.externalJars)
-
-      case _ => System.out.println("Could not parse program arguments")
-    }
-  }
-
-
-  def startShell(
-      userHost: String,
-      userPort: Int,
-      executionMode: ExecutionMode.Value,
-      externalJars: Option[Array[String]] = None): Unit ={
-    
-    System.out.println("Starting Flink Shell:")
-
-    // either port or userhost not specified by user, create new minicluster
-    val (host: String, port: Int, cluster: Option[LocalFlinkMiniCluster]) =
-      executionMode match {
-        case ExecutionMode.LOCAL =>
-          val config = new Configuration()
-          config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
-          val miniCluster = new LocalFlinkMiniCluster(config, false)
-          miniCluster.start()
-          val port = miniCluster.getLeaderRPCPort
-          System.out.println(s"\nStarting local Flink cluster (host: localhost, port: $port).\n")
-          ("localhost", port, Some(miniCluster))
-
-        case ExecutionMode.REMOTE =>
-          if (userHost == "none" || userPort == -1) {
-            System.out.println("Error: <host> or <port> not specified!")
-            return
-          } else {
-            System.out.println(
-              s"\nConnecting to Flink cluster (host: $userHost, port: $userPort).\n")
-            (userHost, userPort, None)
-          }
-
-        case ExecutionMode.UNDEFINED =>
-          System.out.println("Error: please specify execution mode:")
-          System.out.println("[local | remote <host> <port>]")
-          return
-      }
-
-    var repl: Option[FlinkILoop] = None
-
-    try {
-      // custom shell
-      repl = Some(
-        bufferedReader match {
-
-          case Some(br) =>
-            val out = new StringWriter()
-            new FlinkILoop(host, port, externalJars, bufferedReader, new JPrintWriter(out))
-
-          case None =>
-            new FlinkILoop(host, port, externalJars)
-        })
-
-      val settings = new Settings()
-
-      settings.usejavacp.value = true
-      settings.Yreplsync.value = true
-
-      // start scala interpreter shell
-      repl.foreach(_.process(settings))
-    } finally {
-      repl.foreach(_.closeInterpreter())
-      cluster.foreach(_.stop())
-    }
-
-    System.out.println(" good bye ..")
-  }
-}

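Per the scopt parser above, the launcher knows a "local" and a "remote" mode, each with an optional colon-separated list of extra jars. Illustrative invocations (paths are placeholders):

    start-scala-shell.sh local --addclasspath /path/to/a.jar:/path/to/b.jar
    start-scala-shell.sh remote <host> <port> -a /path/to/extra.jar

In local mode the shell spins up a LocalFlinkMiniCluster on an ephemeral JobManager port; in remote mode both <host> and <port> are required, otherwise startShell prints an error and returns.
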
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-scala-shell/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/test/resources/log4j-test.properties b/flink-staging/flink-scala-shell/src/test/resources/log4j-test.properties
deleted file mode 100644
index 9912b19..0000000
--- a/flink-staging/flink-scala-shell/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,24 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-
-# Convenience file for local debugging of the JobManager/TaskManager.
-log4j.rootLogger=OFF, console
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-scala-shell/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/test/resources/logback-test.xml b/flink-staging/flink-scala-shell/src/test/resources/logback-test.xml
deleted file mode 100644
index 8b3bb27..0000000
--- a/flink-staging/flink-scala-shell/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,29 +0,0 @@
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~ with the License.  You may obtain a copy of the License at
-  ~
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<configuration>
-    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-        <encoder>
-            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
-        </encoder>
-    </appender>
-
-    <root level="WARN">
-        <appender-ref ref="STDOUT"/>
-    </root>
-</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
deleted file mode 100644
index 00e36ab..0000000
--- a/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
+++ /dev/null
@@ -1,330 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala
-
-import java.io._
-import java.util.concurrent.TimeUnit
-
-import org.apache.flink.test.util.{ForkableFlinkMiniCluster, TestBaseUtils}
-import org.apache.flink.util.TestLogger
-import org.junit.{AfterClass, BeforeClass, Test, Assert}
-
-import scala.concurrent.duration.FiniteDuration
-import scala.tools.nsc.Settings
-
-class ScalaShellITCase extends TestLogger {
-
-  import ScalaShellITCase._
-
-  /** Prevent re-creation of environment */
-  @Test
-  def testPreventRecreation(): Unit = {
-
-    val input: String =
-      """
-        val env = ExecutionEnvironment.getExecutionEnvironment
-      """.stripMargin
-
-    val output: String = processInShell(input)
-
-    Assert.assertTrue(output.contains(
-      "UnsupportedOperationException: Execution Environment is already " +
-      "defined for this shell"))
-  }
-
-  /** Iteration test with iterative Pi example */
-  @Test
-  def testIterativePI(): Unit = {
-
-    val input: String =
-      """
-        val initial = env.fromElements(0)
-        val count = initial.iterate(10000) { iterationInput: DataSet[Int] =>
-          val result = iterationInput.map { i =>
-            val x = Math.random()
-            val y = Math.random()
-            i + (if (x * x + y * y < 1) 1 else 0)
-          }
-          result
-        }
-        val result = count map { c => c / 10000.0 * 4 }
-        result.collect()
-      """.stripMargin
-
-    val output: String = processInShell(input)
-
-    Assert.assertFalse(output.contains("failed"))
-    Assert.assertFalse(output.contains("error"))
-    Assert.assertFalse(output.contains("Exception"))
-  }
-
-  /** WordCount in Shell */
-  @Test
-  def testWordCount(): Unit = {
-    val input =
-      """
-        val text = env.fromElements("To be, or not to be,--that is the question:--",
-        "Whether 'tis nobler in the mind to suffer",
-        "The slings and arrows of outrageous fortune",
-        "Or to take arms against a sea of troubles,")
-        val counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.groupBy(0).sum(1)
-        val result = counts.print()
-      """.stripMargin
-
-    val output = processInShell(input)
-
-    Assert.assertFalse(output.contains("failed"))
-    Assert.assertFalse(output.contains("error"))
-    Assert.assertFalse(output.contains("Exception"))
-
-    // some of the words that should be included
-    Assert.assertTrue(output.contains("(a,1)"))
-    Assert.assertTrue(output.contains("(whether,1)"))
-    Assert.assertTrue(output.contains("(to,4)"))
-    Assert.assertTrue(output.contains("(arrows,1)"))
-  }
-
-  /** Sum 1..10, should be 55 */
-  @Test
-  def testSum: Unit = {
-    val input =
-      """
-        val input: DataSet[Int] = env.fromElements(0,1,2,3,4,5,6,7,8,9,10)
-        val reduced = input.reduce(_+_)
-        reduced.print
-      """.stripMargin
-
-    val output = processInShell(input)
-
-    Assert.assertFalse(output.contains("failed"))
-    Assert.assertFalse(output.contains("error"))
-    Assert.assertFalse(output.contains("Exception"))
-
-    Assert.assertTrue(output.contains("55"))
-  }
-
-  /** WordCount in Shell with custom case class */
-  @Test
-  def testWordCountWithCustomCaseClass: Unit = {
-    val input =
-      """
-      case class WC(word: String, count: Int)
-      val wordCounts = env.fromElements(
-        new WC("hello", 1),
-        new WC("world", 2),
-        new WC("world", 8))
-      val reduced = wordCounts.groupBy(0).sum(1)
-      reduced.print()
-      """.stripMargin
-
-    val output = processInShell(input)
-
-    Assert.assertFalse(output.contains("failed"))
-    Assert.assertFalse(output.contains("error"))
-    Assert.assertFalse(output.contains("Exception"))
-
-    Assert.assertTrue(output.contains("WC(hello,1)"))
-    Assert.assertTrue(output.contains("WC(world,10)"))
-  }
-
-  /** Submit external library */
-  @Test
-  def testSubmissionOfExternalLibrary: Unit = {
-    val input =
-      """
-        import org.apache.flink.ml.math._
-        val denseVectors = env.fromElements(DenseVector(1.0, 2.0, 3.0))
-        denseVectors.print()
-      """.stripMargin
-
-    // find jar file that contains the ml code
-    var externalJar = ""
-    val folder = new File("../flink-ml/target/")
-    val listOfFiles = folder.listFiles()
-
-    for (i <- listOfFiles.indices) {
-      val filename: String = listOfFiles(i).getName
-      if (!filename.contains("test") && !filename.contains("original") && filename.contains(
-        ".jar")) {
-        externalJar = listOfFiles(i).getAbsolutePath
-      }
-    }
-
-    assert(externalJar != "")
-
-    val output: String = processInShell(input, Option(externalJar))
-
-    Assert.assertFalse(output.contains("failed"))
-    Assert.assertFalse(output.contains("error"))
-    Assert.assertFalse(output.contains("Exception"))
-
-    Assert.assertTrue(output.contains("\nDenseVector(1.0, 2.0, 3.0)"))
-  }
-
-
-  /**
-   * tests flink shell startup with remote cluster (starts cluster internally)
-   */
-  @Test
-  def testRemoteCluster: Unit = {
-
-    val input: String =
-      """
-        |import org.apache.flink.api.common.functions.RichMapFunction
-        |import org.apache.flink.api.java.io.PrintingOutputFormat
-        |import org.apache.flink.api.common.accumulators.IntCounter
-        |import org.apache.flink.configuration.Configuration
-        |
-        |val els = env.fromElements("foobar","barfoo")
-        |val mapped = els.map{
-        | new RichMapFunction[String, String]() {
-        |   var intCounter: IntCounter = _
-        |   override def open(conf: Configuration): Unit = {
-        |     intCounter = getRuntimeContext.getIntCounter("intCounter")
-        |   }
-        |
-        |   def map(element: String): String = {
-        |     intCounter.add(1)
-        |     element
-        |   }
-        | }
-        |}
-        |mapped.output(new PrintingOutputFormat())
-        |val executionResult = env.execute("Test Job")
-        |System.out.println("IntCounter: " + executionResult.getIntCounterResult("intCounter"))
-        |
-        |:q
-      """.stripMargin
-
-    val in: BufferedReader = new BufferedReader(
-      new StringReader(
-        input + "\n"))
-    val out: StringWriter = new StringWriter
-
-    val baos: ByteArrayOutputStream = new ByteArrayOutputStream
-    val oldOut: PrintStream = System.out
-    System.setOut(new PrintStream(baos))
-
-    val (c, args) = cluster match{
-      case Some(cl) =>
-        val arg = Array("remote",
-          cl.hostname,
-          Integer.toString(cl.getLeaderRPCPort))
-        (cl, arg)
-      case None =>
-        throw new AssertionError("Cluster creation failed.")
-    }
-
-    //start scala shell with initialized
-    // buffered reader for testing
-    FlinkShell.bufferedReader = Some(in)
-    FlinkShell.main(args)
-    baos.flush()
-
-    val output: String = baos.toString
-    System.setOut(oldOut)
-
-    Assert.assertTrue(output.contains("IntCounter: 2"))
-    Assert.assertTrue(output.contains("foobar"))
-    Assert.assertTrue(output.contains("barfoo"))
-
-    Assert.assertFalse(output.contains("failed"))
-    Assert.assertFalse(output.contains("Error"))
-    Assert.assertFalse(output.contains("ERROR"))
-    Assert.assertFalse(output.contains("Exception"))
-  }
-}
-
-object ScalaShellITCase {
-  var cluster: Option[ForkableFlinkMiniCluster] = None
-  val parallelism = 4
-
-  @BeforeClass
-  def beforeAll(): Unit = {
-    val cl = TestBaseUtils.startCluster(
-      1,
-      parallelism,
-      false,
-      false,
-      false)
-
-    cluster = Some(cl)
-  }
-
-  @AfterClass
-  def afterAll(): Unit = {
-    // The Scala interpreter somehow changes the class loader. Therefore, we have to reset it
-    Thread.currentThread().setContextClassLoader(classOf[ScalaShellITCase].getClassLoader)
-    cluster.foreach(c => TestBaseUtils.stopCluster(c, new FiniteDuration(1000, TimeUnit.SECONDS)))
-  }
-
-  /**
-   * Run the input using a Scala Shell and return the output of the shell.
-   * @param input commands to be processed in the shell
-   * @return output of shell
-   */
-  def processInShell(input: String, externalJars: Option[String] = None): String = {
-    val in = new BufferedReader(new StringReader(input + "\n"))
-    val out = new StringWriter()
-    val baos = new ByteArrayOutputStream()
-
-    val oldOut = System.out
-    System.setOut(new PrintStream(baos))
-
-    // new local cluster
-    val host = "localhost"
-    val port = cluster match {
-      case Some(c) => c.getLeaderRPCPort
-      case _ => throw new RuntimeException("Test cluster not initialized.")
-    }
-
-    val repl = externalJars match {
-      case Some(ej) => new FlinkILoop(
-        host, port,
-        Option(Array(ej)),
-        in, new PrintWriter(out))
-
-      case None => new FlinkILoop(
-        host, port,
-        in, new PrintWriter(out))
-    }
-
-    repl.settings = new Settings()
-
-    // enable this line to use scala in intellij
-    repl.settings.usejavacp.value = true
-
-    externalJars match {
-      case Some(ej) => repl.settings.classpath.value = ej
-      case None =>
-    }
-
-    repl.process(repl.settings)
-
-    repl.closeInterpreter()
-
-    System.setOut(oldOut)
-
-    baos.flush()
-
-    val stdout = baos.toString
-
-    out.toString + stdout
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
deleted file mode 100644
index 57bbd9b..0000000
--- a/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala
-
-import java.io._
-
-import org.apache.flink.util.TestLogger
-import org.junit.Test
-import org.junit.Assert
-
-class ScalaShellLocalStartupITCase extends TestLogger {
-
-  /**
-   * tests flink shell with local setup through startup script in bin folder
-   */
-  @Test
-  def testLocalCluster: Unit = {
-    val input: String =
-      """
-        |import org.apache.flink.api.common.functions.RichMapFunction
-        |import org.apache.flink.api.java.io.PrintingOutputFormat
-        |import org.apache.flink.api.common.accumulators.IntCounter
-        |import org.apache.flink.configuration.Configuration
-        |
-        |val els = env.fromElements("foobar","barfoo")
-        |val mapped = els.map{
-        | new RichMapFunction[String, String]() {
-        |   var intCounter: IntCounter = _
-        |   override def open(conf: Configuration): Unit = {
-        |     intCounter = getRuntimeContext.getIntCounter("intCounter")
-        |   }
-        |
-        |   def map(element: String): String = {
-        |     intCounter.add(1)
-        |     element
-        |   }
-        | }
-        |}
-        |mapped.output(new PrintingOutputFormat())
-        |val executionResult = env.execute("Test Job")
-        |System.out.println("IntCounter: " + executionResult.getIntCounterResult("intCounter"))
-        |
-        |:q
-      """.stripMargin
-      val in: BufferedReader = new BufferedReader(new StringReader(input + "\n"))
-      val out: StringWriter = new StringWriter
-      val baos: ByteArrayOutputStream = new ByteArrayOutputStream
-      val oldOut: PrintStream = System.out
-      System.setOut(new PrintStream(baos))
-      val args: Array[String] = Array("local")
-
-    //start flink scala shell
-    FlinkShell.bufferedReader = Some(in);
-    FlinkShell.main(args)
-
-    baos.flush()
-    val output: String = baos.toString
-    System.setOut(oldOut)
-
-    Assert.assertTrue(output.contains("IntCounter: 2"))
-    Assert.assertTrue(output.contains("foobar"))
-    Assert.assertTrue(output.contains("barfoo"))
-
-    Assert.assertFalse(output.contains("failed"))
-    Assert.assertFalse(output.contains("Error"))
-    Assert.assertFalse(output.contains("ERROR"))
-    Assert.assertFalse(output.contains("Exception"))
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-scala-shell/start-script/start-scala-shell.sh
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/start-script/start-scala-shell.sh b/flink-staging/flink-scala-shell/start-script/start-scala-shell.sh
deleted file mode 100644
index fd85897..0000000
--- a/flink-staging/flink-scala-shell/start-script/start-scala-shell.sh
+++ /dev/null
@@ -1,86 +0,0 @@
-#!/bin/bash
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-# from scala-lang 2.10.4
-
-# restore stty settings (echo in particular)
-function restoreSttySettings() {
-  if [[ -n $SCALA_RUNNER_DEBUG ]]; then
-    echo "restoring stty:"
-    echo "$saved_stty"
-  fi
-  stty $saved_stty
-  saved_stty=""
-}
-
-function onExit() {
-  [[ "$saved_stty" != "" ]] && restoreSttySettings
-  exit $scala_exit_status
-}
-
-
-# to reenable echo if we are interrupted before completing.
-trap onExit INT
-# save terminal settings
-saved_stty=$(stty -g 2>/dev/null)
-# clear on error so we don't later try to restore them
-if [[ ! $? ]]; then
-  saved_stty=""
-fi
-
-
-bin=`dirname "$0"`
-bin=`cd "$bin"; pwd`
-
-. "$bin"/config.sh
-
-FLINK_CLASSPATH=`constructFlinkClassPath`
-
-# https://issues.scala-lang.org/browse/SI-6502, can't load external jars interactively
-# in scala shell since 2.10, has to be done at startup
-# checks arguments for additional classpath and adds it to the "standard classpath"
-
-EXTERNAL_LIB_FOUND=false
-for ((i=1;i<=$#;i++))
-do
-    if [[  ${!i} = "-a" || ${!i} = "--addclasspath" ]]
-    then
-       EXTERNAL_LIB_FOUND=true
-       
-        #adding to classpath
-        k=$((i+1))
-        j=$((k+1))
-        echo " "
-        echo "Additional classpath:${!k}"
-        echo " "
-        EXT_CLASSPATH="${!k}"
-        FLINK_CLASSPATH="$FLINK_CLASSPATH:${!k}"
-        set -- "${@:1:$((i-1))}" "${@:j}"
-    fi
-done
-
-if ${EXTERNAL_LIB_FOUND}
-then
-    java -Dscala.color -cp "$FLINK_CLASSPATH" org.apache.flink.api.scala.FlinkShell $@ --addclasspath "$EXT_CLASSPATH"
-else
-    java -Dscala.color -cp "$FLINK_CLASSPATH" org.apache.flink.api.scala.FlinkShell $@
-fi
-
-#restore echo
-onExit

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/pom.xml 
b/flink-staging/flink-table/pom.xml
deleted file mode 100644
index f13f422..0000000
--- a/flink-staging/flink-table/pom.xml
+++ /dev/null
@@ -1,258 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-  http://www.apache.org/licenses/LICENSE-2.0
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-       <modelVersion>4.0.0</modelVersion>
-
-       <parent>
-               <groupId>org.apache.flink</groupId>
-               <artifactId>flink-staging</artifactId>
-               <version>1.0-SNAPSHOT</version>
-               <relativePath>..</relativePath>
-       </parent>
-
-       <artifactId>flink-table</artifactId>
-       <name>flink-table</name>
-
-       <packaging>jar</packaging>
-
-       <dependencies>
-
-               <dependency>
-                       <groupId>com.google.guava</groupId>
-                       <artifactId>guava</artifactId>
-                       <version>${guava.version}</version>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-scala</artifactId>
-                       <version>${project.version}</version>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-streaming-scala</artifactId>
-                       <version>${project.version}</version>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-examples-batch</artifactId>
-                       <version>${project.version}</version>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.scala-lang</groupId>
-                       <artifactId>scala-reflect</artifactId>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.scala-lang</groupId>
-                       <artifactId>scala-library</artifactId>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.scala-lang</groupId>
-                       <artifactId>scala-compiler</artifactId>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-test-utils</artifactId>
-                       <version>${project.version}</version>
-                       <scope>test</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.codehaus.janino</groupId>
-                       <artifactId>janino</artifactId>
-                       <version>2.7.5</version>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-tests</artifactId>
-                       <version>${project.version}</version>
-                       <type>test-jar</type>
-                       <scope>test</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>com.fasterxml.jackson.core</groupId>
-                       <artifactId>jackson-databind</artifactId>
-                       <version>${jackson.version}</version>
-               </dependency>
-
-       </dependencies>
-
-       <build>
-               <plugins>
-                       <!-- Scala Compiler -->
-                       <plugin>
-                               <groupId>net.alchim31.maven</groupId>
-                               <artifactId>scala-maven-plugin</artifactId>
-                               <version>3.1.4</version>
-                               <executions>
-                                       <!-- Run scala compiler in the process-resources phase, so that dependencies on
-                                               scala classes can be resolved later in the (Java) compile phase -->
-                                       <execution>
-                                               <id>scala-compile-first</id>
-                                               <phase>process-resources</phase>
-                                               <goals>
-                                                       <goal>compile</goal>
-                                               </goals>
-                                       </execution>
-
-                                       <!-- Run scala compiler in the process-test-resources phase, so that dependencies on
-                                                scala classes can be resolved later in the (Java) test-compile phase -->
-                                       <execution>
-                                               <id>scala-test-compile</id>
-                                               <phase>process-test-resources</phase>
-                                               <goals>
-                                                       <goal>testCompile</goal>
-                                               </goals>
-                                       </execution>
-                               </executions>
-                               <configuration>
-                                       <jvmArgs>
-                                               <jvmArg>-Xms128m</jvmArg>
-                                               <jvmArg>-Xmx512m</jvmArg>
-                                       </jvmArgs>
-                                       <compilerPlugins combine.children="append">
-                                               <compilerPlugin>
-                                                       <groupId>org.scalamacros</groupId>
-                                                       <artifactId>paradise_${scala.version}</artifactId>
-                                                       <version>${scala.macros.version}</version>
-                                               </compilerPlugin>
-                                       </compilerPlugins>
-                               </configuration>
-                       </plugin>
-
-                       <!-- Eclipse Integration -->
-                       <plugin>
-                               <groupId>org.apache.maven.plugins</groupId>
-                               <artifactId>maven-eclipse-plugin</artifactId>
-                               <version>2.8</version>
-                               <configuration>
-                                       <downloadSources>true</downloadSources>
-                                       <projectnatures>
-                                               <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
-                                               <projectnature>org.eclipse.jdt.core.javanature</projectnature>
-                                       </projectnatures>
-                                       <buildcommands>
-                                               <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
-                                       </buildcommands>
-                                       <classpathContainers>
-                                               <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
-                                               <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
-                                       </classpathContainers>
-                                       <excludes>
-                                               <exclude>org.scala-lang:scala-library</exclude>
-                                               <exclude>org.scala-lang:scala-compiler</exclude>
-                                       </excludes>
-                                       <sourceIncludes>
-                                               <sourceInclude>**/*.scala</sourceInclude>
-                                               <sourceInclude>**/*.java</sourceInclude>
-                                       </sourceIncludes>
-                               </configuration>
-                       </plugin>
-
-                       <!-- Adding scala source directories to build path -->
-                       <plugin>
-                               <groupId>org.codehaus.mojo</groupId>
-                               <artifactId>build-helper-maven-plugin</artifactId>
-                               <version>1.7</version>
-                               <executions>
-                                       <!-- Add src/main/scala to eclipse build path -->
-                                       <execution>
-                                               <id>add-source</id>
-                                               <phase>generate-sources</phase>
-                                               <goals>
-                                                       <goal>add-source</goal>
-                                               </goals>
-                                               <configuration>
-                                                       <sources>
-                                                               <source>src/main/scala</source>
-                                                       </sources>
-                                               </configuration>
-                                       </execution>
-                                       <!-- Add src/test/scala to eclipse build path -->
-                                       <execution>
-                                               <id>add-test-source</id>
-                                               <phase>generate-test-sources</phase>
-                                               <goals>
-                                                       <goal>add-test-source</goal>
-                                               </goals>
-                                               <configuration>
-                                                       <sources>
-                                                               <source>src/test/scala</source>
-                                                       </sources>
-                                               </configuration>
-                                       </execution>
-                               </executions>
-                       </plugin>
-
-                       <plugin>
-                               <groupId>org.scalastyle</groupId>
-                               <artifactId>scalastyle-maven-plugin</artifactId>
-                               <version>0.5.0</version>
-                               <executions>
-                                       <execution>
-                                               <goals>
-                                                       <goal>check</goal>
-                                               </goals>
-                                       </execution>
-                               </executions>
-                               <configuration>
-                                       <verbose>false</verbose>
-                                       <failOnViolation>true</failOnViolation>
-                                       <includeTestSourceDirectory>true</includeTestSourceDirectory>
-                                       <failOnWarning>false</failOnWarning>
-                                       <sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
-                                       <testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
-                                       <configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
-                                       <outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
-                                       <outputEncoding>UTF-8</outputEncoding>
-                               </configuration>
-                       </plugin>
-
-               </plugins>
-       </build>
-
-       <profiles>
-               <profile>
-                       <id>scala-2.10</id>
-                       <activation>
-                               <property>
-                                       <!-- this is the default scala profile -->
-                                       <name>!scala-2.11</name>
-                               </property>
-                       </activation>
-                       <dependencies>
-                               <dependency>
-                                       <groupId>org.scalamacros</groupId>
-                                       <artifactId>quasiquotes_${scala.binary.version}</artifactId>
-                                       <version>${scala.macros.version}</version>
-                               </dependency>
-                       </dependencies>
-               </profile>
-       </profiles>
-
-</project>
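Note on the scala-2.10 profile above: it adds the quasiquotes artifact because quasiquotes only became part of scala-reflect with Scala 2.11; on 2.10 they are supplied by the macro-paradise compiler plugin wired into the scala-maven-plugin configuration. A minimal sketch of what a quasiquote does, assuming Scala 2.11+ (where scala-reflect alone suffices); the identifiers inside the template are arbitrary, not Table API calls:

    import scala.reflect.runtime.universe._

    object QuasiquoteDemo {
      def main(args: Array[String]): Unit = {
        // q"..." is expanded at compile time into scala-reflect tree
        // constructors; the resulting AST can be inspected or transformed.
        val tree = q"table.groupBy(word).select(count)"
        println(showRaw(tree))  // prints the raw AST, e.g. Apply(Select(...), ...)
      }
    }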

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/java/org/apache/flink/api/java/table/package-info.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/java/org/apache/flink/api/java/table/package-info.java
 
b/flink-staging/flink-table/src/main/java/org/apache/flink/api/java/table/package-info.java
deleted file mode 100644
index 97113bb..0000000
--- 
a/flink-staging/flink-table/src/main/java/org/apache/flink/api/java/table/package-info.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * <strong>Table API (Java)</strong><br>
- *
- * {@link org.apache.flink.api.java.table.TableEnvironment} can be used to create a
- * {@link org.apache.flink.api.table.Table} from a {@link org.apache.flink.api.java.DataSet}
- * or {@link org.apache.flink.streaming.api.datastream.DataStream}.
- *
- * <p>
- * This can be used to perform SQL-like queries on data. Please have
- * a look at {@link org.apache.flink.api.table.Table} to see which operations are supported and
- * how query strings are written.
- *
- * <p>
- * Example:
- *
- * <pre>{@code
- * ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
- *
- * DataSet<WC> input = env.fromElements(
- *   new WC("Hello", 1),
- *   new WC("Ciao", 1),
- *   new WC("Hello", 1));
- *
- * TableEnvironment tableEnv = new TableEnvironment();
- * Table table = tableEnv.fromDataSet(input);
- *
- * Table filtered = table
- *     .groupBy("word")
- *     .select("word.count as count, word")
- *     .filter("count = 2");
- *
- * DataSet<WC> result = tableEnv.toDataSet(filtered, WC.class);
- *
- * result.print();
- * env.execute();
- * }</pre>
- *
- * <p>
- * As seen above, a {@link org.apache.flink.api.table.Table} can be converted back to the
- * underlying API representation using
- * {@link org.apache.flink.api.java.table.TableEnvironment#toDataSet(org.apache.flink.api.table.Table, java.lang.Class)}
- * or
- * {@link org.apache.flink.api.java.table.TableEnvironment#toDataStream(org.apache.flink.api.table.Table, java.lang.Class)}.
- */
-package org.apache.flink.api.java.table;

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/java/org/apache/flink/api/table/package-info.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/java/org/apache/flink/api/table/package-info.java
 
b/flink-staging/flink-table/src/main/java/org/apache/flink/api/table/package-info.java
deleted file mode 100644
index d7fbc8e..0000000
--- 
a/flink-staging/flink-table/src/main/java/org/apache/flink/api/table/package-info.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * <strong>Table API</strong><br>
- *
- * This package contains the generic part of the Table API. It can be used with Flink Streaming
- * and Flink Batch, from Scala as well as from Java.
- *
- * When using the Table API, a user creates a [[org.apache.flink.api.table.Table]] from
- * a DataSet or DataStream. Relational operations can then be performed on it. A table can also
- * be converted back to a DataSet or DataStream.
- *
- * Packages [[org.apache.flink.api.scala.table]] and [[org.apache.flink.api.java.table]] contain
- * the language-specific parts of the API. Refer to these packages for documentation on how
- * the Table API can be used in Java and Scala.
- */
-package org.apache.flink.api.table;
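The package documentation above points to org.apache.flink.api.scala.table for the Scala side, but no Scala example appears in this commit excerpt. A hedged sketch of what a Scala Table API program of this (pre-1.0) snapshot could look like, mirroring the Java example below; the `toTable` and `toDataSet` conversions are assumed to come from the scala.table implicits and their exact names may differ:

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.table._  // assumed implicit conversions

    // Case classes are composite types, so they can back a Table.
    case class WC(word: String, count: Int)

    object ScalaTableExample {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val input = env.fromElements(WC("Hello", 1), WC("Ciao", 1), WC("Hello", 1))

        // toTable is the assumed DataSet-to-Table conversion here.
        val filtered = input.toTable
          .groupBy("word")
          .select("word.count as count, word")
          .filter("count = 2")

        // Convert back to a typed DataSet, as the package docs describe.
        filtered.toDataSet[WC].print()
      }
    }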

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/java/org/apache/flink/examples/java/JavaTableExample.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/java/org/apache/flink/examples/java/JavaTableExample.java
 
b/flink-staging/flink-table/src/main/java/org/apache/flink/examples/java/JavaTableExample.java
deleted file mode 100644
index c043508..0000000
--- 
a/flink-staging/flink-table/src/main/java/org/apache/flink/examples/java/JavaTableExample.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.examples.java;
-
-
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.TableEnvironment;
-
-/**
- * Very simple example that shows how the Java Table API can be used.
- */
-public class JavaTableExample {
-
-       public static class WC {
-               public String word;
-               public int count;
-
-               // Public constructor to make it a Flink POJO
-               public WC() {}
-
-               public WC(String word, int count) {
-                       this.word = word;
-                       this.count = count;
-               }
-
-               @Override
-               public String toString() {
-                       return "WC " + word + " " + count;
-               }
-       }
-
-       public static void main(String[] args) throws Exception {
-               ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
-               TableEnvironment tableEnv = new TableEnvironment();
-
-               DataSet<WC> input = env.fromElements(
-                               new WC("Hello", 1),
-                               new WC("Ciao", 1),
-                               new WC("Hello", 1));
-
-               Table table = tableEnv.fromDataSet(input);
-
-               Table filtered = table
-                               .groupBy("word")
-                               .select("word.count as count, word")
-                               .filter("count = 2");
-
-               DataSet<WC> result = tableEnv.toDataSet(filtered, WC.class);
-
-               result.print();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
deleted file mode 100644
index 9dc9297..0000000
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
+++ /dev/null
@@ -1,346 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.table
-
-import java.lang.reflect.Modifier
-
-import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.java.aggregation.AggregationFunction
-import org.apache.flink.api.java.operators.JoinOperator.EquiJoin
-import org.apache.flink.api.java.operators.Keys.ExpressionKeys
-import org.apache.flink.api.java.operators.{GroupReduceOperator, Keys, MapOperator, UnsortedGrouping}
-import org.apache.flink.api.java.{DataSet => JavaDataSet}
-import org.apache.flink.api.table.expressions.analysis.ExtractEquiJoinFields
-import org.apache.flink.api.table.plan._
-import org.apache.flink.api.table.runtime._
-import org.apache.flink.api.table.expressions._
-import org.apache.flink.api.table.typeinfo.{RenameOperator, RenamingProxyTypeInfo, RowTypeInfo}
-import org.apache.flink.api.table.{ExpressionException, Row, Table}
-
-/**
- * [[PlanTranslator]] for creating [[Table]]s from Java [[org.apache.flink.api.java.DataSet]]s and
- * translating them back to Java [[org.apache.flink.api.java.DataSet]]s.
- */
-class JavaBatchTranslator extends PlanTranslator {
-
-  type Representation[A] = JavaDataSet[A]
-
-  override def createTable[A](
-      repr: Representation[A],
-      inputType: CompositeType[A],
-      expressions: Array[Expression],
-      resultFields: Seq[(String, TypeInformation[_])]): Table = {
-
-    val rowDataSet = createSelect(expressions, repr, inputType)
-
-    Table(Root(rowDataSet, resultFields))
-  }
-
-  override def translate[A](op: PlanNode)(implicit tpe: TypeInformation[A]): JavaDataSet[A] = {
-
-    if (tpe.getTypeClass == classOf[Row]) {
-      // shortcut for DataSet[Row]
-      return translateInternal(op).asInstanceOf[JavaDataSet[A]]
-    }
-
-    val clazz = tpe.getTypeClass
-    if (clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) {
-      throw new ExpressionException("Cannot create DataSet of type " +
-        clazz.getName + ". Only top-level classes or static member classes are supported.")
-    }
-
-
-    if (!implicitly[TypeInformation[A]].isInstanceOf[CompositeType[A]]) {
-      throw new ExpressionException(
-        "A Table can only be converted to composite types, type is: " +
-          implicitly[TypeInformation[A]] +
-          ". Composite types would be tuples, case classes and POJOs.")
-    }
-
-    val resultSet = translateInternal(op)
-
-    val resultType = resultSet.getType.asInstanceOf[RowTypeInfo]
-
-    val outputType = implicitly[TypeInformation[A]].asInstanceOf[CompositeType[A]]
-
-    val resultNames = resultType.getFieldNames
-    val outputNames = outputType.getFieldNames.toSeq
-
-    if (resultNames.toSet != outputNames.toSet) {
-      throw new ExpressionException(s"Expression result type $resultType does not have the same " +
-        s"fields as output type $outputType")
-    }
-
-    for (f <- outputNames) {
-      val in = resultType.getTypeAt(resultType.getFieldIndex(f))
-      val out = outputType.getTypeAt(outputType.getFieldIndex(f))
-      if (!in.equals(out)) {
-        throw new ExpressionException(s"Types for field $f differ on input $resultType and " +
-          s"output $outputType.")
-      }
-    }
-
-    val outputFields = outputNames map {
-      f => ResolvedFieldReference(f, resultType.getTypeAt(f))
-    }
-
-    val function = new ExpressionSelectFunction(
-      resultSet.getType.asInstanceOf[RowTypeInfo],
-      outputType,
-      outputFields)
-
-    val opName = s"select(${outputFields.mkString(",")})"
-    val operator = new MapOperator(resultSet, outputType, function, opName)
-
-    operator
-  }
-
-  private def translateInternal(op: PlanNode): JavaDataSet[Row] = {
-    op match {
-      case Root(dataSet: JavaDataSet[Row], resultFields) =>
-        dataSet
-
-      case Root(_, _) =>
-        throw new ExpressionException("Invalid Root for JavaBatchTranslator: " + op + ". " +
-          "Did you try converting a Table based on a DataSet to a DataStream or vice versa?")
-
-      case GroupBy(_, fields) =>
-        throw new ExpressionException("Dangling GroupBy operation. Did you forget a " +
-          "SELECT statement?")
-
-      case As(input, newNames) =>
-        val translatedInput = translateInternal(input)
-        val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
-        val proxyType = new RenamingProxyTypeInfo[Row](inType, newNames.toArray)
-        new RenameOperator(translatedInput, proxyType)
-
-      case sel@Select(Filter(Join(leftInput, rightInput), predicate), selection) =>
-
-        val expandedInput = ExpandAggregations(sel)
-
-        if (expandedInput.eq(sel)) {
-          val translatedLeftInput = translateInternal(leftInput)
-          val translatedRightInput = translateInternal(rightInput)
-          val leftInType = translatedLeftInput.getType.asInstanceOf[CompositeType[Row]]
-          val rightInType = translatedRightInput.getType.asInstanceOf[CompositeType[Row]]
-
-          createJoin(
-            predicate,
-            selection,
-            translatedLeftInput,
-            translatedRightInput,
-            leftInType,
-            rightInType,
-            JoinHint.OPTIMIZER_CHOOSES)
-        } else {
-          translateInternal(expandedInput)
-        }
-
-      case Filter(Join(leftInput, rightInput), predicate) =>
-        val translatedLeftInput = translateInternal(leftInput)
-        val translatedRightInput = translateInternal(rightInput)
-        val leftInType = translatedLeftInput.getType.asInstanceOf[CompositeType[Row]]
-        val rightInType = translatedRightInput.getType.asInstanceOf[CompositeType[Row]]
-
-        createJoin(
-          predicate,
-          leftInput.outputFields.map( f => ResolvedFieldReference(f._1, f._2)) ++
-            rightInput.outputFields.map( f => ResolvedFieldReference(f._1, f._2)),
-          translatedLeftInput,
-          translatedRightInput,
-          leftInType,
-          rightInType,
-          JoinHint.OPTIMIZER_CHOOSES)
-
-      case Join(leftInput, rightInput) =>
-        throw new ExpressionException("Join without filter condition encountered. " +
-          "Did you forget to add .where(...) ?")
-
-      case sel@Select(input, selection) =>
-
-        val expandedInput = ExpandAggregations(sel)
-
-        if (expandedInput.eq(sel)) {
-          val translatedInput = input match {
-            case GroupBy(groupByInput, groupExpressions) =>
-              val translatedGroupByInput = translateInternal(groupByInput)
-              val inType = translatedGroupByInput.getType.asInstanceOf[CompositeType[Row]]
-
-              val keyIndices = groupExpressions map {
-                case fe: ResolvedFieldReference => inType.getFieldIndex(fe.name)
-                case e =>
-                  throw new ExpressionException(s"Expression $e is not a valid key expression.")
-              }
-
-              val keys = new Keys.ExpressionKeys(keyIndices.toArray, inType, false)
-              val grouping = new UnsortedGrouping(translatedGroupByInput, keys)
-
-              new GroupReduceOperator(
-                grouping,
-                inType,
-                new NoExpressionAggregateFunction(),
-                "Nop Expression Aggregation")
-
-            case _ => translateInternal(input)
-          }
-
-          val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
-          val inputFields = inType.getFieldNames
-          createSelect(
-            selection,
-            translatedInput,
-            inType)
-        } else {
-          translateInternal(expandedInput)
-        }
-
-      case agg@Aggregate(GroupBy(input, groupExpressions), aggregations) =>
-        val translatedInput = translateInternal(input)
-        val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
-
-        val keyIndices = groupExpressions map {
-          case fe: ResolvedFieldReference => inType.getFieldIndex(fe.name)
-          case e => throw new ExpressionException(s"Expression $e is not a valid key expression.")
-        }
-
-        val keys = new Keys.ExpressionKeys(keyIndices.toArray, inType, false)
-
-        val grouping = new UnsortedGrouping(translatedInput, keys)
-
-        val aggFunctions: Seq[AggregationFunction[Any]] = aggregations map {
-          case (fieldName, fun) =>
-            fun.getFactory.createAggregationFunction[Any](
-              inType.getTypeAt[Any](inType.getFieldIndex(fieldName)).getTypeClass)
-        }
-
-        val aggIndices = aggregations map {
-          case (fieldName, _) =>
-            inType.getFieldIndex(fieldName)
-        }
-
-        val result = new GroupReduceOperator(
-          grouping,
-          inType,
-          new ExpressionAggregateFunction(aggIndices, aggFunctions),
-          "Expression Aggregation: " + agg)
-
-        result
-
-      case agg@Aggregate(input, aggregations) =>
-        val translatedInput = translateInternal(input)
-        val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
-
-        val aggFunctions: Seq[AggregationFunction[Any]] = aggregations map {
-          case (fieldName, fun) =>
-            fun.getFactory.createAggregationFunction[Any](
-              inType.getTypeAt[Any](inType.getFieldIndex(fieldName)).getTypeClass)
-        }
-
-        val aggIndices = aggregations map {
-          case (fieldName, _) =>
-            inType.getFieldIndex(fieldName)
-        }
-
-        val result = new GroupReduceOperator(
-          translatedInput,
-          inType,
-          new ExpressionAggregateFunction(aggIndices, aggFunctions),
-          "Expression Aggregation: " + agg)
-
-        result
-
-
-      case Filter(input, predicate) =>
-        val translatedInput = translateInternal(input)
-        val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]]
-        val filter = new ExpressionFilterFunction[Row](predicate, inType)
-        translatedInput.filter(filter).name(predicate.toString)
-
-      case uni@UnionAll(left, right) =>
-        val translatedLeft = translateInternal(left)
-        val translatedRight = translateInternal(right)
-        translatedLeft.union(translatedRight).name("Union: " + uni)
-    }
-  }
-
-  private def createSelect[I](
-      fields: Seq[Expression],
-      input: JavaDataSet[I],
-      inputType: CompositeType[I]): JavaDataSet[Row] = {
-
-    fields foreach {
-      f =>
-        if (f.exists(_.isInstanceOf[Aggregation])) {
-          throw new ExpressionException("Found aggregate in " + fields.mkString(", ") + ".")
-        }
-    }
-
-    val resultType = new RowTypeInfo(fields)
-
-    val function = new ExpressionSelectFunction(inputType, resultType, fields)
-
-    val opName = s"select(${fields.mkString(",")})"
-    val operator = new MapOperator(input, resultType, function, opName)
-
-    operator
-  }
-
-  private def createJoin[L, R](
-      predicate: Expression,
-      fields: Seq[Expression],
-      leftInput: JavaDataSet[L],
-      rightInput: JavaDataSet[R],
-      leftType: CompositeType[L],
-      rightType: CompositeType[R],
-      joinHint: JoinHint): JavaDataSet[Row] = {
-
-    val resultType = new RowTypeInfo(fields)
-
-    val (reducedPredicate, leftFields, rightFields) =
-      ExtractEquiJoinFields(leftType, rightType, predicate)
-
-    if (leftFields.isEmpty || rightFields.isEmpty) {
-      throw new ExpressionException("Could not derive equi-join predicates " +
-        "for predicate " + predicate + ".")
-    }
-
-    val leftKey = new ExpressionKeys[L](leftFields, leftType)
-    val rightKey = new ExpressionKeys[R](rightFields, rightType)
-
-    val joiner = new ExpressionJoinFunction[L, R, Row](
-      reducedPredicate,
-      leftType,
-      rightType,
-      resultType,
-      fields)
-
-    new EquiJoin[L, R, Row](
-      leftInput,
-      rightInput,
-      leftKey,
-      rightKey,
-      joiner,
-      resultType,
-      joinHint,
-      predicate.toString)
-  }
-}
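Note on translate() above: it refuses a conversion unless the Table's result fields and the requested composite output type have identical field-name sets and matching per-field types. A standalone sketch of just that check, with plain maps standing in for CompositeType (hypothetical stand-ins, not Flink classes):

    object FieldCheck {
      def check(result: Map[String, Class[_]], output: Map[String, Class[_]]): Unit = {
        // Same field names required, order-independent, as in translate().
        require(result.keySet == output.keySet,
          s"Result fields ${result.keySet} do not match output fields ${output.keySet}")
        // Each field must also carry the same type on both sides.
        for ((name, outType) <- output) {
          val inType = result(name)
          require(inType == outType, s"Types for field $name differ: $inType vs $outType")
        }
      }

      def main(args: Array[String]): Unit = {
        check(Map("word" -> classOf[String], "count" -> classOf[Int]),
              Map("count" -> classOf[Int], "word" -> classOf[String]))  // passes
        println("field check passed")
      }
    }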
