http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateJoin.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateJoin.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateJoin.scala
deleted file mode 100644
index b706e6d..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateJoin.scala
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.codegen
-
-import java.io.StringReader
-
-import org.apache.flink.api.common.functions.FlatJoinFunction
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.table.TableConfig
-import org.apache.flink.api.table.codegen.Indenter._
-import org.apache.flink.api.table.expressions.{Expression, NopExpression}
-import org.slf4j.LoggerFactory
-
-/**
- * Code generator for assembling the result of a binary operation.
- */
-class GenerateJoin[L, R, O](
-    leftTypeInfo: CompositeType[L],
-    rightTypeInfo: CompositeType[R],
-    resultTypeInfo: CompositeType[O],
-    predicate: Expression,
-    outputFields: Seq[Expression],
-    cl: ClassLoader,
-    config: TableConfig)
-  extends GenerateResultAssembler[FlatJoinFunction[L, R, O]](
-    Seq(("in0", leftTypeInfo), ("in1", rightTypeInfo)),
-    cl = cl,
-    config) {
-
-  val LOG = LoggerFactory.getLogger(this.getClass)
-
-
-  override protected def generateInternal(): FlatJoinFunction[L, R, O] = {
-
-    val leftTpe = typeTermForTypeInfo(leftTypeInfo)
-    val rightTpe = typeTermForTypeInfo(rightTypeInfo)
-    val resultTpe = typeTermForTypeInfo(resultTypeInfo)
-
-
-    val resultCode = createResult(resultTypeInfo, outputFields, o => s"coll.collect($o);")
-
-    val generatedName = freshName("GeneratedJoin")
-
-
-    val code = predicate match {
-      case n: NopExpression =>
-        // Janino does not support generics, that's why we need
-        // manual casting here
-        if (nullCheck) {
-          j"""
-        public class $generatedName
-            implements org.apache.flink.api.common.functions.FlatFlatJoinFunction {
-
-          ${reuseCode(resultTypeInfo)}
-
-          public org.apache.flink.api.table.TableConfig config = null;
-          public $generatedName(org.apache.flink.api.table.TableConfig config) {
-            this.config = config;
-          }
-
-          public void join(Object _in0, Object _in1, org.apache.flink.util.Collector coll) {
-            $leftTpe in0 = ($leftTpe) _in0;
-            $rightTpe in1 = ($rightTpe) _in1;
-
-            $resultCode
-          }
-        }
-      """
-        } else {
-          j"""
-        public class $generatedName
-            implements org.apache.flink.api.common.functions.FlatJoinFunction {
-
-          ${reuseCode(resultTypeInfo)}
-
-          public org.apache.flink.api.table.TableConfig config = null;
-          public $generatedName(org.apache.flink.api.table.TableConfig config) {
-            this.config = config;
-          }
-
-          public void join(Object _in0, Object _in1, org.apache.flink.util.Collector coll) {
-            $leftTpe in0 = ($leftTpe) _in0;
-            $rightTpe in1 = ($rightTpe) _in1;
-
-            $resultCode
-          }
-        }
-      """
-        }
-
-      case _ =>
-        val pred = generateExpression(predicate)
-        // Janino does not support generics, that's why we need
-        // manual casting here
-        if (nullCheck) {
-          j"""
-        public class $generatedName
-            implements org.apache.flink.api.common.functions.FlatFlatJoinFunction {
-
-          ${reuseCode(resultTypeInfo)}
-
-          org.apache.flink.api.table.TableConfig config = null;
-
-          public $generatedName(org.apache.flink.api.table.TableConfig config) {
-            this.config = config;
-            ${reuseInitCode()}
-          }
-
-          public void join(Object _in0, Object _in1, org.apache.flink.util.Collector coll) {
-            $leftTpe in0 = ($leftTpe) _in0;
-            $rightTpe in1 = ($rightTpe) _in1;
-
-            ${pred.code}
-
-            if (${pred.nullTerm} && ${pred.resultTerm}) {
-              $resultCode
-            }
-          }
-        }
-      """
-        } else {
-          j"""
-        public class $generatedName
-            implements org.apache.flink.api.common.functions.FlatJoinFunction {
-
-          ${reuseCode(resultTypeInfo)}
-
-          org.apache.flink.api.table.TableConfig config = null;
-
-          public $generatedName(org.apache.flink.api.table.TableConfig config) {
-            this.config = config;
-            ${reuseInitCode()}
-          }
-
-          public void join(Object _in0, Object _in1, org.apache.flink.util.Collector coll) {
-            $leftTpe in0 = ($leftTpe) _in0;
-            $rightTpe in1 = ($rightTpe) _in1;
-
-            ${pred.code}
-
-            if (${pred.resultTerm}) {
-              $resultCode
-            }
-          }
-        }
-      """
-        }
-    }
-
-    LOG.debug(s"""Generated join:\n$code""")
-    compiler.cook(new StringReader(code))
-    val clazz = compiler.getClassLoader().loadClass(generatedName)
-    val constructor = clazz.getConstructor(classOf[TableConfig])
-    constructor.newInstance(config).asInstanceOf[FlatJoinFunction[L, R, O]]
-  }
-}
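
The template above is assembled into plain Java source and compiled at runtime; the `compiler` used in generateInternal() is presumably a Janino SimpleCompiler, since the cook()/getClassLoader() calls match that API. A minimal, self-contained sketch of the same compile-and-instantiate pattern, with a hypothetical hand-written class body instead of the generated one:

  import java.io.StringReader
  import org.codehaus.janino.SimpleCompiler

  // Hypothetical generated source. Janino does not support generics,
  // so the interface is implemented with raw types, as in the template above.
  val source =
    """
    public class GeneratedJoin0
        implements org.apache.flink.api.common.functions.FlatJoinFunction {
      public void join(Object in0, Object in1, org.apache.flink.util.Collector coll) {
        coll.collect(in0.toString() + "," + in1.toString());
      }
    }
    """

  val compiler = new SimpleCompiler()
  compiler.setParentClassLoader(getClass.getClassLoader)
  compiler.cook(new StringReader(source))

  val clazz = compiler.getClassLoader.loadClass("GeneratedJoin0")
  val join = clazz.newInstance()
    .asInstanceOf[org.apache.flink.api.common.functions.FlatJoinFunction[AnyRef, AnyRef, AnyRef]]

GenerateJoin follows the same steps, except that the generated class also takes a TableConfig constructor argument and its join() body is filled in from the join predicate and output expressions.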

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateResultAssembler.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateResultAssembler.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateResultAssembler.scala
deleted file mode 100644
index 3916410..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateResultAssembler.scala
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.codegen
-
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
-import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
-import org.apache.flink.api.table.TableConfig
-import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.api.table.typeinfo.RowTypeInfo
-
-/**
- * Base class for unary and binary result assembler code generators.
- */
-abstract class GenerateResultAssembler[R](
-    inputs: Seq[(String, CompositeType[_])],
-    cl: ClassLoader,
-    config: TableConfig)
-  extends ExpressionCodeGenerator[R](inputs, cl = cl, config) {
-
-  def reuseCode[A](resultTypeInfo: CompositeType[A]) = {
-      val resultTpe = typeTermForTypeInfo(resultTypeInfo)
-      resultTypeInfo match {
-        case pj: PojoTypeInfo[_] =>
-          super.reuseMemberCode() +
-            s"$resultTpe out = new ${pj.getTypeClass.getCanonicalName}();"
-
-        case row: RowTypeInfo =>
-          super.reuseMemberCode() +
-            s"org.apache.flink.api.table.Row out =" +
-            s" new org.apache.flink.api.table.Row(${row.getArity});"
-
-        case _ => ""
-      }
-  }
-
-  def createResult[T](
-      resultTypeInfo: CompositeType[T],
-      outputFields: Seq[Expression],
-      result: String => String): String = {
-
-    val resultType = typeTermForTypeInfo(resultTypeInfo)
-
-    val fieldsCode = outputFields.map(generateExpression)
-
-    val block = resultTypeInfo match {
-      case ri: RowTypeInfo =>
-        val resultSetters: String = fieldsCode.zipWithIndex map {
-          case (fieldCode, i) =>
-            s"""
-              |${fieldCode.code}
-              |out.setField($i, ${fieldCode.resultTerm});
-            """.stripMargin
-        } mkString("\n")
-
-        s"""
-          |$resultSetters
-          |${result("out")}
-        """.stripMargin
-
-      case pj: PojoTypeInfo[_] =>
-        val resultSetters: String = fieldsCode.zip(outputFields) map {
-        case (fieldCode, expr) =>
-          val fieldName = expr.name
-          s"""
-            |${fieldCode.code}
-            |out.$fieldName = ${fieldCode.resultTerm};
-          """.stripMargin
-        } mkString("\n")
-
-        s"""
-          |$resultSetters
-          |${result("out")}
-        """.stripMargin
-
-      case tup: TupleTypeInfo[_] =>
-        val resultSetters: String = fieldsCode.zip(outputFields) map {
-          case (fieldCode, expr) =>
-            val fieldName = expr.name
-            s"""
-              |${fieldCode.code}
-              |out.$fieldName = ${fieldCode.resultTerm};
-            """.stripMargin
-        } mkString("\n")
-
-        s"""
-          |$resultSetters
-          |${result("out")}
-        """.stripMargin
-
-      case cc: CaseClassTypeInfo[_] =>
-        val fields: String = fieldsCode.map(_.code).mkString("\n")
-        val ctorParams: String = fieldsCode.map(_.resultTerm).mkString(",")
-
-        s"""
-          |$fields
-          |return new $resultType($ctorParams);
-        """.stripMargin
-    }
-
-    block
-  }
-
-}
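
For a RowTypeInfo result, the block returned by createResult() boils down to the following hand-written equivalent (arity and field values are made up for illustration; the real values come from the generated field expressions):

  import org.apache.flink.api.table.Row

  val out = new Row(2)         // created once and reused across records, see reuseCode above
  val tmp0: Int = 42           // stands in for the first generated field expression
  val tmp1: String = "hello"   // stands in for the second generated field expression
  out.setField(0, tmp0)
  out.setField(1, tmp1)
  // The `result` callback then decides what happens to `out`, e.g.
  // "coll.collect(out);" in GenerateJoin or "return out;" in GenerateSelect.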

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateSelect.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateSelect.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateSelect.scala
deleted file mode 100644
index a75d15b..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateSelect.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.codegen
-
-import java.io.StringReader
-
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.table.TableConfig
-import org.apache.flink.api.table.codegen.Indenter._
-import org.apache.flink.api.table.expressions.Expression
-import org.slf4j.LoggerFactory
-
-/**
- * Code generator for assembling the result of a select operation.
- */
-class GenerateSelect[I, O](
-    inputTypeInfo: CompositeType[I],
-    resultTypeInfo: CompositeType[O],
-    outputFields: Seq[Expression],
-    cl: ClassLoader,
-    config: TableConfig)
-  extends GenerateResultAssembler[MapFunction[I, O]](
-    Seq(("in0", inputTypeInfo)),
-    cl = cl,
-    config) {
-
-  val LOG = LoggerFactory.getLogger(this.getClass)
-
-  override protected def generateInternal(): MapFunction[I, O] = {
-
-    val inputTpe = typeTermForTypeInfo(inputTypeInfo)
-    val resultTpe = typeTermForTypeInfo(resultTypeInfo)
-
-    val resultCode = createResult(resultTypeInfo, outputFields, o => s"return $o;")
-
-    val generatedName = freshName("GeneratedSelect")
-
-    // Janino does not support generics, that's why we need
-    // manual casting here
-    val code =
-      j"""
-        public class $generatedName
-            implements org.apache.flink.api.common.functions.MapFunction<$inputTpe, $resultTpe> {
-
-          ${reuseCode(resultTypeInfo)}
-
-          org.apache.flink.api.table.TableConfig config = null;
-
-          public $generatedName(org.apache.flink.api.table.TableConfig config) {
-            this.config = config;
-            ${reuseInitCode()}
-          }
-
-          @Override
-          public Object map(Object _in0) {
-            $inputTpe in0 = ($inputTpe) _in0;
-            $resultCode
-          }
-        }
-      """
-
-    LOG.debug(s"""Generated select:\n$code""")
-    compiler.cook(new StringReader(code))
-    val clazz = compiler.getClassLoader().loadClass(generatedName)
-    val constructor = clazz.getConstructor(classOf[TableConfig])
-    constructor.newInstance(config).asInstanceOf[MapFunction[I, O]]
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/Indenter.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/Indenter.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/Indenter.scala
deleted file mode 100644
index 1319f21..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/Indenter.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.codegen
-
-class IndentStringContext(sc: StringContext) {
-  def j(args: Any*):String = {
-    val sb = new StringBuilder()
-    for ((s, a) <- sc.parts zip args) {
-      sb append s
-
-      val ind = getindent(s)
-      if (ind.size > 0) {
-        sb append a.toString().replaceAll("\n", "\n" + ind)
-      } else {
-        sb append a.toString()
-      }
-    }
-    if (sc.parts.size > args.size) {
-      sb append sc.parts.last
-    }
-
-    sb.toString()
-  }
-
-  // get white indent after the last new line, if any
-  def getindent(str: String): String = {
-    val lastnl = str.lastIndexOf("\n")
-    if (lastnl == -1) ""
-    else {
-      val ind = str.substring(lastnl + 1)
-      if (ind.trim.isEmpty) ind  // ind is all whitespace. Use this
-      else ""
-    }
-  }
-}
-
-object Indenter {
-  implicit  def toISC(sc: StringContext) = new IndentStringContext(sc)
-}
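
A minimal usage sketch of the j interpolator defined above: a multi-line argument is re-indented to the whitespace that precedes its hole, so generated code keeps a consistent layout.

  import org.apache.flink.api.table.codegen.Indenter._

  val body = "int a = 1;\nint b = 2;"
  val rendered =
    j"""
    public void run() {
      $body
    }
    """
  // Every line of `body` ends up prefixed with the whitespace in front of $body,
  // because getindent() returns the spaces after the last newline of the literal part.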

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/package.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/package.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/package.scala
deleted file mode 100644
index b69ac1c..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/package.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table
-
-package object codegen {
-  // Used in ExpressionCodeGenerator because Scala 2.10 reflection is not thread safe. We might
-  // have several parallel expression operators in one TaskManager, therefore we need to guard
-  // these operations.
-  object ReflectionLock
-}
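
A minimal sketch of the guarded-access pattern the comment describes; the actual call sites live in ExpressionCodeGenerator, which is not part of this diff.

  import org.apache.flink.api.table.codegen.ReflectionLock

  // Serialize access to the thread-unsafe Scala 2.10 runtime reflection universe.
  val stringType = ReflectionLock.synchronized {
    scala.reflect.runtime.universe.typeOf[String]
  }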

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/explain/Node.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/explain/Node.java b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/explain/Node.java
deleted file mode 100644
index 9152260..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/explain/Node.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.explain;
-
-import java.util.List;
-
-public class Node {
-       private int id;
-       private String type;
-       private String pact;
-       private String contents;
-       private int parallelism;
-       private String driver_strategy;
-       private List<Predecessors> predecessors;
-       private List<Global_properties> global_properties;
-       private List<LocalProperty> local_properties;
-       private List<Estimates> estimates;
-       private List<Costs> costs;
-       private List<Compiler_hints> compiler_hints;
-
-       public int getId() {
-               return id;
-       }
-       public String getType() {
-               return type;
-       }
-       public String getPact() {
-               return pact;
-       }
-       public String getContents() {
-               return contents;
-       }
-       public int getParallelism() {
-               return parallelism;
-       }
-       public String getDriver_strategy() {
-               return driver_strategy;
-       }
-       public List<Predecessors> getPredecessors() {
-               return predecessors;
-       }
-       public List<Global_properties> getGlobal_properties() {
-               return global_properties;
-       }
-       public List<LocalProperty> getLocal_properties() {
-               return local_properties;
-       }
-       public List<Estimates> getEstimates() {
-               return estimates;
-       }
-       public List<Costs> getCosts() {
-               return costs;
-       }
-       public List<Compiler_hints> getCompiler_hints() {
-               return compiler_hints;
-       }
-}
-
-class Predecessors {
-       private String ship_strategy;
-       private String exchange_mode;
-
-       public String getShip_strategy() {
-               return ship_strategy;
-       }
-       public String getExchange_mode() {
-               return exchange_mode;
-       }
-}
-
-class Global_properties {
-       private String name;
-       private String value;
-
-       public String getValue() {
-               return value;
-       }
-       public String getName() {
-               return name;
-       }
-}
-
-class LocalProperty {
-       private String name;
-       private String value;
-
-       public String getValue() {
-               return value;
-       }
-       public String getName() {
-               return name;
-       }
-}
-
-class Estimates {
-       private String name;
-       private String value;
-
-       public String getValue() {
-               return value;
-       }
-       public String getName() {
-               return name;
-       }
-}
-
-class Costs {
-       private String name;
-       private String value;
-
-       public String getValue() {
-               return value;
-       }
-       public String getName() {
-               return name;
-       }
-}
-
-class Compiler_hints {
-       private String name;
-       private String value;
-
-       public String getValue() {
-               return value;
-       }
-       public String getName() {
-               return name;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/explain/PlanJsonParser.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/explain/PlanJsonParser.java b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/explain/PlanJsonParser.java
deleted file mode 100644
index 31a7cd68..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/explain/PlanJsonParser.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.explain;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.LinkedHashMap;
-import java.util.List;
-
-public class PlanJsonParser {
-
-       public static String getSqlExecutionPlan(String t, Boolean extended) throws Exception {
-               ObjectMapper objectMapper = new ObjectMapper();
-
-               //not every node is same, ignore the unknown field
-               objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-
-               PlanTree tree = objectMapper.readValue(t, PlanTree.class);
-               LinkedHashMap<String, Integer> map = new LinkedHashMap<>();
-               StringWriter sw = new StringWriter();
-               PrintWriter pw = new PrintWriter(sw);
-               int tabCount = 0;
-
-               for (int index = 0; index < tree.getNodes().size(); index++) {
-                       Node tempNode = tree.getNodes().get(index);
-
-                       //input with operation such as join or union is coordinate, keep the same indent
-                       if ((tempNode.getPact().equals("Data Source")) && (map.containsKey(tempNode.getPact()))) {
-                               tabCount = map.get(tempNode.getPact());
-                       }
-                       else {
-                               map.put(tempNode.getPact(), tabCount);
-                       }
-
-                       printTab(tabCount, pw);
-                       pw.print("Stage " + tempNode.getId() + " : " + tempNode.getPact() + "\n");
-
-                       printTab(tabCount + 1, pw);
-                       String content = tempNode.getContents();
-
-                       //drop the hashcode of object instance
-                       int dele = tempNode.getContents().indexOf("@");
-                       if (dele > -1) content = tempNode.getContents().substring(0, dele);
-                       
-                       //replace with certain content if node is dataSource to pass
-                       //unit tests, because java and scala use different api to
-                       //get input element
-                       if (tempNode.getPact().equals("Data Source"))
-                               content = "collect elements with CollectionInputFormat";
-                       pw.print("content : " + content + "\n");
-
-                       List<Predecessors> predecessors = tempNode.getPredecessors();
-                       if (predecessors != null) {
-                               printTab(tabCount + 1, pw);
-                               pw.print("ship_strategy : " + predecessors.get(0).getShip_strategy() + "\n");
-
-                               printTab(tabCount + 1, pw);
-                               pw.print("exchange_mode : " + predecessors.get(0).getExchange_mode() + "\n");
-                       }
-
-                       if (tempNode.getDriver_strategy() != null) {
-                               printTab(tabCount + 1, pw);
-                               pw.print("driver_strategy : " + tempNode.getDriver_strategy() + "\n");
-                       }
-
-                       printTab(tabCount + 1, pw);
-                       pw.print(tempNode.getGlobal_properties().get(0).getName() + " : "
-                                       + tempNode.getGlobal_properties().get(0).getValue() + "\n");
-
-                       if (extended) {
-                               List<Global_properties> globalProperties = tempNode.getGlobal_properties();
-                               for (int i = 1; i < globalProperties.size(); i++) {
-                                       printTab(tabCount + 1, pw);
-                                       pw.print(globalProperties.get(i).getName() + " : "
-                                       + globalProperties.get(i).getValue() + "\n");
-                               }
-
-                               List<LocalProperty> localProperties = tempNode.getLocal_properties();
-                                       for (int i = 0; i < localProperties.size(); i++) {
-                                       printTab(tabCount + 1, pw);
-                                       pw.print(localProperties.get(i).getName() + " : "
-                                       + localProperties.get(i).getValue() + "\n");
-                               }
-
-                               List<Estimates> estimates = tempNode.getEstimates();
-                               for (int i = 0; i < estimates.size(); i++) {
-                                       printTab(tabCount + 1, pw);
-                                       pw.print(estimates.get(i).getName() + " : "
-                                       + estimates.get(i).getValue() + "\n");
-                               }
-
-                               List<Costs> costs = tempNode.getCosts();
-                               for (int i = 0; i < costs.size(); i++) {
-                                       printTab(tabCount + 1, pw);
-                                       pw.print(costs.get(i).getName() + " : "
-                                       + costs.get(i).getValue() + "\n");
-                               }
-
-                               List<Compiler_hints> compilerHintses = tempNode.getCompiler_hints();
-                               for (int i = 0; i < compilerHintses.size(); i++) {
-                                       printTab(tabCount + 1, pw);
-                                       pw.print(compilerHintses.get(i).getName() + " : "
-                                       + compilerHintses.get(i).getValue() + "\n");
-                               }
-                       }
-                       tabCount++;
-                       pw.print("\n");
-               }
-               pw.close();
-               return sw.toString();
-       }
-
-       private static void printTab(int tabCount, PrintWriter pw) {
-               for (int i = 0; i < tabCount; i++)
-                       pw.print("\t");
-       }
-}
-
-class PlanTree {
-       private List<Node> nodes;
-
-       public List<Node> getNodes() {
-               return nodes;
-       }
-}
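
A hedged usage sketch: the JSON that getSqlExecutionPlan() expects is the optimizer plan returned by ExecutionEnvironment#getExecutionPlan; the data set, sink path and flag below are made up for illustration.

  import org.apache.flink.api.scala._
  import org.apache.flink.api.table.explain.PlanJsonParser

  val env = ExecutionEnvironment.getExecutionEnvironment
  env.fromElements(1, 2, 3).writeAsText("/tmp/plan-demo")   // any sink makes the plan complete

  val jsonPlan  = env.getExecutionPlan()
  val plainText = PlanJsonParser.getSqlExecutionPlan(jsonPlan, false)   // extended = false
  println(plainText)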

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala
deleted file mode 100644
index 900ed8a..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions
-
-import java.util.concurrent.atomic.AtomicInteger
-
-import scala.language.postfixOps
-
-import org.apache.flink.api.common.typeinfo.{NothingTypeInfo, TypeInformation}
-import org.apache.flink.api.table.trees.TreeNode
-
-
-abstract class Expression extends TreeNode[Expression] { self: Product =>
-  def name: String = Expression.freshName("expression")
-  def typeInfo: TypeInformation[_]
-}
-
-abstract class BinaryExpression() extends Expression { self: Product =>
-  def left: Expression
-  def right: Expression
-  def children = Seq(left, right)
-}
-
-abstract class UnaryExpression() extends Expression { self: Product =>
-  def child: Expression
-  def children = Seq(child)
-}
-
-abstract class LeafExpression() extends Expression { self: Product =>
-  val children = Nil
-}
-
-case class NopExpression() extends LeafExpression {
-  val typeInfo = new NothingTypeInfo()
-  override val name = Expression.freshName("nop")
-
-}
-
-object Expression {
-  def freshName(prefix: String): String = {
-    s"$prefix-${freshNameCounter.getAndIncrement}"
-  }
-
-  val freshNameCounter = new AtomicInteger
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala
deleted file mode 100644
index 08e319d..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions
-
-import org.apache.flink.api.table.ExpressionException
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.api.java.aggregation.Aggregations
-
-
-abstract sealed class Aggregation extends UnaryExpression { self: Product =>
-  def typeInfo = {
-    child.typeInfo match {
-      case BasicTypeInfo.LONG_TYPE_INFO => // ok
-      case BasicTypeInfo.INT_TYPE_INFO =>
-      case BasicTypeInfo.DOUBLE_TYPE_INFO =>
-      case BasicTypeInfo.FLOAT_TYPE_INFO =>
-      case BasicTypeInfo.BYTE_TYPE_INFO =>
-      case BasicTypeInfo.SHORT_TYPE_INFO =>
-      case _ =>
-      throw new ExpressionException(s"Unsupported type ${child.typeInfo} for " +
-        s"aggregation $this. Only numeric data types supported.")
-    }
-    child.typeInfo
-  }
-
-  override def toString = s"Aggregate($child)"
-
-  def getIntermediateFields: Seq[Expression]
-  def getFinalField(inputs: Seq[Expression]): Expression
-  def getAggregations: Seq[Aggregations]
-}
-
-case class Sum(child: Expression) extends Aggregation {
-  override def toString = s"($child).sum"
-
-  override def getIntermediateFields: Seq[Expression] = Seq(child)
-  override def getFinalField(inputs: Seq[Expression]): Expression = inputs(0)
-  override def getAggregations = Seq(Aggregations.SUM)
-}
-
-case class Min(child: Expression) extends Aggregation {
-  override def toString = s"($child).min"
-
-  override def getIntermediateFields: Seq[Expression] = Seq(child)
-  override def getFinalField(inputs: Seq[Expression]): Expression = inputs(0)
-  override def getAggregations = Seq(Aggregations.MIN)
-
-}
-
-case class Max(child: Expression) extends Aggregation {
-  override def toString = s"($child).max"
-
-  override def getIntermediateFields: Seq[Expression] = Seq(child)
-  override def getFinalField(inputs: Seq[Expression]): Expression = inputs(0)
-  override def getAggregations = Seq(Aggregations.MAX)
-}
-
-case class Count(child: Expression) extends Aggregation {
-  override def typeInfo = {
-    child.typeInfo match {
-      case _ => // we can count anything... :D
-    }
-    BasicTypeInfo.INT_TYPE_INFO
-  }
-
-  override def toString = s"($child).count"
-
-  override def getIntermediateFields: Seq[Expression] = Seq(Literal(Integer.valueOf(1)))
-  override def getFinalField(inputs: Seq[Expression]): Expression = inputs(0)
-  override def getAggregations = Seq(Aggregations.SUM)
-
-}
-
-case class Avg(child: Expression) extends Aggregation {
-  override def toString = s"($child).avg"
-
-  override def getIntermediateFields: Seq[Expression] = Seq(child, Literal(1))
-  // This is just sweet. Use our own AST representation and let the code generator do
-  // our dirty work.
-  override def getFinalField(inputs: Seq[Expression]): Expression =
-    Div(inputs(0), inputs(1))
-  override def getAggregations = Seq(Aggregations.SUM, Aggregations.SUM)
-
-}
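
A short sketch of how Avg decomposes into the two summed columns described above (field name and type are made up):

  import org.apache.flink.api.common.typeinfo.BasicTypeInfo
  import org.apache.flink.api.table.expressions._

  val price = ResolvedFieldReference("price", BasicTypeInfo.DOUBLE_TYPE_INFO)
  val avg   = Avg(price)

  avg.getIntermediateFields   // Seq(price, Literal(1)): running sum and running count
  avg.getAggregations         // Seq(Aggregations.SUM, Aggregations.SUM)
  // getFinalField() then combines the two aggregated columns as Div(sum, count),
  // so the division is handled by the regular expression code generator.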

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/ExtractEquiJoinFields.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/ExtractEquiJoinFields.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/ExtractEquiJoinFields.scala
deleted file mode 100644
index 797de55..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/ExtractEquiJoinFields.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions.analysis
-
-import org.apache.flink.api.table.expressions._
-import org.apache.flink.api.common.typeutils.CompositeType
-
-import scala.collection.mutable
-
-/**
 * Equi-join field extractor for Join Predicates and CoGroup predicates. The result is a modified
 * expression without the equi-join predicates together with indices of the join fields
- * from both the left and right input.
- */
-object ExtractEquiJoinFields {
-  def apply(leftType: CompositeType[_], rightType: CompositeType[_], predicate: Expression) = {
-
-    val joinFieldsLeft = mutable.MutableList[Int]()
-    val joinFieldsRight = mutable.MutableList[Int]()
-
-    val equiJoinExprs = mutable.MutableList[EqualTo]()
-    // First get all `===` expressions that are not below an `Or`
-    predicate.transformPre {
-      case or@Or(_, _) => NopExpression()
-      case eq@EqualTo(le: ResolvedFieldReference, re: ResolvedFieldReference) =>
-        if (leftType.hasField(le.name) && rightType.hasField(re.name)) {
-          joinFieldsLeft += leftType.getFieldIndex(le.name)
-          joinFieldsRight += rightType.getFieldIndex(re.name)
-        } else if (leftType.hasField(re.name) && rightType.hasField(le.name)) {
-          joinFieldsLeft += leftType.getFieldIndex(re.name)
-          joinFieldsRight += rightType.getFieldIndex(le.name)
-        } else {
-          // not an equi-join predicate
-        }
-        equiJoinExprs += eq
-        eq
-    }
-
-    // then remove the equi join expressions from the predicate
-    val resultExpr = predicate.transformPost {
-      // For OR, we can eliminate the OR since the equi join
-      // predicate is evaluated before the expression is evaluated
-      case or@Or(NopExpression(), _) => NopExpression()
-      case or@Or(_, NopExpression()) => NopExpression()
-      // For AND we replace it with the other expression, since the
-      // equi join predicate will always be true
-      case and@And(NopExpression(), other) => other
-      case and@And(other, NopExpression()) => other
-      case eq : EqualTo if equiJoinExprs.contains(eq) =>
-        NopExpression()
-    }
-
-    (resultExpr, joinFieldsLeft.toArray, joinFieldsRight.toArray)
-  }
-}
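
A hedged sketch of the extraction; the schemas, field names and constant are made up, while And, EqualTo, Literal and ResolvedFieldReference are the expression case classes referenced above.

  import org.apache.flink.api.common.typeinfo.BasicTypeInfo
  import org.apache.flink.api.common.typeutils.CompositeType
  import org.apache.flink.api.scala._
  import org.apache.flink.api.table.expressions._
  import org.apache.flink.api.table.expressions.analysis.ExtractEquiJoinFields

  case class Orders(id: Int, amount: Int)
  case class Users(uid: Int, name: String)

  val leftType  = createTypeInformation[Orders].asInstanceOf[CompositeType[Orders]]
  val rightType = createTypeInformation[Users].asInstanceOf[CompositeType[Users]]

  val predicate = And(
    EqualTo(ResolvedFieldReference("id", BasicTypeInfo.INT_TYPE_INFO),
            ResolvedFieldReference("uid", BasicTypeInfo.INT_TYPE_INFO)),
    EqualTo(ResolvedFieldReference("amount", BasicTypeInfo.INT_TYPE_INFO), Literal(100)))

  val (remaining, leftKeys, rightKeys) = ExtractEquiJoinFields(leftType, rightType, predicate)
  // leftKeys  contains 0, the index of "id" in Orders
  // rightKeys contains 0, the index of "uid" in Users
  // remaining keeps only the non-equi part, i.e. the comparison of "amount" with 100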

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/GroupByAnalyzer.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/GroupByAnalyzer.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/GroupByAnalyzer.scala
deleted file mode 100644
index 6c7ecb2..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/GroupByAnalyzer.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions.analysis
-
-import org.apache.flink.api.table._
-import org.apache.flink.api.table.expressions.{ResolvedFieldReference, Expression}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-
-import scala.collection.mutable
-
-import org.apache.flink.api.table.trees.{Rule, Analyzer}
-
-
-/**
 * Analyzer for grouping expressions. Only field expressions are allowed as grouping expressions.
- */
-class GroupByAnalyzer(inputFields: Seq[(String, TypeInformation[_])])
-  extends Analyzer[Expression] {
-
-  def rules = Seq(new ResolveFieldReferences(inputFields), CheckGroupExpression)
-
-  object CheckGroupExpression extends Rule[Expression] {
-
-    def apply(expr: Expression) = {
-      val errors = mutable.MutableList[String]()
-
-      expr match {
-        case f: ResolvedFieldReference => // this is OK
-        case other =>
-          throw new ExpressionException(
-            s"""Invalid grouping expression "$expr". Only field references are allowed.""")
-      }
-      expr
-    }
-  }
-}
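
A hedged sketch of the check, assuming the Analyzer base class from org.apache.flink.api.table.trees exposes an analyze(expr) entry point (that class is not part of this diff):

  import org.apache.flink.api.common.typeinfo.BasicTypeInfo
  import org.apache.flink.api.table.expressions.UnresolvedFieldReference
  import org.apache.flink.api.table.expressions.analysis.GroupByAnalyzer

  val analyzer = new GroupByAnalyzer(Seq("name" -> BasicTypeInfo.STRING_TYPE_INFO))

  // Assumption: analyze() runs the rules above in order.
  analyzer.analyze(UnresolvedFieldReference("name"))
  // A grouping expression such as Sum(UnresolvedFieldReference("name")) would instead
  // hit CheckGroupExpression and fail with an ExpressionException.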

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/InsertAutoCasts.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/InsertAutoCasts.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/InsertAutoCasts.scala
deleted file mode 100644
index 0fdcab6..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/InsertAutoCasts.scala
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.expressions.analysis
-
-import org.apache.flink.api.table.expressions._
-import org.apache.flink.api.common.typeinfo.{IntegerTypeInfo, BasicTypeInfo}
-import org.apache.flink.api.table.trees.Rule
-
-/**
- * [[Rule]] that adds casts in arithmetic operations.
- */
-class InsertAutoCasts extends Rule[Expression] {
-
-  def apply(expr: Expression) = {
-    val result = expr.transformPost {
-
-      case plus@Plus(o1, o2) =>
-        // Plus is special case since we can cast anything to String for String concat
-        if (o1.typeInfo != o2.typeInfo && o1.typeInfo.isBasicType && o2.typeInfo.isBasicType) {
-          if (o1.typeInfo.asInstanceOf[BasicTypeInfo[_]].shouldAutocastTo(
-            o2.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
-            Plus(Cast(o1, o2.typeInfo), o2)
-          } else if (o2.typeInfo.asInstanceOf[BasicTypeInfo[_]].shouldAutocastTo(
-            o1.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
-            Plus(o1, Cast(o2, o1.typeInfo))
-          } else if (o1.typeInfo == BasicTypeInfo.STRING_TYPE_INFO) {
-            Plus(o1, Cast(o2, BasicTypeInfo.STRING_TYPE_INFO))
-          } else if (o2.typeInfo == BasicTypeInfo.STRING_TYPE_INFO) {
-            Plus(Cast(o1, BasicTypeInfo.STRING_TYPE_INFO), o2)
-          } else {
-            plus
-          }
-        } else {
-          plus
-        }
-
-      case ba: BinaryExpression if ba.isInstanceOf[BinaryArithmetic] ||
-        ba.isInstanceOf[BinaryComparison] =>
-        val o1 = ba.left
-        val o2 = ba.right
-        if (o1.typeInfo != o2.typeInfo && o1.typeInfo.isBasicType && o2.typeInfo.isBasicType) {
-          if (o1.typeInfo.asInstanceOf[BasicTypeInfo[_]].shouldAutocastTo(
-            o2.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
-            ba.makeCopy(Seq(Cast(o1, o2.typeInfo), o2))
-          } else if (o2.typeInfo.asInstanceOf[BasicTypeInfo[_]].shouldAutocastTo(
-            o1.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
-            ba.makeCopy(Seq(o1, Cast(o2, o1.typeInfo)))
-          } else {
-            ba
-          }
-        } else {
-          ba
-        }
-
-      case ba: BinaryExpression if ba.isInstanceOf[BitwiseBinaryArithmetic] =>
-        val o1 = ba.left
-        val o2 = ba.right
-        if (o1.typeInfo != o2.typeInfo && o1.typeInfo.isInstanceOf[IntegerTypeInfo[_]] &&
-          o2.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
-          if (o1.typeInfo.asInstanceOf[BasicTypeInfo[_]].shouldAutocastTo(
-            o2.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
-            ba.makeCopy(Seq(Cast(o1, o2.typeInfo), o2))
-          } else if (o2.typeInfo.asInstanceOf[BasicTypeInfo[_]].shouldAutocastTo(
-            o1.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
-            ba.makeCopy(Seq(o1, Cast(o2, o1.typeInfo)))
-          } else {
-            ba
-          }
-        } else {
-          ba
-        }
-    }
-
-    result
-  }
-}
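
A small sketch of the rule on a mixed-type arithmetic expression (field names are made up):

  import org.apache.flink.api.common.typeinfo.BasicTypeInfo
  import org.apache.flink.api.table.expressions._
  import org.apache.flink.api.table.expressions.analysis.InsertAutoCasts

  val intField  = ResolvedFieldReference("a", BasicTypeInfo.INT_TYPE_INFO)
  val longField = ResolvedFieldReference("b", BasicTypeInfo.LONG_TYPE_INFO)

  val casted = new InsertAutoCasts().apply(Plus(intField, longField))
  // casted == Plus(Cast(intField, BasicTypeInfo.LONG_TYPE_INFO), longField),
  // because INT should auto-cast to the wider LONG type.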

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/PredicateAnalyzer.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/PredicateAnalyzer.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/PredicateAnalyzer.scala
deleted file mode 100644
index e9236f7..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/PredicateAnalyzer.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions.analysis
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.api.table.trees.Analyzer
-
-/**
- * Analyzer for predicates, i.e. filter operations and where clauses of joins.
- */
-class PredicateAnalyzer(inputFields: Seq[(String, TypeInformation[_])])
-  extends Analyzer[Expression] {
-  def rules = Seq(
-    new ResolveFieldReferences(inputFields),
-    new InsertAutoCasts,
-    new TypeCheck,
-    new VerifyNoAggregates,
-    new VerifyBoolean)
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/ResolveFieldReferences.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/ResolveFieldReferences.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/ResolveFieldReferences.scala
deleted file mode 100644
index db7ea6c..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/ResolveFieldReferences.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions.analysis
-
-import org.apache.flink.api.table.expressions.{ResolvedFieldReference,
-UnresolvedFieldReference, Expression}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table._
-
-import scala.collection.mutable
-
-import org.apache.flink.api.table.trees.Rule
-
-/**
 * Rule that resolved field references. This rule verifies that field references point to existing
 * fields of the input operation and creates [[ResolvedFieldReference]]s that hold the field
- * [[TypeInformation]] in addition to the field name.
- */
-class ResolveFieldReferences(inputFields: Seq[(String, TypeInformation[_])])
-  extends Rule[Expression] {
-
-  def apply(expr: Expression) = {
-    val errors = mutable.MutableList[String]()
-
-    val result = expr.transformPost {
-      case fe@UnresolvedFieldReference(fieldName) =>
-        inputFields.find { _._1 == fieldName } match {
-          case Some((_, tpe)) => ResolvedFieldReference(fieldName, tpe)
-
-          case None =>
-            errors +=
-              s"Field '$fieldName' is not valid for input fields ${inputFields.mkString(",")}"
-            fe
-        }
-    }
-
-    if (errors.length > 0) {
-      throw new ExpressionException(
-        s"""Invalid expression "$expr": ${errors.mkString(" ")}""")
-    }
-
-    result
-
-  }
-}
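
A short sketch of the rule in isolation (the input schema is made up):

  import org.apache.flink.api.common.typeinfo.BasicTypeInfo
  import org.apache.flink.api.table.expressions.UnresolvedFieldReference
  import org.apache.flink.api.table.expressions.analysis.ResolveFieldReferences

  val rule = new ResolveFieldReferences(Seq(
    "name" -> BasicTypeInfo.STRING_TYPE_INFO,
    "age"  -> BasicTypeInfo.INT_TYPE_INFO))

  val resolved = rule(UnresolvedFieldReference("name"))
  // resolved == ResolvedFieldReference("name", BasicTypeInfo.STRING_TYPE_INFO);
  // an unknown field name would instead raise an ExpressionException.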

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/SelectionAnalyzer.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/SelectionAnalyzer.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/SelectionAnalyzer.scala
deleted file mode 100644
index 625fdbf..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/SelectionAnalyzer.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions.analysis
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.api.table.trees.Analyzer
-
-/**
- * This analyzes selection expressions.
- */
-class SelectionAnalyzer(inputFields: Seq[(String, TypeInformation[_])])
-  extends Analyzer[Expression] {
-
-  def rules = Seq(
-    new ResolveFieldReferences(inputFields),
-    new VerifyNoNestedAggregates,
-    new InsertAutoCasts,
-    new TypeCheck)
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/TypeCheck.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/TypeCheck.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/TypeCheck.scala
deleted file mode 100644
index b724561..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/TypeCheck.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.expressions.analysis
-
-import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.api.table.trees.Rule
-import org.apache.flink.api.table._
-
-import scala.collection.mutable
-
-/**
- * Rule that makes sure we call [[Expression.typeInfo]] on each [[Expression]] at least once.
- * Expressions are expected to perform type verification in this method.
- */
-class TypeCheck extends Rule[Expression] {
-
-  def apply(expr: Expression) = {
-    val errors = mutable.MutableList[String]()
-
-    val result = expr.transformPre {
-      case expr: Expression => {
-        // simply get the typeInfo from the expression. this will perform type analysis
-        try {
-          expr.typeInfo
-        } catch {
-          case e: ExpressionException =>
-            errors += e.getMessage
-        }
-        expr
-      }
-    }
-
-    if (errors.length > 0) {
-      throw new ExpressionException(
-        s"""Invalid expression "$expr": ${errors.mkString(" ")}""")
-    }
-
-    result
-
-  }
-}
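
The rule only touches each node to force type derivation; a failing node is reported once for the whole tree. A minimal sketch, assuming the Plus and Literal expression classes removed further below:

    import org.apache.flink.api.table.ExpressionException
    import org.apache.flink.api.table.expressions.{Literal, Plus}
    import org.apache.flink.api.table.expressions.analysis.TypeCheck

    // Int + String: Plus.typeInfo throws, TypeCheck collects the message and
    // rethrows a single ExpressionException describing the whole expression.
    val badExpr = Plus(Literal(1), Literal("one"))
    try {
      new TypeCheck().apply(badExpr)
    } catch {
      case e: ExpressionException => println(e.getMessage)
    }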

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyBoolean.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyBoolean.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyBoolean.scala
deleted file mode 100644
index e75dd20..0000000
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyBoolean.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.expressions.analysis
-
-import org.apache.flink.api.table.expressions.{NopExpression, Expression}
-import org.apache.flink.api.table.trees.Rule
-import org.apache.flink.api.table._
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-
-import scala.collection.mutable
-
-/**
- * [[Rule]] that verifies that the result type of an [[Expression]] is Boolean. This is required
- * for filter/join predicates.
- */
-class VerifyBoolean extends Rule[Expression] {
-
-  def apply(expr: Expression) = {
-    if (!expr.isInstanceOf[NopExpression] && expr.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO) {
-      throw new ExpressionException(s"Expression $expr of type ${expr.typeInfo} is not boolean.")
-    }
-
-    expr
-  }
-}
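
A small sketch of the rule in action, using the comparison and literal classes deleted further below:

    import org.apache.flink.api.table.expressions.{EqualTo, Literal}
    import org.apache.flink.api.table.expressions.analysis.VerifyBoolean

    val rule = new VerifyBoolean

    // A comparison types to BOOLEAN_TYPE_INFO, so the predicate passes through unchanged.
    rule(EqualTo(Literal(1), Literal(2)))

    // A bare Int literal is not boolean; this call would throw an ExpressionException.
    // rule(Literal(42))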

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoAggregates.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoAggregates.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoAggregates.scala
deleted file mode 100644
index 09dbf88..0000000
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoAggregates.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.expressions.analysis
-
-import org.apache.flink.api.table.ExpressionException
-import org.apache.flink.api.table.expressions.{Aggregation, Expression}
-
-import scala.collection.mutable
-
-import org.apache.flink.api.table.trees.Rule
-
-/**
- * Rule that verifies that an expression does not contain aggregate operations. Right now, join
- * predicates and filter predicates cannot contain aggregates.
- */
-class VerifyNoAggregates extends Rule[Expression] {
-
-  def apply(expr: Expression) = {
-    val errors = mutable.MutableList[String]()
-
-    val result = expr.transformPre {
-      case agg: Aggregation => {
-        errors +=
-          s"""Aggregations are not allowed in join/filter predicates."""
-        agg
-      }
-    }
-
-    if (errors.length > 0) {
-      throw new ExpressionException(
-        s"""Invalid expression "$expr": ${errors.mkString(" ")}""")
-    }
-
-    result
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoNestedAggregates.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoNestedAggregates.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoNestedAggregates.scala
deleted file mode 100644
index 07acf1e..0000000
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoNestedAggregates.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.expressions.analysis
-
-import org.apache.flink.api.table.ExpressionException
-import org.apache.flink.api.table.expressions.{Expression, Aggregation}
-
-import scala.collection.mutable
-
-import org.apache.flink.api.table.trees.Rule
-
-/**
- * Rule that verifies that an expression does not contain aggregate operations
- * as children of aggregate operations.
- */
-class VerifyNoNestedAggregates extends Rule[Expression] {
-
-  def apply(expr: Expression) = {
-    val errors = mutable.MutableList[String]()
-
-    val result = expr.transformPre {
-      case agg: Aggregation => {
-        if (agg.child.exists(_.isInstanceOf[Aggregation])) {
-          errors += s"""Found nested aggregation inside "$agg"."""
-        }
-        agg
-      }
-    }
-
-    if (errors.length > 0) {
-      throw new ExpressionException(
-        s"""Invalid expression "$expr": ${errors.mkString(" ")}""")
-    }
-
-    result
-
-  }
-}
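
A sketch of what the rule rejects, assuming an Aggregation subclass such as Sum from the aggregation expressions (that class is not shown in this diff, so treat the name as an assumption):

    import org.apache.flink.api.table.expressions.{Literal, Sum}
    import org.apache.flink.api.table.expressions.analysis.VerifyNoNestedAggregates

    // sum(1) is fine; sum(sum(1)) has an aggregate below an aggregate and throws.
    new VerifyNoNestedAggregates().apply(Sum(Literal(1)))
    // new VerifyNoNestedAggregates().apply(Sum(Sum(Literal(1))))  // ExpressionException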

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
deleted file mode 100644
index e866ea0..0000000
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions
-
-import org.apache.flink.api.table.ExpressionException
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, IntegerTypeInfo, NumericTypeInfo, TypeInformation}
-
-abstract class BinaryArithmetic extends BinaryExpression { self: Product =>
-  def typeInfo = {
-    if (!left.typeInfo.isInstanceOf[NumericTypeInfo[_]]) {
-      throw new ExpressionException(
-        s"""Non-numeric operand ${left} of type ${left.typeInfo} in $this""")
-    }
-    if (!right.typeInfo.isInstanceOf[NumericTypeInfo[_]]) {
-      throw new ExpressionException(
-        s"""Non-numeric operand "${right}" of type ${right.typeInfo} in 
$this""")
-    }
-    if (left.typeInfo != right.typeInfo) {
-      throw new ExpressionException(s"Differing operand data types 
${left.typeInfo} and " +
-        s"${right.typeInfo} in $this")
-    }
-    left.typeInfo
-  }
-}
-
-case class Plus(left: Expression, right: Expression) extends BinaryArithmetic {
-  override def typeInfo = {
-    if (!left.typeInfo.isInstanceOf[NumericTypeInfo[_]] &&
-      !(left.typeInfo == BasicTypeInfo.STRING_TYPE_INFO)) {
-      throw new ExpressionException(s"Non-numeric operand type 
${left.typeInfo} in $this")
-    }
-    if (!right.typeInfo.isInstanceOf[NumericTypeInfo[_]] &&
-      !(right.typeInfo == BasicTypeInfo.STRING_TYPE_INFO)) {
-      throw new ExpressionException(s"Non-numeric operand type 
${right.typeInfo} in $this")
-    }
-    if (left.typeInfo != right.typeInfo) {
-      throw new ExpressionException(s"Differing operand data types 
${left.typeInfo} and " +
-        s"${right.typeInfo} in $this")
-    }
-    left.typeInfo
-  }
-
-  override def toString = s"($left + $right)"
-}
-
-case class UnaryMinus(child: Expression) extends UnaryExpression {
-  def typeInfo = {
-    if (!child.typeInfo.isInstanceOf[NumericTypeInfo[_]]) {
-      throw new ExpressionException(
-        s"""Non-numeric operand ${child} of type ${child.typeInfo} in $this""")
-    }
-    child.typeInfo
-  }
-
-  override def toString = s"-($child)"
-}
-
-case class Minus(left: Expression, right: Expression) extends BinaryArithmetic {
-  override def toString = s"($left - $right)"
-}
-
-case class Div(left: Expression, right: Expression) extends BinaryArithmetic {
-  override def toString = s"($left / $right)"
-}
-
-case class Mul(left: Expression, right: Expression) extends BinaryArithmetic {
-  override def toString = s"($left * $right)"
-}
-
-case class Mod(left: Expression, right: Expression) extends BinaryArithmetic {
-  override def toString = s"($left * $right)"
-}
-
-case class Abs(child: Expression) extends UnaryExpression {
-  def typeInfo = child.typeInfo
-
-  override def toString = s"abs($child)"
-}
-
-abstract class BitwiseBinaryArithmetic extends BinaryExpression { self: Product =>
-  def typeInfo: TypeInformation[_] = {
-    if (!left.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
-      throw new ExpressionException(
-        s"""Non-integer operand ${left} of type ${left.typeInfo} in $this""")
-    }
-    if (!right.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
-      throw new ExpressionException(
-        s"""Non-integer operand "${right}" of type ${right.typeInfo} in 
$this""")
-    }
-    if (left.typeInfo != right.typeInfo) {
-      throw new ExpressionException(s"Differing operand data types 
${left.typeInfo} and " +
-        s"${right.typeInfo} in $this")
-    }
-    if (left.typeInfo == BasicTypeInfo.LONG_TYPE_INFO) {
-      left.typeInfo
-    } else {
-      BasicTypeInfo.INT_TYPE_INFO
-    }
-  }
-}
-
-case class BitwiseAnd(left: Expression, right: Expression) extends BitwiseBinaryArithmetic {
-  override def toString = s"($left & $right)"
-}
-
-case class BitwiseOr(left: Expression, right: Expression) extends BitwiseBinaryArithmetic {
-  override def toString = s"($left | $right)"
-}
-
-
-case class BitwiseXor(left: Expression, right: Expression) extends BitwiseBinaryArithmetic {
-  override def toString = s"($left ^ $right)"
-}
-
-case class BitwiseNot(child: Expression) extends UnaryExpression {
-  def typeInfo: TypeInformation[_] = {
-    if (!child.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
-      throw new ExpressionException(
-        s"""Non-integer operand ${child} of type ${child.typeInfo} in $this""")
-    }
-    if (child.typeInfo == BasicTypeInfo.LONG_TYPE_INFO) {
-      child.typeInfo
-    } else {
-      BasicTypeInfo.INT_TYPE_INFO
-    }
-  }
-
-  override def toString = s"~($child)"
-}
-
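
The arithmetic nodes derive their result type from their operands; a short sketch using the classes above:

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    import org.apache.flink.api.table.expressions.{BitwiseAnd, Div, Literal, Plus}

    // (1 + 2) / 4: all operands are Int, so the result type stays INT_TYPE_INFO.
    val quotient = Div(Plus(Literal(1), Literal(2)), Literal(4))
    assert(quotient.typeInfo == BasicTypeInfo.INT_TYPE_INFO)

    // Bitwise results are LONG when both operands are LONG, otherwise INT.
    val masked = BitwiseAnd(Literal(0xFFL), Literal(42L))
    assert(masked.typeInfo == BasicTypeInfo.LONG_TYPE_INFO)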

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala
deleted file mode 100644
index 9fae862..0000000
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions
-
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.table.ExpressionException
-
-case class Cast(child: Expression, tpe: TypeInformation[_]) extends UnaryExpression {
-  def typeInfo = tpe match {
-    case BasicTypeInfo.STRING_TYPE_INFO => tpe
-
-    case b if b.isBasicType && child.typeInfo.isBasicType => tpe
-
-    case _ => throw new ExpressionException(
-      s"Invalid cast: $this. Casts are only valid betwixt primitive types.")
-  }
-
-  override def toString = s"$child.cast($tpe)"
-}
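
A short sketch of the cast typing rules above:

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    import org.apache.flink.api.table.expressions.{Cast, Literal}

    // Any expression may be cast to String; between other types both sides must be basic types.
    val asString = Cast(Literal(42), BasicTypeInfo.STRING_TYPE_INFO)
    assert(asString.typeInfo == BasicTypeInfo.STRING_TYPE_INFO)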

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
deleted file mode 100644
index 687ea7a..0000000
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions
-
-import org.apache.flink.api.table.ExpressionException
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, NumericTypeInfo}
-
-abstract class BinaryComparison extends BinaryExpression { self: Product =>
-  def typeInfo = {
-    if (!left.typeInfo.isInstanceOf[NumericTypeInfo[_]]) {
-      throw new ExpressionException(s"Non-numeric operand ${left} in $this")
-    }
-    if (!right.typeInfo.isInstanceOf[NumericTypeInfo[_]]) {
-      throw new ExpressionException(s"Non-numeric operand ${right} in $this")
-    }
-    if (left.typeInfo != right.typeInfo) {
-      throw new ExpressionException(s"Differing operand data types 
${left.typeInfo} and " +
-        s"${right.typeInfo} in $this")
-    }
-    BasicTypeInfo.BOOLEAN_TYPE_INFO
-  }
-}
-
-case class EqualTo(left: Expression, right: Expression) extends BinaryComparison {
-  override def typeInfo = {
-    if (left.typeInfo != right.typeInfo) {
-      throw new ExpressionException(s"Differing operand data types 
${left.typeInfo} and " +
-        s"${right.typeInfo} in $this")
-    }
-    BasicTypeInfo.BOOLEAN_TYPE_INFO
-  }
-
-  override def toString = s"$left === $right"
-}
-
-case class NotEqualTo(left: Expression, right: Expression) extends BinaryComparison {
-  override def typeInfo = {
-    if (left.typeInfo != right.typeInfo) {
-      throw new ExpressionException(s"Differing operand data types 
${left.typeInfo} and " +
-        s"${right.typeInfo} in $this")
-    }
-    BasicTypeInfo.BOOLEAN_TYPE_INFO
-  }
-
-  override def toString = s"$left !== $right"
-}
-
-case class GreaterThan(left: Expression, right: Expression) extends BinaryComparison {
-  override def toString = s"$left > $right"
-}
-
-case class GreaterThanOrEqual(left: Expression, right: Expression) extends BinaryComparison {
-  override def toString = s"$left >= $right"
-}
-
-case class LessThan(left: Expression, right: Expression) extends BinaryComparison {
-  override def toString = s"$left < $right"
-}
-
-case class LessThanOrEqual(left: Expression, right: Expression) extends BinaryComparison {
-  override def toString = s"$left <= $right"
-}
-
-case class IsNull(child: Expression) extends UnaryExpression {
-  def typeInfo = {
-    BasicTypeInfo.BOOLEAN_TYPE_INFO
-  }
-
-  override def toString = s"($child).isNull"
-}
-
-case class IsNotNull(child: Expression) extends UnaryExpression {
-  def typeInfo = {
-    BasicTypeInfo.BOOLEAN_TYPE_INFO
-  }
-
-  override def toString = s"($child).isNotNull"
-}
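
A sketch of the comparison typing above:

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    import org.apache.flink.api.table.expressions.{EqualTo, GreaterThan, Literal}

    // Ordering comparisons require numeric operands of the same type and yield BOOLEAN.
    assert(GreaterThan(Literal(3), Literal(2)).typeInfo == BasicTypeInfo.BOOLEAN_TYPE_INFO)

    // EqualTo/NotEqualTo only require matching operand types, so String equality is fine,
    // while GreaterThan(Literal("a"), Literal("b")) would throw.
    assert(EqualTo(Literal("a"), Literal("b")).typeInfo == BasicTypeInfo.BOOLEAN_TYPE_INFO)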

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala
deleted file mode 100644
index a649aed..0000000
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions
-
-import org.apache.flink.api.table.ExpressionException
-import org.apache.flink.api.common.typeinfo.TypeInformation
-
-case class UnresolvedFieldReference(override val name: String) extends LeafExpression {
-  def typeInfo = throw new ExpressionException(s"Unresolved field reference: $this")
-
-  override def toString = "\"" + name
-}
-
-case class ResolvedFieldReference(
-    override val name: String,
-    tpe: TypeInformation[_]) extends LeafExpression {
-  def typeInfo = tpe
-
-  override def toString = s"'$name"
-}
-
-case class Naming(child: Expression, override val name: String) extends UnaryExpression {
-  def typeInfo = child.typeInfo
-
-  override def toString = s"$child as '$name"
-}
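
A sketch of the field-reference life cycle; the resolution itself is done by the ResolveFieldReferences rule shown earlier in this commit:

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    import org.apache.flink.api.table.expressions.{Naming, ResolvedFieldReference, UnresolvedFieldReference}

    // Before analysis a field is only a name; asking it for a type throws.
    val unresolved = UnresolvedFieldReference("count")

    // After resolution the reference carries its TypeInformation and can be renamed.
    val resolved = ResolvedFieldReference("count", BasicTypeInfo.LONG_TYPE_INFO)
    val renamed  = Naming(resolved, "cnt")
    assert(renamed.typeInfo == BasicTypeInfo.LONG_TYPE_INFO)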

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
deleted file mode 100644
index f909cab..0000000
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions
-
-import java.util.Date
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.scala.table.ImplicitExpressionOperations
-
-object Literal {
-  def apply(l: Any): Literal = l match {
-    case i: Int => Literal(i, BasicTypeInfo.INT_TYPE_INFO)
-    case l: Long => Literal(l, BasicTypeInfo.LONG_TYPE_INFO)
-    case d: Double => Literal(d, BasicTypeInfo.DOUBLE_TYPE_INFO)
-    case f: Float => Literal(f, BasicTypeInfo.FLOAT_TYPE_INFO)
-    case str: String => Literal(str, BasicTypeInfo.STRING_TYPE_INFO)
-    case bool: Boolean => Literal(bool, BasicTypeInfo.BOOLEAN_TYPE_INFO)
-    case date: Date => Literal(date, BasicTypeInfo.DATE_TYPE_INFO)
-  }
-}
-
-case class Literal(value: Any, tpe: TypeInformation[_])
-  extends LeafExpression with ImplicitExpressionOperations {
-  def expr = this
-  def typeInfo = tpe
-
-  override def toString = s"$value"
-}
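
The companion object maps plain Scala values to typed literals; a short sketch:

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    import org.apache.flink.api.table.expressions.Literal

    // Literal.apply infers the TypeInformation from the value.
    assert(Literal(1).typeInfo == BasicTypeInfo.INT_TYPE_INFO)
    assert(Literal("hello").typeInfo == BasicTypeInfo.STRING_TYPE_INFO)

    // Values outside the match above (e.g. BigDecimal) are unsupported and fail with a MatchError.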

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala
deleted file mode 100644
index eaf0463..0000000
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions
-
-import org.apache.flink.api.table.ExpressionException
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-
-abstract class BinaryPredicate extends BinaryExpression { self: Product =>
-  def typeInfo = {
-    if (left.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO ||
-      right.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO) {
-      throw new ExpressionException(s"Non-boolean operand types 
${left.typeInfo} and " +
-        s"${right.typeInfo} in $this")
-    }
-    BasicTypeInfo.BOOLEAN_TYPE_INFO
-  }
-}
-
-case class Not(child: Expression) extends UnaryExpression {
-  def typeInfo = {
-    if (child.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO) {
-      throw new ExpressionException(s"Non-boolean operand type 
${child.typeInfo} in $this")
-    }
-    BasicTypeInfo.BOOLEAN_TYPE_INFO
-  }
-
-  override val name = Expression.freshName("not-" + child.name)
-
-  override def toString = s"!($child)"
-}
-
-case class And(left: Expression, right: Expression) extends BinaryPredicate {
-  override def toString = s"$left && $right"
-
-  override val name = Expression.freshName(left.name + "-and-" + right.name)
-}
-
-case class Or(left: Expression, right: Expression) extends BinaryPredicate {
-  override def toString = s"$left || $right"
-
-  override val name = Expression.freshName(left.name + "-or-" + right.name)
-
-}
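
A sketch of composing the logical predicates above:

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    import org.apache.flink.api.table.expressions.{And, EqualTo, Literal, Not}

    // Both sides of And/Or must be boolean expressions; the result is boolean again.
    val pred = And(EqualTo(Literal(1), Literal(1)), Not(EqualTo(Literal("a"), Literal("b"))))
    assert(pred.typeInfo == BasicTypeInfo.BOOLEAN_TYPE_INFO)

    // And(Literal(1), Literal(2)) would throw: non-boolean operand types.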

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala
deleted file mode 100644
index c5c8c94..0000000
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table
-
-/**
- * This package contains the base class of AST nodes and all the expression language AST classes.
- * Expression trees should not be manually constructed by users. They are implicitly constructed
- * from the implicit DSL conversions in
- * [[org.apache.flink.api.scala.expressions.ImplicitExpressionConversions]] and
- * [[org.apache.flink.api.scala.expressions.ImplicitExpressionOperations]]. For the Java API,
- * expression trees should be generated from a string parser that parses expressions and creates
- * AST nodes.
- */
-package object expressions

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala
deleted file mode 100644
index a39d601..0000000
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions
-
-import org.apache.flink.api.table.ExpressionException
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, IntegerTypeInfo}
-
-case class Substring(
-    str: Expression,
-    beginIndex: Expression,
-    endIndex: Expression) extends Expression {
-  def typeInfo = {
-    if (str.typeInfo != BasicTypeInfo.STRING_TYPE_INFO) {
-      throw new ExpressionException(
-        s"""Operand must be of type String in $this, is ${str.typeInfo}.""")
-    }
-    if (!beginIndex.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
-      throw new ExpressionException(
-        s"""Begin index must be an integer type in $this, is 
${beginIndex.typeInfo}.""")
-    }
-    if (!endIndex.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
-      throw new ExpressionException(
-        s"""End index must be an integer type in $this, is 
${endIndex.typeInfo}.""")
-    }
-
-    BasicTypeInfo.STRING_TYPE_INFO
-  }
-
-  override def children: Seq[Expression] = Seq(str, beginIndex, endIndex)
-  override def toString = s"($str).substring($beginIndex, $endIndex)"
-}
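
A sketch of the substring typing above:

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    import org.apache.flink.api.table.expressions.{Literal, Substring}

    // A string operand plus two integer indices type-check; the result type is STRING.
    val sub = Substring(Literal("flink"), Literal(0), Literal(4))
    assert(sub.typeInfo == BasicTypeInfo.STRING_TYPE_INFO)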

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/package.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/package.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/package.scala
deleted file mode 100644
index bdcb22c..0000000
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/package.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api
-
-/**
- * == Table API ==
- *
- * This package contains the generic part of the Table API. It can be used with Flink Streaming
- * and Flink Batch, from Scala as well as from Java.
- *
- * When using the Table API, a user creates a [[org.apache.flink.api.table.Table]] from
- * a DataSet or DataStream. Relational operations can then be performed on it. A table can also
- * be converted back to a DataSet or DataStream.
- *
- * Packages [[org.apache.flink.api.scala.table]] and [[org.apache.flink.api.java.table]] contain
- * the language specific part of the API. Refer to these packages for documentation on how
- * the Table API can be used in Java and Scala.
- */
-package object table
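
A rough sketch of the workflow the package doc describes, written against the Scala Table API of this code base; the exact operator names here are assumptions, not verified against the current sources:

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.table._

    def selectHello(env: ExecutionEnvironment) = {
      val input = env.fromElements((1, "hello"), (2, "world"))
      val table = input.as('id, 'text)        // DataSet -> Table (assumed conversion)
      table.filter('id === 1).select('text)   // relational operations on the Table
    }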
