http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateJoin.scala
deleted file mode 100644
index b706e6d..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateJoin.scala
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.codegen
-
-import java.io.StringReader
-
-import org.apache.flink.api.common.functions.FlatJoinFunction
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.table.TableConfig
-import org.apache.flink.api.table.codegen.Indenter._
-import org.apache.flink.api.table.expressions.{Expression, NopExpression}
-import org.slf4j.LoggerFactory
-
-/**
- * Code generator for assembling the result of a binary operation.
- */
-class GenerateJoin[L, R, O](
-    leftTypeInfo: CompositeType[L],
-    rightTypeInfo: CompositeType[R],
-    resultTypeInfo: CompositeType[O],
-    predicate: Expression,
-    outputFields: Seq[Expression],
-    cl: ClassLoader,
-    config: TableConfig)
-  extends GenerateResultAssembler[FlatJoinFunction[L, R, O]](
-    Seq(("in0", leftTypeInfo), ("in1", rightTypeInfo)),
-    cl = cl,
-    config) {
-
-  val LOG = LoggerFactory.getLogger(this.getClass)
-
-
-  override protected def generateInternal(): FlatJoinFunction[L, R, O] = {
-
-    val leftTpe = typeTermForTypeInfo(leftTypeInfo)
-    val rightTpe = typeTermForTypeInfo(rightTypeInfo)
-    val resultTpe = typeTermForTypeInfo(resultTypeInfo)
-
-
-    val resultCode = createResult(resultTypeInfo, outputFields, o => s"coll.collect($o);")
-
-    val generatedName = freshName("GeneratedJoin")
-
-
-    val code = predicate match {
-      case n: NopExpression =>
-        // Janino does not support generics, that's why we need
-        // manual casting here
-        if (nullCheck) {
-          j"""
-        public class $generatedName
-            implements org.apache.flink.api.common.functions.FlatJoinFunction {
-
-          ${reuseCode(resultTypeInfo)}
-
-          public org.apache.flink.api.table.TableConfig config = null;
-          public $generatedName(org.apache.flink.api.table.TableConfig config) {
-            this.config = config;
-          }
-
-          public void join(Object _in0, Object _in1, org.apache.flink.util.Collector coll) {
-            $leftTpe in0 = ($leftTpe) _in0;
-            $rightTpe in1 = ($rightTpe) _in1;
-
-            $resultCode
-          }
-        }
-      """
-        } else {
-          j"""
-        public class $generatedName
-            implements org.apache.flink.api.common.functions.FlatJoinFunction {
-
-          ${reuseCode(resultTypeInfo)}
-
-          public org.apache.flink.api.table.TableConfig config = null;
-          public $generatedName(org.apache.flink.api.table.TableConfig config) {
-            this.config = config;
-          }
-
-          public void join(Object _in0, Object _in1, org.apache.flink.util.Collector coll) {
-            $leftTpe in0 = ($leftTpe) _in0;
-            $rightTpe in1 = ($rightTpe) _in1;
-
-            $resultCode
-          }
-        }
-      """
-        }
-
-      case _ =>
-        val pred = generateExpression(predicate)
-        // Janino does not support generics, that's why we need
-        // manual casting here
-        if (nullCheck) {
-          j"""
-        public class $generatedName
-            implements org.apache.flink.api.common.functions.FlatJoinFunction {
-
-          ${reuseCode(resultTypeInfo)}
-
-          org.apache.flink.api.table.TableConfig config = null;
-
-          public $generatedName(org.apache.flink.api.table.TableConfig config) {
-            this.config = config;
-            ${reuseInitCode()}
-          }
-
-          public void join(Object _in0, Object _in1, org.apache.flink.util.Collector coll) {
-            $leftTpe in0 = ($leftTpe) _in0;
-            $rightTpe in1 = ($rightTpe) _in1;
-
-            ${pred.code}
-
-            if (!${pred.nullTerm} && ${pred.resultTerm}) {
-              $resultCode
-            }
-          }
-        }
-      """
-        } else {
-          j"""
-        public class $generatedName
-            implements org.apache.flink.api.common.functions.FlatJoinFunction {
-
-          ${reuseCode(resultTypeInfo)}
-
-          org.apache.flink.api.table.TableConfig config = null;
-
-          public $generatedName(org.apache.flink.api.table.TableConfig config) {
-            this.config = config;
-            ${reuseInitCode()}
-          }
-
-          public void join(Object _in0, Object _in1, org.apache.flink.util.Collector coll) {
-            $leftTpe in0 = ($leftTpe) _in0;
-            $rightTpe in1 = ($rightTpe) _in1;
-
-            ${pred.code}
-
-            if (${pred.resultTerm}) {
-              $resultCode
-            }
-          }
-        }
-      """
-        }
-    }
-
-    LOG.debug(s"""Generated join:\n$code""")
-    compiler.cook(new StringReader(code))
-    val clazz = compiler.getClassLoader().loadClass(generatedName)
-    val constructor = clazz.getConstructor(classOf[TableConfig])
-    constructor.newInstance(config).asInstanceOf[FlatJoinFunction[L, R, O]]
-  }
-}

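All of the removed generators above share one pattern: build Java source as a
string, compile it at runtime with Janino, and instantiate the resulting class
reflectively. A minimal, self-contained sketch of that pattern (the class name
and method are made up for illustration; assumes Janino is on the classpath):

import java.io.StringReader
import org.codehaus.janino.SimpleCompiler

object JaninoPatternSketch {
  def main(args: Array[String]): Unit = {
    val source =
      """
        |public class HelloFunction {
        |  public String apply() { return "hello"; }
        |}
      """.stripMargin

    val compiler = new SimpleCompiler()
    compiler.cook(new StringReader(source))  // compile the Java source string
    val clazz = compiler.getClassLoader.loadClass("HelloFunction")
    val instance = clazz.getDeclaredConstructor().newInstance()
    println(clazz.getMethod("apply").invoke(instance))  // prints "hello"
  }
}
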
http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateResultAssembler.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateResultAssembler.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateResultAssembler.scala
deleted file mode 100644
index 3916410..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateResultAssembler.scala
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.codegen
-
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
-import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
-import org.apache.flink.api.table.TableConfig
-import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.api.table.typeinfo.RowTypeInfo
-
-/**
- * Base class for unary and binary result assembler code generators.
- */
-abstract class GenerateResultAssembler[R](
-    inputs: Seq[(String, CompositeType[_])],
-    cl: ClassLoader,
-    config: TableConfig)
-  extends ExpressionCodeGenerator[R](inputs, cl = cl, config) {
-
-  def reuseCode[A](resultTypeInfo: CompositeType[A]) = {
-      val resultTpe = typeTermForTypeInfo(resultTypeInfo)
-      resultTypeInfo match {
-        case pj: PojoTypeInfo[_] =>
-          super.reuseMemberCode() +
-            s"$resultTpe out = new ${pj.getTypeClass.getCanonicalName}();"
-
-        case row: RowTypeInfo =>
-          super.reuseMemberCode() +
-            s"org.apache.flink.api.table.Row out =" +
-            s" new org.apache.flink.api.table.Row(${row.getArity});"
-
-        case _ => ""
-      }
-  }
-
-  def createResult[T](
-      resultTypeInfo: CompositeType[T],
-      outputFields: Seq[Expression],
-      result: String => String): String = {
-
-    val resultType = typeTermForTypeInfo(resultTypeInfo)
-
-    val fieldsCode = outputFields.map(generateExpression)
-
-    val block = resultTypeInfo match {
-      case ri: RowTypeInfo =>
-        val resultSetters: String = fieldsCode.zipWithIndex map {
-          case (fieldCode, i) =>
-            s"""
-              |${fieldCode.code}
-              |out.setField($i, ${fieldCode.resultTerm});
-            """.stripMargin
-        } mkString("\n")
-
-        s"""
-          |$resultSetters
-          |${result("out")}
-        """.stripMargin
-
-      case pj: PojoTypeInfo[_] =>
-        val resultSetters: String = fieldsCode.zip(outputFields) map {
-        case (fieldCode, expr) =>
-          val fieldName = expr.name
-          s"""
-            |${fieldCode.code}
-            |out.$fieldName = ${fieldCode.resultTerm};
-          """.stripMargin
-        } mkString("\n")
-
-        s"""
-          |$resultSetters
-          |${result("out")}
-        """.stripMargin
-
-      case tup: TupleTypeInfo[_] =>
-        val resultSetters: String = fieldsCode.zip(outputFields) map {
-          case (fieldCode, expr) =>
-            val fieldName = expr.name
-            s"""
-              |${fieldCode.code}
-              |out.$fieldName = ${fieldCode.resultTerm};
-            """.stripMargin
-        } mkString("\n")
-
-        s"""
-          |$resultSetters
-          |${result("out")}
-        """.stripMargin
-
-      case cc: CaseClassTypeInfo[_] =>
-        val fields: String = fieldsCode.map(_.code).mkString("\n")
-        val ctorParams: String = fieldsCode.map(_.resultTerm).mkString(",")
-
-        s"""
-          |$fields
-          |return new $resultType($ctorParams);
-        """.stripMargin
-    }
-
-    block
-  }
-
-}

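The removed createResult method wove each field's generated code into a block
of setter statements. A rough stand-alone re-creation of its RowTypeInfo
branch, with hypothetical field codes and result terms standing in for real
generated expressions:

case class FieldCode(code: String, resultTerm: String)

object RowAssemblerSketch extends App {
  val fieldsCode = Seq(
    FieldCode("int result0 = in0.f0 + 1;", "result0"),
    FieldCode("java.lang.String result1 = in0.f1;", "result1"))

  // zip each field's code with its position and emit a setField call,
  // mirroring the zipWithIndex logic of the deleted Row branch
  val resultSetters = fieldsCode.zipWithIndex.map { case (fc, i) =>
    s"""
       |${fc.code}
       |out.setField($i, ${fc.resultTerm});
     """.stripMargin
  }.mkString("\n")

  println(resultSetters + "\ncoll.collect(out);")
}
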
http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateSelect.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateSelect.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateSelect.scala
deleted file mode 100644
index a75d15b..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateSelect.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.codegen
-
-import java.io.StringReader
-
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.table.TableConfig
-import org.apache.flink.api.table.codegen.Indenter._
-import org.apache.flink.api.table.expressions.Expression
-import org.slf4j.LoggerFactory
-
-/**
- * Code generator for assembling the result of a select operation.
- */
-class GenerateSelect[I, O](
-    inputTypeInfo: CompositeType[I],
-    resultTypeInfo: CompositeType[O],
-    outputFields: Seq[Expression],
-    cl: ClassLoader,
-    config: TableConfig)
-  extends GenerateResultAssembler[MapFunction[I, O]](
-    Seq(("in0", inputTypeInfo)),
-    cl = cl,
-    config) {
-
-  val LOG = LoggerFactory.getLogger(this.getClass)
-
-  override protected def generateInternal(): MapFunction[I, O] = {
-
-    val inputTpe = typeTermForTypeInfo(inputTypeInfo)
-    val resultTpe = typeTermForTypeInfo(resultTypeInfo)
-
-    val resultCode = createResult(resultTypeInfo, outputFields, o => s"return $o;")
-
-    val generatedName = freshName("GeneratedSelect")
-
-    // Janino does not support generics, that's why we need
-    // manual casting here
-    val code =
-      j"""
-        public class $generatedName
-            implements org.apache.flink.api.common.functions.MapFunction<$inputTpe, $resultTpe> {
-
-          ${reuseCode(resultTypeInfo)}
-
-          org.apache.flink.api.table.TableConfig config = null;
-
-          public $generatedName(org.apache.flink.api.table.TableConfig config) {
-            this.config = config;
-            ${reuseInitCode()}
-          }
-
-          @Override
-          public Object map(Object _in0) {
-            $inputTpe in0 = ($inputTpe) _in0;
-            $resultCode
-          }
-        }
-      """
-
-    LOG.debug(s"""Generated select:\n$code""")
-    compiler.cook(new StringReader(code))
-    val clazz = compiler.getClassLoader().loadClass(generatedName)
-    val constructor = clazz.getConstructor(classOf[TableConfig])
-    constructor.newInstance(config).asInstanceOf[MapFunction[I, O]]
-  }
-}

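For contrast, the output of GenerateSelect corresponded to a hand-written
MapFunction like the one below (input and output types are hypothetical).
The generated variant used raw Object signatures because, as the comments in
the deleted code note, Janino cannot compile generics:

import org.apache.flink.api.common.functions.MapFunction

class HandWrittenSelect extends MapFunction[(Int, String), (Int, Int)] {
  // project the input to (f0, f0 + 1), as a generated select might
  override def map(in0: (Int, String)): (Int, Int) = (in0._1, in0._1 + 1)
}
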
http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GeneratedExpression.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GeneratedExpression.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GeneratedExpression.scala
new file mode 100644
index 0000000..7c20e38
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GeneratedExpression.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.codegen
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+case class GeneratedExpression(
+    resultTerm: String,
+    nullTerm: String,
+    code: String,
+    resultType: TypeInformation[_])

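A hypothetical instance for an integer addition a + b: resultTerm and nullTerm
name the local variables that the code fragment declares, so downstream
generators can splice them into larger expressions.

import org.apache.flink.api.common.typeinfo.BasicTypeInfo

object GeneratedExpressionExample {
  val expr = GeneratedExpression(
    resultTerm = "result3",
    nullTerm = "isNull3",
    code =
      """
        |boolean isNull3 = isNull1 || isNull2;
        |int result3 = isNull3 ? -1 : result1 + result2;
      """.stripMargin,
    resultType = BasicTypeInfo.INT_TYPE_INFO)
}
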
http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GeneratedFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GeneratedFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GeneratedFunction.scala
new file mode 100644
index 0000000..575e7ab
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GeneratedFunction.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.codegen
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+case class GeneratedFunction[T](name: String, returnType: TypeInformation[Any], code: String)

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/Indenter.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/Indenter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/Indenter.scala
index 1319f21..c7d9a2e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/Indenter.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/Indenter.scala
@@ -18,16 +18,16 @@
 package org.apache.flink.api.table.codegen
 
 class IndentStringContext(sc: StringContext) {
-  def j(args: Any*):String = {
+  def j(args: Any*): String = {
     val sb = new StringBuilder()
     for ((s, a) <- sc.parts zip args) {
       sb append s
 
       val ind = getindent(s)
-      if (ind.size > 0) {
-        sb append a.toString().replaceAll("\n", "\n" + ind)
+      if (ind.nonEmpty) {
+        sb append a.toString.replaceAll("\n", "\n" + ind)
       } else {
-        sb append a.toString()
+        sb append a.toString
       }
     }
     if (sc.parts.size > args.size) {
@@ -50,5 +50,5 @@ class IndentStringContext(sc: StringContext) {
 }
 
 object Indenter {
-  implicit  def toISC(sc: StringContext) = new IndentStringContext(sc)
+  implicit def toISC(sc: StringContext): IndentStringContext = new IndentStringContext(sc)
 }

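What the j interpolator buys over plain s interpolation, in one example: when
a spliced argument spans several lines, every line after the first is shifted
to the column of the splice, which keeps generated Java readable. A small
usage sketch:

import org.apache.flink.api.table.codegen.Indenter._

object IndenterDemo extends App {
  val body = "int a = 1;\nint b = 2;"
  val clazz =
    j"""
      public class Demo {
        $body
      }
    """
  // with s"..." the second statement of body would start at column 0;
  // with j"..." both statements align under the same indentation
  println(clazz)
}
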
http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/OperatorCodeGen.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/OperatorCodeGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/OperatorCodeGen.scala
new file mode 100644
index 0000000..8402569
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/OperatorCodeGen.scala
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.codegen
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo.BOOLEAN_TYPE_INFO
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.codegen.CodeGenUtils._
+
+object OperatorCodeGen {
+
+  def generateArithmeticOperator(
+      operator: String,
+      nullCheck: Boolean,
+      resultType: TypeInformation[_],
+      left: GeneratedExpression,
+      right: GeneratedExpression)
+    : GeneratedExpression = {
+    generateOperatorIfNotNull(nullCheck, resultType, left, right) {
+      (leftTerm, rightTerm) => s"$leftTerm $operator $rightTerm"
+    }
+  }
+
+  def generateUnaryArithmeticOperator(
+      operator: String,
+      nullCheck: Boolean,
+      resultType: TypeInformation[_],
+      operand: GeneratedExpression)
+    : GeneratedExpression = {
+    generateUnaryOperatorIfNotNull(nullCheck, resultType, operand) {
+      (operandTerm) => s"$operator($operandTerm)"
+    }
+  }
+
+  def generateEquals(
+      nullCheck: Boolean,
+      left: GeneratedExpression,
+      right: GeneratedExpression)
+    : GeneratedExpression = {
+    generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) {
+      if (isReference(left)) {
+        (leftTerm, rightTerm) => s"$leftTerm.equals($rightTerm)"
+      }
+      else if (isReference(right)) {
+        (leftTerm, rightTerm) => s"$rightTerm.equals($leftTerm)"
+      }
+      else {
+        (leftTerm, rightTerm) => s"$leftTerm == $rightTerm"
+      }
+    }
+  }
+
+  def generateNotEquals(
+      nullCheck: Boolean,
+      left: GeneratedExpression,
+      right: GeneratedExpression)
+    : GeneratedExpression = {
+    generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) {
+      if (isReference(left)) {
+        (leftTerm, rightTerm) => s"!($leftTerm.equals($rightTerm))"
+      }
+      else if (isReference(right)) {
+        (leftTerm, rightTerm) => s"!($rightTerm.equals($leftTerm))"
+      }
+      else {
+        (leftTerm, rightTerm) => s"$leftTerm != $rightTerm"
+      }
+    }
+  }
+
+  def generateComparison(
+      operator: String,
+      nullCheck: Boolean,
+      left: GeneratedExpression,
+      right: GeneratedExpression)
+    : GeneratedExpression = {
+    generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) {
+      if (isString(left) && isString(right)) {
+        (leftTerm, rightTerm) => s"$leftTerm.compareTo($rightTerm) $operator 0"
+      }
+      else if (isNumeric(left) && isNumeric(right)) {
+        (leftTerm, rightTerm) => s"$leftTerm $operator $rightTerm"
+      }
+      else {
+        throw new CodeGenException("Comparison is only supported for Strings and numeric types.")
+      }
+    }
+  }
+
+  def generateIsNull(
+      nullCheck: Boolean,
+      operand: GeneratedExpression)
+    : GeneratedExpression = {
+    val resultTerm = newName("result")
+    val nullTerm = newName("isNull")
+    val operatorCode = if (nullCheck) {
+      s"""
+        |${operand.code}
+        |boolean $resultTerm = ${operand.nullTerm};
+        |boolean $nullTerm = false;
+        |""".stripMargin
+    }
+    else if (!nullCheck && isReference(operand.resultType)) {
+      s"""
+        |${operand.code}
+        |boolean $resultTerm = ${operand.resultTerm} == null;
+        |boolean $nullTerm = false;
+        |""".stripMargin
+    }
+    else {
+      s"""
+        |${operand.code}
+        |boolean $resultTerm = false;
+        |""".stripMargin
+    }
+
+    GeneratedExpression(resultTerm, nullTerm, operatorCode, BOOLEAN_TYPE_INFO)
+  }
+
+  def generateIsNotNull(
+      nullCheck: Boolean,
+      operand: GeneratedExpression)
+    : GeneratedExpression = {
+    val resultTerm = newName("result")
+    val nullTerm = newName("isNull")
+    val operatorCode = if (nullCheck) {
+      s"""
+        |${operand.code}
+        |boolean $resultTerm = !${operand.nullTerm};
+        |boolean $nullTerm = false;
+        |""".stripMargin
+    }
+    else if (!nullCheck && isReference(operand.resultType)) {
+      s"""
+        |${operand.code}
+        |boolean $resultTerm = ${operand.resultTerm} != null;
+        |boolean $nullTerm = false;
+        |""".stripMargin
+    }
+    else {
+      s"""
+        |${operand.code}
+        |boolean $resultTerm = true;
+        |""".stripMargin
+    }
+
+    GeneratedExpression(resultTerm, nullTerm, operatorCode, BOOLEAN_TYPE_INFO)
+  }
+
+  def generateAnd(
+      nullCheck: Boolean,
+      left: GeneratedExpression,
+      right: GeneratedExpression)
+    : GeneratedExpression = {
+    val resultTerm = newName("result")
+    val nullTerm = newName("isNull")
+
+    val operatorCode = if (nullCheck) {
+      // Three-valued logic:
+      // no Unknown -> Two-valued logic
+      // True && Unknown -> Unknown
+      // False && Unknown -> False
+      // Unknown && True -> Unknown
+      // Unknown && False -> False
+      // Unknown && Unknown -> Unknown
+      s"""
+        |${left.code}
+        |${right.code}
+        |boolean $resultTerm;
+        |boolean $nullTerm;
+        |if (!${left.nullTerm} && !${right.nullTerm}) {
+        |  $resultTerm = ${left.resultTerm} && ${right.resultTerm};
+        |  $nullTerm = false;
+        |}
+        |else if (!${left.nullTerm} && ${left.resultTerm} && ${right.nullTerm}) {
+        |  $resultTerm = false;
+        |  $nullTerm = true;
+        |}
+        |else if (!${left.nullTerm} && !${left.resultTerm} && ${right.nullTerm}) {
+        |  $resultTerm = false;
+        |  $nullTerm = false;
+        |}
+        |else if (${left.nullTerm} && !${right.nullTerm} && ${right.resultTerm}) {
+        |  $resultTerm = false;
+        |  $nullTerm = true;
+        |}
+        |else if (${left.nullTerm} && !${right.nullTerm} && !${right.resultTerm}) {
+        |  $resultTerm = false;
+        |  $nullTerm = false;
+        |}
+        |else {
+        |  $resultTerm = false;
+        |  $nullTerm = true;
+        |}
+        |""".stripMargin
+    }
+    else {
+      s"""
+        |${left.code}
+        |${right.code}
+        |boolean $resultTerm = ${left.resultTerm} && ${right.resultTerm};
+        |""".stripMargin
+    }
+
+    GeneratedExpression(resultTerm, nullTerm, operatorCode, BOOLEAN_TYPE_INFO)
+  }
+
+  def generateOr(
+      nullCheck: Boolean,
+      left: GeneratedExpression,
+      right: GeneratedExpression)
+    : GeneratedExpression = {
+    val resultTerm = newName("result")
+    val nullTerm = newName("isNull")
+
+    val operatorCode = if (nullCheck) {
+      // Three-valued logic:
+      // no Unknown -> Two-valued logic
+      // True || Unknown -> True
+      // False || Unknown -> Unknown
+      // Unknown || True -> True
+      // Unknown || False -> Unknown
+      // Unknown || Unknown -> Unknown
+      s"""
+        |${left.code}
+        |${right.code}
+        |boolean $resultTerm;
+        |boolean $nullTerm;
+        |if (!${left.nullTerm} && !${right.nullTerm}) {
+        |  $resultTerm = ${left.resultTerm} || ${right.resultTerm};
+        |  $nullTerm = false;
+        |}
+        |else if (!${left.nullTerm} && ${left.resultTerm} && ${right.nullTerm}) {
+        |  $resultTerm = true;
+        |  $nullTerm = false;
+        |}
+        |else if (!${left.nullTerm} && !${left.resultTerm} && ${right.nullTerm}) {
+        |  $resultTerm = false;
+        |  $nullTerm = true;
+        |}
+        |else if (${left.nullTerm} && !${right.nullTerm} && ${right.resultTerm}) {
+        |  $resultTerm = true;
+        |  $nullTerm = false;
+        |}
+        |else if (${left.nullTerm} && !${right.nullTerm} && !${right.resultTerm}) {
+        |  $resultTerm = false;
+        |  $nullTerm = true;
+        |}
+        |else {
+        |  $resultTerm = false;
+        |  $nullTerm = true;
+        |}
+        |""".stripMargin
+    }
+    else {
+      s"""
+        |${left.code}
+        |${right.code}
+        |boolean $resultTerm = ${left.resultTerm} || ${right.resultTerm};
+        |""".stripMargin
+    }
+
+    GeneratedExpression(resultTerm, nullTerm, operatorCode, BOOLEAN_TYPE_INFO)
+  }
+
+  def generateNot(
+      nullCheck: Boolean,
+      operand: GeneratedExpression)
+    : GeneratedExpression = {
+    // Three-valued logic:
+    // no Unknown -> Two-valued logic
+    // Unknown -> Unknown
+    generateUnaryOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, operand) {
+      (operandTerm) => s"!($operandTerm)"
+    }
+  }
+
+  // ----------------------------------------------------------------------------------------------
+
+  private def generateUnaryOperatorIfNotNull(
+      nullCheck: Boolean,
+      resultType: TypeInformation[_],
+      operand: GeneratedExpression)
+      (expr: (String) => String)
+    : GeneratedExpression = {
+    val resultTerm = newName("result")
+    val nullTerm = newName("isNull")
+    val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
+    val defaultValue = primitiveDefaultValue(resultType)
+
+    val operatorCode = if (nullCheck) {
+      s"""
+        |${operand.code}
+        |$resultTypeTerm $resultTerm;
+        |boolean $nullTerm;
+        |if (!${operand.nullTerm}) {
+        |  $resultTerm = ${expr(operand.resultTerm)};
+        |  $nullTerm = false;
+        |}
+        |else {
+        |  $resultTerm = $defaultValue;
+        |  $nullTerm = true;
+        |}
+        |""".stripMargin
+    }
+    else {
+      s"""
+        |${operand.code}
+        |$resultTypeTerm $resultTerm = ${expr(operand.resultTerm)};
+        |""".stripMargin
+    }
+
+    GeneratedExpression(resultTerm, nullTerm, operatorCode, resultType)
+  }
+
+  private def generateOperatorIfNotNull(
+      nullCheck: Boolean,
+      resultType: TypeInformation[_],
+      left: GeneratedExpression,
+      right: GeneratedExpression)
+      (expr: (String, String) => String)
+    : GeneratedExpression = {
+    val resultTerm = newName("result")
+    val nullTerm = newName("isNull")
+    val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
+    val defaultValue = primitiveDefaultValue(resultType)
+
+    val resultCode = if (nullCheck) {
+      s"""
+        |${left.code}
+        |${right.code}
+        |boolean $nullTerm = ${left.nullTerm} || ${right.nullTerm};
+        |$resultTypeTerm $resultTerm;
+        |if ($nullTerm) {
+        |  $resultTerm = $defaultValue;
+        |}
+        |else {
+        |  $resultTerm = ${expr(left.resultTerm, right.resultTerm)};
+        |}
+        |""".stripMargin
+    }
+    else {
+      s"""
+        |${left.code}
+        |${right.code}
+        |$resultTypeTerm $resultTerm = ${expr(left.resultTerm, right.resultTerm)};
+        |""".stripMargin
+    }
+
+    GeneratedExpression(resultTerm, nullTerm, resultCode, resultType)
+  }
+
+}

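The branch tables that generateAnd and generateOr emit implement SQL's
three-valued logic. The same semantics in plain Scala, with None standing for
Unknown, as a compact reference model:

object ThreeValuedLogic extends App {
  def and3(l: Option[Boolean], r: Option[Boolean]): Option[Boolean] = (l, r) match {
    case (Some(false), _) | (_, Some(false)) => Some(false) // False absorbs AND
    case (Some(true), Some(true))            => Some(true)
    case _                                   => None        // any remaining Unknown
  }

  def or3(l: Option[Boolean], r: Option[Boolean]): Option[Boolean] = (l, r) match {
    case (Some(true), _) | (_, Some(true)) => Some(true)    // True absorbs OR
    case (Some(false), Some(false))        => Some(false)
    case _                                 => None
  }

  assert(and3(Some(true), None).isEmpty)          // True AND Unknown -> Unknown
  assert(and3(Some(false), None).contains(false)) // False AND Unknown -> False
  assert(or3(None, Some(true)).contains(true))    // Unknown OR True -> True
}
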
http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
index f909cab..85956a2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
@@ -23,8 +23,10 @@ import org.apache.flink.api.scala.table.ImplicitExpressionOperations
 
 object Literal {
   def apply(l: Any): Literal = l match {
-    case i:Int => Literal(i, BasicTypeInfo.INT_TYPE_INFO)
-    case l:Long => Literal(l, BasicTypeInfo.LONG_TYPE_INFO)
+    case i: Int => Literal(i, BasicTypeInfo.INT_TYPE_INFO)
+    case s: Short => Literal(s, BasicTypeInfo.SHORT_TYPE_INFO)
+    case b: Byte => Literal(b, BasicTypeInfo.BYTE_TYPE_INFO)
+    case l: Long => Literal(l, BasicTypeInfo.LONG_TYPE_INFO)
     case d: Double => Literal(d, BasicTypeInfo.DOUBLE_TYPE_INFO)
     case f: Float => Literal(f, BasicTypeInfo.FLOAT_TYPE_INFO)
     case str: String => Literal(str, BasicTypeInfo.STRING_TYPE_INFO)

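With the added cases, Short and Byte values now map to their own literal types
instead of falling through the match, e.g. in a REPL:

val s: Short = 1
Literal(s)  // Literal(1, BasicTypeInfo.SHORT_TYPE_INFO)
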
http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala
index f6fe2e4..227b3e8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala
@@ -18,13 +18,19 @@
 
 package org.apache.flink.api.table.plan
 
+import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.calcite.sql.`type`.SqlTypeName._
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo, GenericTypeInfo}
+import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
 import org.apache.flink.api.java.typeutils.ValueTypeInfo._
-import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.api.table.typeinfo.RowTypeInfo
+import org.apache.flink.api.table.{Row, TableException}
+
+import scala.collection.JavaConversions._
 
 object TypeConverter {
 
@@ -51,7 +57,7 @@ object TypeConverter {
 //    case p: PojoTypeInfo[_] => STRUCTURED
 //    case g: GenericTypeInfo[_] => OTHER
     case _ => ??? // TODO more types
-    }
+  }
 
   def sqlTypeToTypeInfo(sqlType: SqlTypeName): TypeInformation[_] = sqlType match {
     case BOOLEAN => BOOLEAN_TYPE_INFO
@@ -63,7 +69,74 @@ object TypeConverter {
     case DOUBLE => DOUBLE_TYPE_INFO
     case VARCHAR | CHAR => STRING_TYPE_INFO
     case DATE => DATE_TYPE_INFO
-    case _ => ??? // TODO more types
+    case _ =>
+      println(sqlType)
+      ??? // TODO more types
+  }
+
+  def determineReturnType(
+      logicalRowType: RelDataType,
+      expectedPhysicalType: Option[TypeInformation[Any]],
+      nullable: Boolean,
+      useEfficientTypes: Boolean)
+    : TypeInformation[Any] = {
+    // convert to type information
+    val logicalFieldTypes = logicalRowType.getFieldList map { relDataType =>
+      TypeConverter.sqlTypeToTypeInfo(relDataType.getType.getSqlTypeName)
+    }
+
+    val returnType = expectedPhysicalType match {
+      // a certain physical type is expected (but not Row)
+      // check if expected physical type is compatible with logical field type
+      case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
+        if (typeInfo.getArity != logicalFieldTypes.length) {
+          throw new TableException("Arity of result does not match expected type.")
+        }
+        typeInfo match {
+          case ct: CompositeType[_] =>
+            logicalFieldTypes.zipWithIndex foreach {
+              case (fieldTypeInfo, i) =>
+                val expectedTypeInfo = ct.getTypeAt(i)
+                if (fieldTypeInfo != expectedTypeInfo) {
+                  throw new TableException(s"Result field does not match expected type. " +
+                    s"Expected: $expectedTypeInfo; Actual: $fieldTypeInfo")
+                }
+            }
+          case at: AtomicType[_] =>
+            val fieldTypeInfo = logicalFieldTypes.head
+            if (fieldTypeInfo != at) {
+              throw new TableException(s"Result field does not match expected type. " +
+                s"Expected: $at; Actual: $fieldTypeInfo")
+            }
+
+          case _ =>
+            throw new TableException("Unsupported result type.")
+        }
+        typeInfo
+
+      // Row is expected, create the arity for it
+      case Some(typeInfo) if typeInfo.getTypeClass == classOf[Row] =>
+        new RowTypeInfo(logicalFieldTypes)
+
+      // no physical type
+      // determine type based on logical fields and configuration parameters
+      case None =>
+        // no need for efficient types -> use Row
+        // we cannot use efficient types if row arity > tuple arity or nullable
+        if (!useEfficientTypes || logicalFieldTypes.length > Tuple.MAX_ARITY || nullable) {
+          new RowTypeInfo(logicalFieldTypes)
+        }
+        // use efficient type tuple or atomic type
+        else {
+          if (logicalFieldTypes.length == 1) {
+            logicalFieldTypes.head
+          }
+          else {
+            new TupleTypeInfo[Any](logicalFieldTypes.toArray:_*)
+          }
+        }
     }
+    returnType.asInstanceOf[TypeInformation[Any]]
+  }
 
 }

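The fallback decision at the end of determineReturnType, restated as a
stand-alone sketch (the field-type list and the return strings stand in for
the real Calcite inputs and TypeInformation results; Flink's Tuple.MAX_ARITY
is 25):

import org.apache.flink.api.common.typeinfo.TypeInformation

object ReturnTypeSketch {
  def pickPhysicalType(
      fieldTypes: Seq[TypeInformation[_]],
      nullable: Boolean,
      useEfficientTypes: Boolean): String =
    if (!useEfficientTypes || fieldTypes.length > 25 || nullable)
      "Row"          // Row tolerates null fields and arbitrary arity
    else if (fieldTypes.length == 1)
      "atomic type"  // a single field is used directly
    else
      "Tuple"        // efficient fixed-arity tuple
}
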
http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetExchange.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetExchange.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetExchange.scala
index ec5805a..00cf899 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetExchange.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetExchange.scala
@@ -18,12 +18,13 @@
 
 package org.apache.flink.api.table.plan.nodes.dataset
 
-import org.apache.calcite.plan.{RelTraitSet, RelOptCluster}
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.{RelWriter, RelNode, SingleRel}
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
 import org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.TableConfig
 
 /**
   * Flink RelNode which matches along with PartitionOperator.
@@ -57,7 +58,9 @@ class DataSetExchange(
     super.explainTerms(pw).item("name", opName)
   }
 
-  override def translateToPlan: DataSet[Any] = {
+  override def translateToPlan(
+      config: TableConfig,
+      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
     ???
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetFlatMap.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetFlatMap.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetFlatMap.scala
index 913cca0..9744792 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetFlatMap.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetFlatMap.scala
@@ -18,12 +18,14 @@
 
 package org.apache.flink.api.table.plan.nodes.dataset
 
-import org.apache.calcite.plan.{RelOptCost, RelOptPlanner, RelTraitSet, RelOptCluster}
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.{RelWriter, RelNode, SingleRel}
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
 import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.plan.TypeConverter._
 
 /**
   * Flink RelNode which matches along with FlatMapOperator.
@@ -35,7 +37,7 @@ class DataSetFlatMap(
     input: RelNode,
     rowType: RelDataType,
     opName: String,
-    func: FlatMapFunction[Row, Row])
+    func: (TableConfig, TypeInformation[Any], TypeInformation[Any]) => FlatMapFunction[Any, Any])
   extends SingleRel(cluster, traitSet, input)
   with DataSetRel {
 
@@ -56,7 +58,17 @@ class DataSetFlatMap(
     super.explainTerms(pw).item("name", opName)
   }
 
-  override def translateToPlan: DataSet[Any] = {
-    ???
+  override def toString = opName
+
+  override def translateToPlan(config: TableConfig,
+      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+    val inputDataSet = input.asInstanceOf[DataSetRel].translateToPlan(config)
+    val returnType = determineReturnType(
+      getRowType,
+      expectedType,
+      config.getNullCheck,
+      config.getEfficientTypeUsage)
+    val flatMapFunc = func.apply(config, inputDataSet.getType, returnType)
+    inputDataSet.flatMap(flatMapFunc)
   }
 }

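With this change the RelNode no longer stores a ready-made function but a
factory that is invoked at translation time, once the concrete input and
return types are known. A trivial, hypothetical factory of the required shape
(a real one would run the code generator against the two types):

import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.table.TableConfig
import org.apache.flink.util.Collector

object FactorySketch {
  val identityFactory:
      (TableConfig, TypeInformation[Any], TypeInformation[Any]) => FlatMapFunction[Any, Any] =
    (config, inputType, returnType) =>
      new FlatMapFunction[Any, Any] {
        // forwards every record unchanged, ignoring the provided types
        override def flatMap(value: Any, out: Collector[Any]): Unit =
          out.collect(value)
      }
}
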
http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala
index 11bb160..ae76d29 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala
@@ -22,8 +22,9 @@ import org.apache.calcite.plan.{RelTraitSet, RelOptCluster}
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.{RelWriter, RelNode, SingleRel}
 import org.apache.flink.api.common.functions.GroupReduceFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.{TableConfig, Row}
 
 /**
   * Flink RelNode which matches along with ReduceGroupOperator.
@@ -57,7 +58,9 @@ class DataSetGroupReduce(
     super.explainTerms(pw).item("name", opName)
   }
 
-  override def translateToPlan: DataSet[Any] = {
+  override def translateToPlan(
+      config: TableConfig,
+      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
     ???
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
index c20cdc5..de436be 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
@@ -23,9 +23,10 @@ import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.{RelWriter, BiRel, RelNode}
 import org.apache.flink.api.common.functions.JoinFunction
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.java.operators.join.JoinType
-import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.{TableConfig, Row}
 
 /**
   * Flink RelNode which matches along with JoinOperator and its related operations.
@@ -67,7 +68,9 @@ class DataSetJoin(
     super.explainTerms(pw).item("name", opName)
   }
 
-  override def translateToPlan: DataSet[Any] = {
+  override def translateToPlan(
+      config: TableConfig,
+      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
     ???
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMap.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMap.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMap.scala
index be8bd9d..f4f8afb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMap.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMap.scala
@@ -18,12 +18,14 @@
 
 package org.apache.flink.api.table.plan.nodes.dataset
 
-import org.apache.calcite.plan.{RelTraitSet, RelOptCluster}
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.{RelWriter, RelNode, SingleRel}
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
 import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.plan.TypeConverter.determineReturnType
 
 /**
   * Flink RelNode which matches along with MapOperator.
@@ -34,7 +36,7 @@ class DataSetMap(
     input: RelNode,
     rowType: RelDataType,
     opName: String,
-    func: MapFunction[Row, Row])
+    func: (TableConfig, TypeInformation[Any], TypeInformation[Any]) => MapFunction[Any, Any])
   extends SingleRel(cluster, traitSet, input)
   with DataSetRel {
 
@@ -55,9 +57,19 @@ class DataSetMap(
     super.explainTerms(pw).item("name", opName)
   }
 
-  override def toString() = opName
+  override def toString = opName
 
-  override def translateToPlan: DataSet[Any] = {
-    ???
+  override def translateToPlan(
+      config: TableConfig,
+      expectedType: Option[TypeInformation[Any]])
+    : DataSet[Any] = {
+    val inputDataSet = input.asInstanceOf[DataSetRel].translateToPlan(config)
+    val returnType = determineReturnType(
+      getRowType,
+      expectedType,
+      config.getNullCheck,
+      config.getEfficientTypeUsage)
+    val mapFunc = func.apply(config, inputDataSet.getType, returnType)
+    inputDataSet.map(mapFunc)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetReduce.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetReduce.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetReduce.scala
index 567a91c..e6fc0f9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetReduce.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetReduce.scala
@@ -22,8 +22,9 @@ import org.apache.calcite.plan.{RelTraitSet, RelOptCluster}
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.{RelWriter, RelNode, SingleRel}
 import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.{TableConfig, Row}
 
 /**
   * Flink RelNode which matches along with ReduceOperator.
@@ -57,7 +58,9 @@ class DataSetReduce(
     super.explainTerms(pw).item("name", opName)
   }
 
-  override def translateToPlan: DataSet[Any] = {
+  override def translateToPlan(
+      config: TableConfig,
+      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
     ???
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
index 20677b3..16a0ae3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
@@ -19,15 +19,19 @@
 package org.apache.flink.api.table.plan.nodes.dataset
 
 import org.apache.calcite.rel.RelNode
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.TableConfig
 
 trait DataSetRel extends RelNode {
 
   /**
     * Translate the FlinkRelNode into Flink operator.
     */
-  def translateToPlan: DataSet[Any]
+  def translateToPlan(
+      config: TableConfig,
+      expectedType: Option[TypeInformation[Any]] = None)
+    : DataSet[Any]
 
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
index df5301d..033711b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
@@ -18,11 +18,12 @@
 
 package org.apache.flink.api.table.plan.nodes.dataset
 
-import org.apache.calcite.plan.{RelTraitSet, RelOptCluster}
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.{RelWriter, RelNode, SingleRel}
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.TableConfig
 
 /**
   * Flink RelNode which matches along with SortPartitionOperator.
@@ -56,7 +57,9 @@ class DataSetSort(
     super.explainTerms(pw).item("name", opName)
   }
 
-  override def translateToPlan: DataSet[Any] = {
+  override def translateToPlan(
+      config: TableConfig,
+      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
     ???
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala
index 53067dc..33fe430 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala
@@ -18,25 +18,18 @@
 
 package org.apache.flink.api.table.plan.nodes.dataset
 
-import java.lang.reflect.Field
-
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.TableScan
-import org.apache.flink.api.common.functions.RichMapFunction
-import org.apache.flink.api.common.typeinfo.{TypeInformation, AtomicType}
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
-import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
-import org.apache.flink.api.table.Row
-import org.apache.flink.api.table.plan.TypeConverter
+import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.plan.TypeConverter.determineReturnType
 import org.apache.flink.api.table.plan.schema.DataSetTable
-import org.apache.flink.api.table.typeinfo.RowTypeInfo
-import org.apache.flink.configuration.Configuration
-
-import scala.collection.JavaConverters._
+import org.apache.flink.api.table.runtime.MapRunner
 
 /**
   * Flink RelNode which matches along with DataSource.
@@ -62,132 +55,57 @@ class DataSetSource(
     )
   }
 
-  override def translateToPlan: DataSet[Any] = {
+  override def translateToPlan(
+      config: TableConfig,
+      expectedType: Option[TypeInformation[Any]])
+    : DataSet[Any] = {
 
     val inputDataSet: DataSet[Any] = dataSetTable.dataSet
-
-    // extract Flink data types
-    val fieldTypes: Array[TypeInformation[_]] = rowType.getFieldList.asScala
-      .map(f => f.getType.getSqlTypeName)
-      .map(n => TypeConverter.sqlTypeToTypeInfo(n))
-      .toArray
-
-    val rowTypeInfo = new RowTypeInfo(fieldTypes, dataSetTable.fieldNames)
-
-    // convert input data set into row data set
-    inputDataSet.getType match {
-      case t: TupleTypeInfo[_] =>
-        val rowMapper = new TupleToRowMapper(dataSetTable.fieldIndexes)
-        inputDataSet.asInstanceOf[DataSet[Tuple]]
-          .map(rowMapper).returns(rowTypeInfo).asInstanceOf[DataSet[Any]]
-
-      case c: CaseClassTypeInfo[_] =>
-        val rowMapper = new CaseClassToRowMapper(dataSetTable.fieldIndexes)
-        inputDataSet.asInstanceOf[DataSet[Product]]
-          .map(rowMapper).returns(rowTypeInfo).asInstanceOf[DataSet[Any]]
-
-      case p: PojoTypeInfo[_] =>
-        // get pojo class
-        val typeClazz = p.getTypeClass.asInstanceOf[Class[Any]]
-        // get original field names
-        val origFieldNames = dataSetTable.fieldIndexes.map(i => p.getFieldNames()(i))
-
-        val rowMapper = new PojoToRowMapper(typeClazz, origFieldNames)
-        inputDataSet.asInstanceOf[DataSet[Any]]
-          .map(rowMapper).returns(rowTypeInfo).asInstanceOf[DataSet[Any]]
-
-      case a: AtomicType[_] =>
-        val rowMapper = new AtomicToRowMapper
-        inputDataSet.asInstanceOf[DataSet[Any]]
-          .map(rowMapper).returns(rowTypeInfo).asInstanceOf[DataSet[Any]]
-    }
-  }
-
-}
-
-class TupleToRowMapper(val fromIndexes: Array[Int])
-  extends RichMapFunction[Tuple, Row]
-{
-
-  @transient var outR: Row = null
-
-  override def open(conf: Configuration): Unit = {
-    outR = new Row(fromIndexes.length)
-  }
-
-  override def map(v: Tuple): Row = {
-
-    var i = 0
-    while (i < fromIndexes.length) {
-      outR.setField(i, v.getField(fromIndexes(i)))
-      i += 1
-    }
-    outR
-  }
-}
-
-class CaseClassToRowMapper(val fromIndexes: Array[Int])
-  extends RichMapFunction[Product, Row]
-{
-
-  @transient var outR: Row = null
-
-  override def open(conf: Configuration): Unit = {
-    outR = new Row(fromIndexes.length)
-  }
-
-  override def map(v: Product): Row = {
-
-    var i = 0
-    while (i < fromIndexes.length) {
-      outR.setField(i, v.productElement(fromIndexes(i)))
-      i += 1
-    }
-    outR
-  }
-}
-
-class PojoToRowMapper(val inClazz: Class[Any], val fieldNames: Array[String])
-  extends RichMapFunction[Any, Row]
-{
-
-  @transient var outR: Row = null
-  @transient var fields: Array[Field] = null
-
-  override def open(conf: Configuration): Unit = {
-
-    fields = fieldNames.map { n =>
-      val f = inClazz.getField(n)
-      f.setAccessible(true)
-      f
-    }
-    outR = new Row(fieldNames.length)
-  }
-
-  override def map(v: Any): Row = {
-
-    var i = 0
-    while (i < fields.length) {
-      outR.setField(i, fields(i).get(v))
-      i += 1
+    val inputType = inputDataSet.getType
+
+    // special case:
+    // if efficient type usage is enabled and no expected type is set
+    // we can simply forward the DataSet to the next operator
+    expectedType match {
+      case None if config.getEfficientTypeUsage =>
+        inputDataSet
+
+      case _ =>
+        val determinedType = determineReturnType(
+          getRowType,
+          expectedType,
+          config.getNullCheck,
+          config.getEfficientTypeUsage)
+
+        // conversion
+        if (determinedType != inputType) {
+          val generator = new CodeGenerator(config, inputDataSet.getType)
+          val conversion = generator.generateConverterResultExpression(determinedType)
+
+          val body =
+            s"""
+              |${conversion.code}
+              |return ${conversion.resultTerm};
+              |""".stripMargin
+
+          val genFunction = generator.generateFunction(
+            "DataSetSourceConversion",
+            classOf[MapFunction[Any, Any]],
+            body,
+            determinedType)
+
+          val mapFunc = new MapRunner[Any, Any](
+            genFunction.name,
+            genFunction.code,
+            genFunction.returnType)
+
+          inputDataSet.map(mapFunc)
+        }
+        // no conversion necessary, forward
+        else {
+          inputDataSet
+        }
     }
-    outR
-  }
-}
-
-class AtomicToRowMapper()
-  extends RichMapFunction[Any, Row]
-{
-
-  @transient var outR: Row = null
-
-  override def open(conf: Configuration): Unit = {
-    outR = new Row(1)
   }
 
-  override def map(v: Any): Row = {
-
-    outR.setField(0, v)
-    outR
-  }
 }
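
As a usage sketch (not part of the patch): the reworked translateToPlan either forwards the input DataSet unchanged, when no expected type is given and efficient type usage is enabled, or chains a code-generated conversion. Assuming a DataSetSource instance named source, a target row type rowTypeInfo, and that TableConfig.DEFAULT is still available (all illustrative names):

    // forward path: efficient type usage on, no expected type
    val forwarded: DataSet[Any] = source.translateToPlan(TableConfig.DEFAULT, None)

    // conversion path: a concrete expected type triggers code generation of a
    // MapFunction that MapRunner compiles at runtime on the task managers
    val converted: DataSet[Any] = source.translateToPlan(
      TableConfig.DEFAULT,
      Some(rowTypeInfo.asInstanceOf[TypeInformation[Any]]))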

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala
index a510fc9..ebfd48a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala
@@ -21,8 +21,9 @@ package org.apache.flink.api.table.plan.nodes.dataset
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.{RelWriter, BiRel, RelNode}
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.{TableConfig, Row}
 
 /**
 * Flink RelNode which matches along with UnionOperator.
@@ -55,7 +56,9 @@ class DataSetUnion(
     super.explainTerms(pw).item("name", opName)
   }
 
-  override def translateToPlan: DataSet[Any] = {
+  override def translateToPlan(
+      config: TableConfig,
+      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
     ???
   }
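
translateToPlan is still stubbed with ???. A plausible future body, sketched here purely as an assumption (the base type name DataSetRel is hypothetical, as is the code), would translate both inputs and union the resulting DataSets:

    // hypothetical implementation, not part of this commit
    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(config, expectedType)
    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(config, expectedType)
    leftDataSet.union(rightDataSet)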
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetFilterRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetFilterRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetFilterRule.scala
index 383c965..0ca153d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetFilterRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetFilterRule.scala
@@ -21,8 +21,13 @@ package org.apache.flink.api.table.plan.rules.dataset
 import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.codegen.CodeGenerator
 import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetFlatMap}
-import org.apache.flink.api.table.plan.nodes.logical.{FlinkFilter, FlinkConvention}
+import org.apache.flink.api.table.plan.nodes.logical.{FlinkConvention, FlinkFilter}
+import org.apache.flink.api.table.runtime.FlatMapRunner
 
 class DataSetFilterRule
   extends ConverterRule(
@@ -37,13 +42,54 @@ class DataSetFilterRule
     val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
     val convInput: RelNode = RelOptRule.convert(filter.getInput, DataSetConvention.INSTANCE)
 
+    val func = (
+        config: TableConfig,
+        inputType: TypeInformation[Any],
+        returnType: TypeInformation[Any]) => {
+      val generator = new CodeGenerator(config, inputType)
+
+      val condition = generator.generateExpression(filter.getCondition)
+
+      // conversion
+      val body = if (inputType != returnType) {
+        val conversion = generator.generateConverterResultExpression(returnType)
+        s"""
+          |${condition.code}
+          |if (${condition.resultTerm}) {
+          |  ${conversion.code}
+          |  ${generator.collectorTerm}.collect(${conversion.resultTerm});
+          |}
+          |""".stripMargin
+      }
+      // no conversion
+      else {
+        s"""
+          |${condition.code}
+          |if (${condition.resultTerm}) {
+          |  ${generator.collectorTerm}.collect(${generator.input1Term});
+          |}
+          |""".stripMargin
+      }
+
+      val genFunction = generator.generateFunction(
+        description,
+        classOf[FlatMapFunction[Any, Any]],
+        body,
+        returnType)
+
+      new FlatMapRunner[Any, Any](
+        genFunction.name,
+        genFunction.code,
+        genFunction.returnType)
+    }
+
     new DataSetFlatMap(
       rel.getCluster,
       traitSet,
       convInput,
       rel.getRowType,
       filter.toString,
-      null)
+      func)
   }
 }
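
Presumably the DataSetFlatMap node invokes this func closure during its own translation and chains the compiled runner onto its input; its translateToPlan is not shown in this mail, so the following is an assumption:

    // hedged sketch of how DataSetFlatMap is expected to apply the closure
    val runner: FlatMapFunction[Any, Any] = func(config, inputDataSet.getType, returnType)
    val filtered: DataSet[Any] = inputDataSet.flatMap(runner)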
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetProjectRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetProjectRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetProjectRule.scala
index 7796d66..d747ba9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetProjectRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetProjectRule.scala
@@ -21,8 +21,15 @@ package org.apache.flink.api.table.plan.rules.dataset
 import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.codegen.CodeGenerator
 import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetMap}
-import org.apache.flink.api.table.plan.nodes.logical.{FlinkProject, FlinkConvention}
+import org.apache.flink.api.table.plan.nodes.logical.{FlinkConvention, FlinkProject}
+import org.apache.flink.api.table.runtime.MapRunner
+
+import scala.collection.JavaConversions._
 
 class DataSetProjectRule
   extends ConverterRule(
@@ -37,13 +44,40 @@ class DataSetProjectRule
     val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
     val convInput: RelNode = RelOptRule.convert(proj.getInput, DataSetConvention.INSTANCE)
 
+    val func = (
+        config: TableConfig,
+        inputType: TypeInformation[Any],
+        returnType: TypeInformation[Any]) => {
+      val generator = new CodeGenerator(config, inputType)
+
+      // projection and implicit type conversion
+      val projection = generator.generateResultExpression(returnType, proj.getProjects)
+
+      val body =
+        s"""
+          |${projection.code}
+          |return ${projection.resultTerm};
+          |""".stripMargin
+
+      val genFunction = generator.generateFunction(
+        description,
+        classOf[MapFunction[Any, Any]],
+        body,
+        returnType)
+
+      new MapRunner[Any, Any](
+        genFunction.name,
+        genFunction.code,
+        genFunction.returnType)
+    }
+
     new DataSetMap(
       rel.getCluster,
       traitSet,
       convInput,
       rel.getRowType,
       proj.toString,
-      null)
+      func)
   }
 }
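
A design point worth noting: the removed row mappers needed an explicit .returns(rowTypeInfo) after every .map(...) call, whereas MapRunner and FlatMapRunner implement ResultTypeQueryable, so the DataSet API can query the produced type from the function itself:

    // illustration only: no .returns(...) call is needed because MapRunner
    // answers getProducedType with the TypeInformation passed to it
    val projected: DataSet[Any] = inputDataSet.map(
      new MapRunner[Any, Any](genFunction.name, genFunction.code, genFunction.returnType))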
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionAggregateFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionAggregateFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionAggregateFunction.scala
deleted file mode 100644
index 38afc21..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionAggregateFunction.scala
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.runtime
-
-import org.apache.flink.api.table.Row
-import org.apache.flink.api.common.functions.{GroupReduceFunction, GroupCombineFunction, RichGroupReduceFunction}
-import org.apache.flink.api.java.aggregation.AggregationFunction
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.util.Collector
-
-class ExpressionAggregateFunction(
-    private val fieldPositions: Seq[Int],
-    private val functions: Seq[AggregationFunction[Any]])
-  extends RichGroupReduceFunction[Row, Row]
-  with GroupCombineFunction[Row, Row]
-{
-
-  override def open(conf: Configuration): Unit = {
-    var i = 0
-    val len = functions.length
-    while (i < len) {
-      functions(i).initializeAggregate()
-      i += 1
-    }
-  }
-
-  override def reduce(in: java.lang.Iterable[Row], out: Collector[Row]): Unit = {
-
-    val fieldPositions = this.fieldPositions
-    val functions = this.functions
-
-    var current: Row = null
-
-    val values = in.iterator()
-    while (values.hasNext) {
-      current = values.next()
-
-      var i = 0
-      val len = functions.length
-      while (i < len) {
-        functions(i).aggregate(current.productElement(fieldPositions(i)))
-        i += 1
-      }
-    }
-
-    var i = 0
-    val len = functions.length
-    while (i < len) {
-      current.setField(fieldPositions(i), functions(i).getAggregate)
-      functions(i).initializeAggregate()
-      i += 1
-    }
-
-    out.collect(current)
-  }
-
-  override def combine(in: java.lang.Iterable[Row], out: Collector[Row]): Unit = {
-    reduce(in, out)
-  }
-
-}
-
-
-class NoExpressionAggregateFunction()
-  extends GroupReduceFunction[Row, Row]
-  with GroupCombineFunction[Row, Row]
-{
-
-  override def reduce(in: java.lang.Iterable[Row], out: Collector[Row]): Unit = {
-
-    var first: Row = null
-
-    val values = in.iterator()
-    if (values.hasNext) {
-      first = values.next()
-    }
-
-    out.collect(first)
-  }
-
-  override def combine(in: java.lang.Iterable[Row], out: Collector[Row]): Unit = {
-    reduce(in, out)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala
deleted file mode 100644
index 4e50272..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.runtime
-
-import org.apache.flink.api.common.functions.{FilterFunction, RichFilterFunction}
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.table.TableConfig
-import org.apache.flink.api.table.codegen.GenerateFilter
-import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.configuration.Configuration
-
-/**
- * Proxy function that takes an expression predicate. This is compiled
- * upon runtime and calls to [[filter()]] are forwarded to the compiled code.
- */
-class ExpressionFilterFunction[T](
-    predicate: Expression,
-    inputType: CompositeType[T],
-    config: TableConfig = TableConfig.DEFAULT) extends RichFilterFunction[T] {
-
-  var compiledFilter: FilterFunction[T] = null
-
-  override def open(c: Configuration): Unit = {
-    if (compiledFilter == null) {
-      val codegen = new GenerateFilter[T](
-        inputType,
-        predicate,
-        getRuntimeContext.getUserCodeClassLoader,
-        config)
-      compiledFilter = codegen.generate()
-    }
-  }
-
-  override def filter(in: T) = compiledFilter.filter(in)
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala
deleted file mode 100644
index cf2c90f..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.runtime
-
-import org.apache.flink.api.common.functions.{FlatJoinFunction, RichFlatJoinFunction}
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.table.TableConfig
-import org.apache.flink.api.table.codegen.GenerateJoin
-import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.util.Collector
-
-/**
- * Proxy function that takes an expression predicate and output fields. These are compiled
- * upon runtime and calls to [[join()]] are forwarded to the compiled code.
- */
-class ExpressionJoinFunction[L, R, O](
-    predicate: Expression,
-    leftType: CompositeType[L],
-    rightType: CompositeType[R],
-    resultType: CompositeType[O],
-    outputFields: Seq[Expression],
-    config: TableConfig = TableConfig.DEFAULT) extends RichFlatJoinFunction[L, R, O] {
-
-  var compiledJoin: FlatJoinFunction[L, R, O] = null
-
-  override def open(c: Configuration): Unit = {
-    val codegen = new GenerateJoin[L, R, O](
-      leftType,
-      rightType,
-      resultType,
-      predicate,
-      outputFields,
-      getRuntimeContext.getUserCodeClassLoader,
-      config)
-    compiledJoin = codegen.generate()
-  }
-
-  def join(left: L, right: R, out: Collector[O]) = {
-    compiledJoin.join(left, right, out)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala
deleted file mode 100644
index ab7adb1..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.runtime
-
-import org.apache.flink.api.table.TableConfig
-import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.table.codegen.GenerateSelect
-import org.apache.flink.configuration.Configuration
-
-/**
- * Proxy function that takes expressions. These are compiled
- * upon runtime and calls to [[map()]] are forwarded to the compiled code.
- */
-class ExpressionSelectFunction[I, O](
-     inputType: CompositeType[I],
-     resultType: CompositeType[O],
-     outputFields: Seq[Expression],
-     config: TableConfig = TableConfig.DEFAULT) extends RichMapFunction[I, O] {
-
-  var compiledSelect: MapFunction[I, O] = null
-
-  override def open(c: Configuration): Unit = {
-
-    if (compiledSelect == null) {
-      val resultCodegen = new GenerateSelect[I, O](
-        inputType,
-        resultType,
-        outputFields,
-        getRuntimeContext.getUserCodeClassLoader,
-        config)
-
-      compiledSelect = resultCodegen.generate()
-    }
-  }
-
-  def map(in: I): O = {
-    compiledSelect.map(in)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatMapRunner.scala
new file mode 100644
index 0000000..8a3482f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatMapRunner.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime
+
+import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+class FlatMapRunner[IN, OUT](
+    name: String,
+    code: String,
+    @transient returnType: TypeInformation[OUT])
+  extends RichFlatMapFunction[IN, OUT]
+  with ResultTypeQueryable[OUT]
+  with FunctionCompiler[FlatMapFunction[IN, OUT]] {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  private var function: FlatMapFunction[IN, OUT] = null
+
+  override def open(parameters: Configuration): Unit = {
+    LOG.debug(s"Compiling FlatMapFunction: $name \n\n Code:\n$code")
+    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
+    LOG.debug("Instantiating FlatMapFunction.")
+    function = clazz.newInstance()
+  }
+
+  override def flatMap(in: IN, out: Collector[OUT]): Unit =
+    function.flatMap(in, out)
+
+  override def getProducedType: TypeInformation[OUT] = returnType
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FunctionCompiler.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FunctionCompiler.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FunctionCompiler.scala
new file mode 100644
index 0000000..9cd44d1
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FunctionCompiler.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime
+
+import org.apache.flink.api.common.functions.Function
+import org.codehaus.commons.compiler.CompileException
+import org.codehaus.janino.SimpleCompiler
+
+trait FunctionCompiler[T <: Function] {
+
+  @throws(classOf[CompileException])
+  def compile(cl: ClassLoader, name: String, code: String): Class[T] = {
+    require(cl != null, "Classloader must not be null.")
+    val compiler = new SimpleCompiler()
+    compiler.setParentClassLoader(cl)
+    compiler.cook(code)
+    compiler.getClassLoader.loadClass(name).asInstanceOf[Class[T]]
+  }
+}
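
FunctionCompiler is a thin wrapper around Janino's SimpleCompiler: cook the source string, then load the named class from the compiler's class loader. A self-contained sketch of the same step outside Flink, where the demo object, the class name Upper, and the source string are all hypothetical:

    import org.apache.flink.api.common.functions.MapFunction

    object CompilerDemo extends FunctionCompiler[MapFunction[String, String]] {
      def main(args: Array[String]): Unit = {
        // the generated code sticks to raw types and Object signatures
        val code =
          """public class Upper implements org.apache.flink.api.common.functions.MapFunction {
            |  public Object map(Object value) { return ((String) value).toUpperCase(); }
            |}""".stripMargin
        val clazz = compile(getClass.getClassLoader, "Upper", code)
        println(clazz.newInstance().map("hello"))  // prints HELLO
      }
    }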

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapRunner.scala
new file mode 100644
index 0000000..f64635b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapRunner.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime
+
+import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.slf4j.LoggerFactory
+
+class MapRunner[IN, OUT](
+    name: String,
+    code: String,
+    @transient returnType: TypeInformation[OUT])
+  extends RichMapFunction[IN, OUT]
+  with ResultTypeQueryable[OUT]
+  with FunctionCompiler[MapFunction[IN, OUT]] {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  private var function: MapFunction[IN, OUT] = null
+
+  override def open(parameters: Configuration): Unit = {
+    LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code")
+    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
+    LOG.debug("Instantiating MapFunction.")
+    function = clazz.newInstance()
+  }
+
+  override def map(in: IN): OUT =
+    function.map(in)
+
+  override def getProducedType: TypeInformation[OUT] = returnType
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/package.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/package.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/package.scala
deleted file mode 100644
index a1bc4b7..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/package.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table
-
-/**
- * The functions in this package are used for transforming Table API operations to Java API operations.
- */
-package object runtime

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenameOperator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenameOperator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenameOperator.scala
deleted file mode 100644
index 3b5459b..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenameOperator.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.typeinfo
-
-import org.apache.flink.api.common.operators.Operator
-import org.apache.flink.api.java.operators.SingleInputOperator
-import org.apache.flink.api.java.{DataSet => JavaDataSet}
-
-/**
- * This is a logical operator that can hold a [[RenamingProxyTypeInfo]] for renaming some
- * fields of a [[org.apache.flink.api.common.typeutils.CompositeType]]. At runtime this
- * disappears since the translation method simply returns the input.
- */
-class RenameOperator[T](
-    input: JavaDataSet[T],
-    renamingTypeInformation: RenamingProxyTypeInfo[T])
-  extends SingleInputOperator[T, T, RenameOperator[T]](input, renamingTypeInformation) {
-
-  override protected def translateToDataFlow(
-      input: Operator[T]): Operator[T] = input
-}
