[FLINK-7337] [table] Efficient handling of rowtime timestamps

Use Long instead of a SQL Timestamp to represent timestamps internally

This closes #4532.
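
The core idea, as a minimal standalone sketch (illustrative names only, not
Flink's actual classes): rowtime stays an epoch-millis Long inside the internal
row representation, and a java.sql.Timestamp is materialized only when a row
leaves the Table API:

    import java.sql.Timestamp

    object RowtimeSketch {
      // internal representation: a plain Long is cheaper to copy, compare and
      // serialize than a java.sql.Timestamp object
      final case class InternalRow(rowtime: Long, payload: String)

      // external representation: the SQL timestamp is created only once, at
      // the boundary where the row leaves the internal representation
      def toExternal(row: InternalRow): (Timestamp, String) =
        (new Timestamp(row.rowtime), row.payload)

      def main(args: Array[String]): Unit =
        println(toExternal(InternalRow(rowtime = 1502538702000L, payload = "event")))
    }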


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/47944b1b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/47944b1b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/47944b1b

Branch: refs/heads/master
Commit: 47944b1bb23136ae498971b3765a0d3fe6bf2f18
Parents: 93d0ae4
Author: twalthr <twal...@apache.org>
Authored: Sat Aug 12 13:51:42 2017 +0200
Committer: twalthr <twal...@apache.org>
Committed: Wed Aug 23 10:09:21 2017 +0200

----------------------------------------------------------------------
 .../flink/table/api/BatchTableEnvironment.scala |  36 +-
 .../table/api/StreamTableEnvironment.scala      | 245 +++++------
 .../flink/table/api/TableEnvironment.scala      |  73 ++--
 .../calcite/RelTimeIndicatorConverter.scala     |   2 +-
 .../codegen/AggregationCodeGenerator.scala      |  25 +-
 .../flink/table/codegen/CodeGenUtils.scala      |  29 +-
 .../flink/table/codegen/CodeGenerator.scala     |  21 +-
 .../table/codegen/calls/ScalarOperators.scala   |   7 +-
 .../table/functions/ProctimeSqlFunction.scala   |  41 --
 .../functions/sql/ProctimeSqlFunction.scala     |  43 ++
 .../utils/UserDefinedFunctionUtils.scala        |   6 +-
 .../DataStreamGroupWindowAggregate.scala        |   4 +-
 .../table/runtime/CRowInputMapRunner.scala      |  57 ---
 .../runtime/CRowInputTupleOutputMapRunner.scala |  94 ----
 .../flink/table/runtime/CRowMapRunner.scala     |  57 +++
 .../runtime/OutputRowtimeProcessFunction.scala  |  58 +++
 .../table/runtime/RowtimeProcessFunction.scala  |  49 +++
 .../TimestampSetterProcessFunction.scala        |  52 ---
 ...WrappingTimestampSetterProcessFunction.scala |  61 ---
 .../table/runtime/aggregate/AggregateUtil.scala |  33 +-
 .../aggregate/RowTimeBoundedRangeOver.scala     |   4 +-
 .../aggregate/RowTimeBoundedRowsOver.scala      |  13 +-
 .../aggregate/RowTimeSortProcessFunction.scala  |  15 +-
 .../aggregate/RowTimeUnboundedOver.scala        |  17 +-
 .../aggregate/TimeWindowPropertyCollector.scala |   2 +-
 .../conversion/CRowToJavaTupleMapFunction.scala |  40 ++
 .../conversion/CRowToJavaTupleMapRunner.scala   |  64 +++
 .../conversion/CRowToRowMapFunction.scala       |  32 ++
 .../CRowToScalaTupleMapFunction.scala           |  33 ++
 .../conversion/CRowToScalaTupleMapRunner.scala  |  56 +++
 .../table/typeutils/TimeIndicatorTypeInfo.scala |  13 +-
 .../runtime/harness/OverWindowHarnessTest.scala | 436 +++++++++----------
 .../SortProcessFunctionHarnessTest.scala        |  49 +--
 .../runtime/stream/TimeAttributesITCase.scala   |  10 +-
 .../runtime/stream/table/TableSinkITCase.scala  |   4 +-
 35 files changed, 952 insertions(+), 829 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index 78667a2..a9d60dd 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -153,28 +153,22 @@ abstract class BatchTableEnvironment(
       physicalTypeInfo: TypeInformation[IN],
       schema: RowSchema,
       requestedTypeInfo: TypeInformation[OUT],
-      functionName: String):
-    Option[MapFunction[IN, OUT]] = {
-
-    if (requestedTypeInfo.getTypeClass == classOf[Row]) {
-      // Row to Row, no conversion needed
-      None
-    } else {
-      // some type that is neither Row or CRow
-
-      val converterFunction = generateRowConverterFunction[OUT](
-        physicalTypeInfo.asInstanceOf[TypeInformation[Row]],
-        schema,
-        requestedTypeInfo,
-        functionName
-      )
-
-      val mapFunction = new MapRunner[IN, OUT](
-        converterFunction.name,
-        converterFunction.code,
-        converterFunction.returnType)
+      functionName: String)
+    : Option[MapFunction[IN, OUT]] = {
+
+    val converterFunction = generateRowConverterFunction[OUT](
+      physicalTypeInfo.asInstanceOf[TypeInformation[Row]],
+      schema,
+      requestedTypeInfo,
+      functionName
+    )
 
-      Some(mapFunction)
+    // add a runner if we need conversion
+    converterFunction.map { func =>
+      new MapRunner[IN, OUT](
+          func.name,
+          func.code,
+          func.returnType)
     }
   }
 

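generateRowConverterFunction now returns an Option, where None means the
requested type is already the internal Row type and no conversion is needed.
A simplified sketch of the pattern used by getConversionMapper above
(stand-in types, not the real Flink signatures):

    object ConverterSketch {
      final case class GeneratedFunc(name: String, code: String)

      // returns None when the requested type is already the internal Row type
      def generateConverter(needsConversion: Boolean): Option[GeneratedFunc] =
        if (needsConversion) Some(GeneratedFunc("Converter$1", "/* generated code */"))
        else None

      // the caller wraps a runner only if a converter function was generated
      def conversionMapper(needsConversion: Boolean): Option[String] =
        generateConverter(needsConversion).map(f => s"MapRunner(${f.name})")
    }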
http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index c4e1450..8d8cebb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -23,15 +23,15 @@ import _root_.java.util.concurrent.atomic.AtomicInteger
 
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.plan.hep.HepMatchOrder
-import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField, RelDataTypeFieldImpl, RelRecordType}
 import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField, RelDataTypeFieldImpl, RelRecordType}
 import org.apache.calcite.sql2rel.RelDecorrelator
 import org.apache.calcite.tools.{RuleSet, RuleSets}
 import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeinfo.{AtomicType, SqlTimeTypeInfo, TypeInformation}
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
-import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.java.typeutils.{PojoTypeInfo, RowTypeInfo, TupleTypeInfo}
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.datastream.DataStream
@@ -44,12 +44,12 @@ import org.apache.flink.table.plan.nodes.datastream.{DataStreamRel, UpdateAsRetr
 import org.apache.flink.table.plan.rules.FlinkRuleSets
 import org.apache.flink.table.plan.schema.{DataStreamTable, RowSchema, StreamTableSourceTable}
 import org.apache.flink.table.plan.util.UpdatingPlanChecker
+import org.apache.flink.table.runtime.conversion._
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
-import org.apache.flink.table.runtime.{CRowInputJavaTupleOutputMapRunner, CRowInputMapRunner, CRowInputScalaTupleOutputMapRunner, WrappingTimestampSetterProcessFunction}
+import org.apache.flink.table.runtime.{CRowMapRunner, OutputRowtimeProcessFunction}
 import org.apache.flink.table.sinks.{AppendStreamTableSink, RetractStreamTableSink, TableSink, UpsertStreamTableSink}
 import org.apache.flink.table.sources.{DefinedRowtimeAttribute, StreamTableSource, TableSource}
 import org.apache.flink.table.typeutils.{TimeIndicatorTypeInfo, TypeCheckUtils}
-import org.apache.flink.types.Row
 
 import _root_.scala.collection.JavaConverters._
 
@@ -223,38 +223,33 @@ abstract class StreamTableEnvironment(
   /**
     * Creates a final converter that maps the internal row type to external type.
     *
-    * @param physicalTypeInfo the input of the sink
+    * @param inputTypeInfo the input of the sink
     * @param schema the input schema with correct field names (esp. for POJO field mapping)
     * @param requestedTypeInfo the output type of the sink
     * @param functionName name of the map function. Must not be unique but has to be a
     *                     valid Java class identifier.
     */
-  protected def getConversionMapper[IN, OUT](
-      physicalTypeInfo: TypeInformation[IN],
+  protected def getConversionMapper[OUT](
+      inputTypeInfo: TypeInformation[CRow],
       schema: RowSchema,
       requestedTypeInfo: TypeInformation[OUT],
-      functionName: String):
-    MapFunction[IN, OUT] = {
-
-    if (requestedTypeInfo.getTypeClass == classOf[Row]) {
-      // CRow to Row, only needs to be unwrapped
-      new MapFunction[CRow, Row] {
-        override def map(value: CRow): Row = value.row
-      }.asInstanceOf[MapFunction[IN, OUT]]
-    } else {
-      // Some type that is neither CRow nor Row
-      val converterFunction = generateRowConverterFunction[OUT](
-        physicalTypeInfo.asInstanceOf[CRowTypeInfo].rowType,
-        schema,
-        requestedTypeInfo,
-        functionName
-      )
+      functionName: String)
+    : MapFunction[CRow, OUT] = {
+
+    val converterFunction = generateRowConverterFunction[OUT](
+      inputTypeInfo.asInstanceOf[CRowTypeInfo].rowType,
+      schema,
+      requestedTypeInfo,
+      functionName
+    )
 
-      new CRowInputMapRunner[OUT](
-        converterFunction.name,
-        converterFunction.code,
-        converterFunction.returnType)
-        .asInstanceOf[MapFunction[IN, OUT]]
+    converterFunction match {
+
+      case Some(func) =>
+        new CRowMapRunner[OUT](func.name, func.code, func.returnType)
+
+      case _ =>
+        new CRowToRowMapFunction().asInstanceOf[MapFunction[CRow, OUT]]
     }
   }
 
@@ -271,71 +266,62 @@ abstract class StreamTableEnvironment(
       physicalTypeInfo: TypeInformation[CRow],
       schema: RowSchema,
       requestedTypeInfo: TypeInformation[OUT],
-      functionName: String):
-    MapFunction[CRow, OUT] = {
-
-    requestedTypeInfo match {
-
-      // Scala tuple
-      case t: CaseClassTypeInfo[_]
-        if t.getTypeClass == classOf[(_, _)] && t.getTypeAt(0) == Types.BOOLEAN =>
-
-        val reqType = t.getTypeAt(1).asInstanceOf[TypeInformation[Any]]
-        if (reqType.getTypeClass == classOf[Row]) {
-          // Requested type is Row. Just rewrap CRow in Tuple2
-          new MapFunction[CRow, (Boolean, Row)] {
-            override def map(cRow: CRow): (Boolean, Row) = {
-              (cRow.change, cRow.row)
-            }
-          }.asInstanceOf[MapFunction[CRow, OUT]]
-        } else {
-          // Use a map function to convert Row into requested type and wrap result in Tuple2
-          val converterFunction = generateRowConverterFunction(
-            physicalTypeInfo.asInstanceOf[CRowTypeInfo].rowType,
-            schema,
-            reqType,
-            functionName
-          )
-
-          new CRowInputScalaTupleOutputMapRunner(
-            converterFunction.name,
-            converterFunction.code,
-            requestedTypeInfo.asInstanceOf[TypeInformation[(Boolean, Any)]])
-            .asInstanceOf[MapFunction[CRow, OUT]]
+      functionName: String)
+    : MapFunction[CRow, OUT] = requestedTypeInfo match {
 
-        }
+    // Scala tuple
+    case t: CaseClassTypeInfo[_]
+      if t.getTypeClass == classOf[(_, _)] && t.getTypeAt(0) == Types.BOOLEAN =>
 
-      // Java tuple
-      case t: TupleTypeInfo[_]
-        if t.getTypeClass == classOf[JTuple2[_, _]] && t.getTypeAt(0) == Types.BOOLEAN =>
-
-        val reqType = t.getTypeAt(1).asInstanceOf[TypeInformation[Any]]
-        if (reqType.getTypeClass == classOf[Row]) {
-          // Requested type is Row. Just rewrap CRow in Tuple2
-          new MapFunction[CRow, JTuple2[JBool, Row]] {
-            val outT = new JTuple2(true.asInstanceOf[JBool], null.asInstanceOf[Row])
-            override def map(cRow: CRow): JTuple2[JBool, Row] = {
-              outT.f0 = cRow.change
-              outT.f1 = cRow.row
-              outT
-            }
-          }.asInstanceOf[MapFunction[CRow, OUT]]
-        } else {
-          // Use a map function to convert Row into requested type and wrap result in Tuple2
-          val converterFunction = generateRowConverterFunction(
-            physicalTypeInfo.asInstanceOf[CRowTypeInfo].rowType,
-            schema,
-            reqType,
-            functionName
-          )
-
-          new CRowInputJavaTupleOutputMapRunner(
-            converterFunction.name,
-            converterFunction.code,
-            requestedTypeInfo.asInstanceOf[TypeInformation[JTuple2[JBool, Any]]])
-            .asInstanceOf[MapFunction[CRow, OUT]]
-        }
-    }
+      val reqType = t.getTypeAt[Any](1)
+
+      // convert Row into requested type and wrap result in Tuple2
+      val converterFunction = generateRowConverterFunction(
+        physicalTypeInfo.asInstanceOf[CRowTypeInfo].rowType,
+        schema,
+        reqType,
+        functionName
+      )
+
+      converterFunction match {
+
+        case Some(func) =>
+          new CRowToScalaTupleMapRunner(
+            func.name,
+            func.code,
+            requestedTypeInfo.asInstanceOf[TypeInformation[(Boolean, Any)]]
+          ).asInstanceOf[MapFunction[CRow, OUT]]
+
+        case _ =>
+          new CRowToScalaTupleMapFunction().asInstanceOf[MapFunction[CRow, OUT]]
+      }
+
+    // Java tuple
+    case t: TupleTypeInfo[_]
+      if t.getTypeClass == classOf[JTuple2[_, _]] && t.getTypeAt(0) == Types.BOOLEAN =>
+
+      val reqType = t.getTypeAt[Any](1)
+
+      // convert Row into requested type and wrap result in Tuple2
+      val converterFunction = generateRowConverterFunction(
+        physicalTypeInfo.asInstanceOf[CRowTypeInfo].rowType,
+        schema,
+        reqType,
+        functionName
+      )
+
+      converterFunction match {
+
+        case Some(func) =>
+          new CRowToJavaTupleMapRunner(
+            func.name,
+            func.code,
+            requestedTypeInfo.asInstanceOf[TypeInformation[JTuple2[JBool, Any]]]
+          ).asInstanceOf[MapFunction[CRow, OUT]]
+
+        case _ =>
+          new CRowToJavaTupleMapFunction().asInstanceOf[MapFunction[CRow, OUT]]
+      }
   }
 
   /**
@@ -733,16 +719,42 @@ abstract class StreamTableEnvironment(
     // get CRow plan
     val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)
 
+    val rowtimeFields = logicalType
+      .getFieldList.asScala
+      .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+
+    // convert the input type for the conversion mapper
+    // the input will be changed in the OutputRowtimeProcessFunction later
+    val convType = if (rowtimeFields.size > 1) {
+      throw new TableException(
+        s"Found more than one rowtime field: 
[${rowtimeFields.map(_.getName).mkString(", ")}] in " +
+          s"the table that should be converted to a DataStream.\n" +
+          s"Please select the rowtime field that should be used as event-time 
timestamp for the " +
+          s"DataStream by casting all other fields to TIMESTAMP.")
+    } else if (rowtimeFields.size == 1) {
+      val origRowType = plan.getType.asInstanceOf[CRowTypeInfo].rowType
+      val convFieldTypes = origRowType.getFieldTypes.map { t =>
+        if (FlinkTypeFactory.isRowtimeIndicatorType(t)) {
+          SqlTimeTypeInfo.TIMESTAMP
+        } else {
+          t
+        }
+      }
+      CRowTypeInfo(new RowTypeInfo(convFieldTypes, origRowType.getFieldNames))
+    } else {
+      plan.getType
+    }
+
     // convert CRow to output type
-    val conversion = if (withChangeFlag) {
+    val conversion: MapFunction[CRow, A] = if (withChangeFlag) {
       getConversionMapperWithChanges(
-        plan.getType,
+        convType,
         new RowSchema(logicalType),
         tpe,
         "DataStreamSinkConversion")
     } else {
       getConversionMapper(
-        plan.getType,
+        convType,
         new RowSchema(logicalType),
         tpe,
         "DataStreamSinkConversion")
@@ -750,42 +762,19 @@ abstract class StreamTableEnvironment(
 
     val rootParallelism = plan.getParallelism
 
-    val rowtimeFields = logicalType.getFieldList.asScala
-      .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
-
-    if (rowtimeFields.isEmpty) {
+    val withRowtime = if (rowtimeFields.isEmpty) {
       // no rowtime field to set
-      conversion match {
-        case mapFunction: MapFunction[CRow, A] =>
-          plan.map(mapFunction)
-            .returns(tpe)
-            .name(s"to: ${tpe.getTypeClass.getSimpleName}")
-            .setParallelism(rootParallelism)
-      }
-    } else if (rowtimeFields.size == 1) {
-      // set the only rowtime field as event-time timestamp for DataStream
-      val mapFunction = conversion match {
-        case mapFunction: MapFunction[CRow, A] => mapFunction
-        case _ => new MapFunction[CRow, A] {
-          override def map(cRow: CRow): A = cRow.asInstanceOf[A]
-        }
-      }
-
-      plan.process(
-        new WrappingTimestampSetterProcessFunction[A](
-          mapFunction,
-          rowtimeFields.head.getIndex))
-        .returns(tpe)
-        .name(s"to: ${tpe.getTypeClass.getSimpleName}")
-        .setParallelism(rootParallelism)
-
+      plan.map(conversion)
     } else {
-      throw new TableException(
-        s"Found more than one rowtime field: 
[${rowtimeFields.map(_.getName).mkString(", ")}] in " +
-          s"the table that should be converted to a DataStream.\n" +
-          s"Please select the rowtime field that should be used as event-time 
timestamp for the " +
-          s"DataStream by casting all other fields to TIMESTAMP.")
+      // set the only rowtime field as event-time timestamp for DataStream
+      // and convert it to SQL timestamp
+      plan.process(new OutputRowtimeProcessFunction[A](conversion, rowtimeFields.head.getIndex))
     }
+
+    withRowtime
+      .returns(tpe)
+      .name(s"to: ${tpe.getTypeClass.getSimpleName}")
+      .setParallelism(rootParallelism)
   }
 
   /**

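The translate path above first validates the number of rowtime fields, widens
the indicator field to TIMESTAMP in the conversion type, and then picks the
output path. A standalone sketch of that dispatch (simplified stand-ins, not
the actual Flink API):

    object RowtimeDispatchSketch {
      def dispatch(rowtimeFields: List[String]): String = rowtimeFields match {
        // no rowtime field: a plain map is enough
        case Nil => "plan.map(conversion)"
        // exactly one: also set the element's event-time timestamp
        case _ :: Nil => "plan.process(new OutputRowtimeProcessFunction(conversion, idx))"
        // more than one: the user must cast all but one to TIMESTAMP
        case many => throw new IllegalStateException(
          s"Found more than one rowtime field: [${many.mkString(", ")}]")
      }
    }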
http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index b647c51..2e9e18f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -38,7 +38,7 @@ import org.apache.calcite.tools._
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
 import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.java.typeutils._
+import org.apache.flink.api.java.typeutils.{RowTypeInfo, _}
 import org.apache.flink.api.java.{ExecutionEnvironment => JavaBatchExecEnv}
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.api.scala.{ExecutionEnvironment => ScalaBatchExecEnv}
@@ -48,26 +48,23 @@ import org.apache.flink.table.api.java.{BatchTableEnvironment => JavaBatchTableE
 import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv, StreamTableEnvironment => ScalaStreamTableEnv}
 import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory, FlinkTypeSystem}
 import org.apache.flink.table.catalog.{ExternalCatalog, ExternalCatalogSchema}
-import org.apache.flink.table.codegen.{FunctionCodeGenerator, ExpressionReducer, GeneratedFunction}
-import org.apache.flink.table.expressions.{Alias, Expression, UnresolvedFieldReference}
-import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
-import org.apache.flink.table.functions.AggregateFunction
-import org.apache.flink.table.expressions._
-import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{checkForInstantiation, checkNotSingleton, createScalarSqlFunction, createTableSqlFunctions}
-import org.apache.flink.table.functions.{ScalarFunction, TableFunction}
+import org.apache.flink.table.codegen.{ExpressionReducer, FunctionCodeGenerator, GeneratedFunction}
+import org.apache.flink.table.expressions.{Alias, Expression, UnresolvedFieldReference, _}
+import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{checkForInstantiation, checkNotSingleton, createScalarSqlFunction, createTableSqlFunctions, _}
+import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction}
 import org.apache.flink.table.plan.cost.DataSetCostFactory
 import org.apache.flink.table.plan.logical.{CatalogNode, LogicalRelNode}
 import org.apache.flink.table.plan.rules.FlinkRuleSets
 import org.apache.flink.table.plan.schema.{RelTable, RowSchema}
 import org.apache.flink.table.sinks.TableSink
 import org.apache.flink.table.sources.{DefinedFieldNames, TableSource}
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
 import org.apache.flink.table.validate.FunctionCatalog
 import org.apache.flink.types.Row
-import org.apache.flink.api.java.typeutils.RowTypeInfo
 
-import _root_.scala.collection.JavaConverters._
-import _root_.scala.collection.mutable.HashMap
 import _root_.scala.annotation.varargs
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.collection.mutable
 
 /**
   * The abstract base class for batch and stream TableEnvironments.
@@ -108,10 +105,10 @@ abstract class TableEnvironment(val config: TableConfig) {
   private[flink] val attrNameCntr: AtomicInteger = new AtomicInteger(0)
 
   // registered external catalog names -> catalog
-  private val externalCatalogs = new HashMap[String, ExternalCatalog]
+  private val externalCatalogs = new mutable.HashMap[String, ExternalCatalog]
 
   /** Returns the table config to define the runtime behavior of the Table API. */
-  def getConfig = config
+  def getConfig: TableConfig = config
 
   /**
     * Returns the operator table for this environment including a custom Calcite configuration.
@@ -692,7 +689,7 @@ abstract class TableEnvironment(val config: TableConfig) {
           case _ => throw new TableException(
             "Field reference expression or alias on field expression 
expected.")
         }
-      case r: RowTypeInfo => {
+      case r: RowTypeInfo =>
         exprs.zipWithIndex flatMap {
           case (UnresolvedFieldReference(name), idx) =>
             Some((idx, name))
@@ -707,8 +704,7 @@ abstract class TableEnvironment(val config: TableConfig) {
           case _ => throw new TableException(
             "Field reference expression or alias on field expression 
expected.")
         }
-        
-      }
+
       case tpe => throw new TableException(
         s"Source of type $tpe cannot be converted into Table.")
     }
@@ -719,33 +715,47 @@ abstract class TableEnvironment(val config: TableConfig) {
       throw new TableException("Field name can not be '*'.")
     }
 
-    (fieldNames.toArray, fieldIndexes.toArray)
+    (fieldNames.toArray, fieldIndexes.toArray) // build fails in Scala 2.10 if not converted
   }
 
   protected def generateRowConverterFunction[OUT](
       inputTypeInfo: TypeInformation[Row],
       schema: RowSchema,
       requestedTypeInfo: TypeInformation[OUT],
-      functionName: String):
-    GeneratedFunction[MapFunction[Row, OUT], OUT] = {
+      functionName: String)
+    : Option[GeneratedFunction[MapFunction[Row, OUT], OUT]] = {
 
     // validate that at least the field types of physical and logical type match
     // we do that here to make sure that plan translation was correct
     if (schema.typeInfo != inputTypeInfo) {
       throw TableException(
         s"The field types of physical and logical row types do not match. " +
-        s"Physical type is [${schema.typeInfo}], Logical type is 
[${inputTypeInfo}]. " +
+        s"Physical type is [${schema.typeInfo}], Logical type is 
[$inputTypeInfo]. " +
         s"This is a bug and should not happen. Please file an issue.")
     }
 
+    // generic row needs no conversion
+    if (requestedTypeInfo.isInstanceOf[GenericTypeInfo[_]] &&
+        requestedTypeInfo.getTypeClass == classOf[Row]) {
+      return None
+    }
+
     val fieldTypes = schema.fieldTypeInfos
     val fieldNames = schema.fieldNames
 
-    // validate requested type
+    // check for valid type info
     if (requestedTypeInfo.getArity != fieldTypes.length) {
       throw new TableException(
-        s"Arity[${fieldTypes.length}] of result[${fieldTypes}] does not match 
" +
-        s"the number[${requestedTypeInfo.getArity}] of requested 
type[${requestedTypeInfo}].")
+        s"Arity [${fieldTypes.length}] of result [$fieldTypes] does not match 
" +
+        s"the number[${requestedTypeInfo.getArity}] of requested type 
[$requestedTypeInfo].")
+    }
+
+    // check requested types
+
+    def validateFieldType(fieldType: TypeInformation[_]): Unit = fieldType match {
+      case _: TimeIndicatorTypeInfo =>
+        throw new TableException("The time indicator type is an internal type only.")
+      case _ => // ok
     }
 
     requestedTypeInfo match {
@@ -758,9 +768,10 @@ abstract class TableEnvironment(val config: TableConfig) {
               throw new TableException(s"POJO does not define field name: 
$fName")
             }
             val requestedTypeInfo = pt.getTypeAt(pojoIdx)
+            validateFieldType(requestedTypeInfo)
             if (fType != requestedTypeInfo) {
               throw new TableException(s"Result field does not match requested 
type. " +
-                s"requested: $requestedTypeInfo; Actual: $fType")
+                s"Requested: $requestedTypeInfo; Actual: $fType")
             }
         }
 
@@ -769,6 +780,7 @@ abstract class TableEnvironment(val config: TableConfig) {
         fieldTypes.zipWithIndex foreach {
           case (fieldTypeInfo, i) =>
             val requestedTypeInfo = tt.getTypeAt(i)
+            validateFieldType(requestedTypeInfo)
             if (fieldTypeInfo != requestedTypeInfo) {
               throw new TableException(s"Result field does not match requested 
type. " +
                 s"Requested: $requestedTypeInfo; Actual: $fieldTypeInfo")
@@ -781,10 +793,11 @@ abstract class TableEnvironment(val config: TableConfig) {
           throw new TableException(s"Requested result type is an atomic type 
but " +
             s"result[$fieldTypes] has more or less than a single field.")
         }
-        val fieldTypeInfo = fieldTypes.head
-        if (fieldTypeInfo != at) {
+        val requestedTypeInfo = fieldTypes.head
+        validateFieldType(requestedTypeInfo)
+        if (requestedTypeInfo != at) {
           throw new TableException(s"Result field does not match requested 
type. " +
-            s"Requested: $at; Actual: $fieldTypeInfo")
+            s"Requested: $at; Actual: $requestedTypeInfo")
         }
 
       case _ =>
@@ -809,11 +822,13 @@ abstract class TableEnvironment(val config: TableConfig) {
          |return ${conversion.resultTerm};
          |""".stripMargin
 
-    generator.generateFunction(
+    val generated = generator.generateFunction(
       functionName,
       classOf[MapFunction[Row, OUT]],
       body,
       requestedTypeInfo)
+
+    Some(generated)
   }
 }
 
@@ -972,7 +987,7 @@ object TableEnvironment {
     validateType(inputType)
 
     inputType match {
-      case t: CompositeType[_] => 0.until(t.getArity).map(t.getTypeAt(_)).toArray
+      case t: CompositeType[_] => 0.until(t.getArity).map(i => t.getTypeAt(i)).toArray
       case a: AtomicType[_] => Array(a.asInstanceOf[TypeInformation[_]])
       case tpe =>
         throw new TableException(s"Currently only CompositeType and AtomicType 
are supported.")

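The new validateFieldType check above rejects time indicator types in
requested output types, since they exist only inside the planner. A minimal
sketch of the rule with stand-in types:

    object FieldTypeSketch {
      sealed trait FieldType
      case object TimeIndicator extends FieldType // internal to the planner
      case object SqlTimestamp extends FieldType

      def validateFieldType(t: FieldType): Unit = t match {
        case TimeIndicator => throw new IllegalArgumentException(
          "The time indicator type is an internal type only.")
        case _ => // ok
      }
    }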
http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
index 717a1af..1f88737 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
@@ -27,7 +27,7 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
 import org.apache.flink.table.api.{TableException, ValidationException}
 import org.apache.flink.table.calcite.FlinkTypeFactory.{isRowtimeIndicatorType, _}
-import org.apache.flink.table.functions.ProctimeSqlFunction
+import org.apache.flink.table.functions.sql.ProctimeSqlFunction
 import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
 import org.apache.flink.table.plan.schema.TimeIndicatorRelDataType
 

http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
index 680eb44..25527cc 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
@@ -25,6 +25,7 @@ import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.codegen.Indenter.toISC
 import org.apache.flink.table.codegen.CodeGenUtils.newName
 import org.apache.flink.table.functions.AggregateFunction
+import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{getUserDefinedMethod, signatureToString}
 import org.apache.flink.table.runtime.aggregate.{GeneratedAggregations, SingleElementIterable}
 
@@ -47,7 +48,6 @@ class AggregationCodeGenerator(
     *
     * @param name        Class name of the function.
     *                    Does not need to be unique but has to be a valid Java class identifier.
-    * @param generator   The code generator instance
     * @param physicalInputTypes Physical input row types
     * @param aggregates  All aggregate functions
     * @param aggFields   Indexes of the input fields for all aggregate functions
@@ -68,7 +68,6 @@ class AggregationCodeGenerator(
     */
   def generateAggregations(
     name: String,
-    generator: CodeGenerator,
     physicalInputTypes: Seq[TypeInformation[_]],
     aggregates: Array[AggregateFunction[_ <: Any, _ <: Any]],
     aggFields: Array[Array[Int]],
@@ -86,28 +85,29 @@ class AggregationCodeGenerator(
     // get unique function name
     val funcName = newName(name)
     // register UDAGGs
-    val aggs = aggregates.map(a => generator.addReusableFunction(a))
+    val aggs = aggregates.map(a => addReusableFunction(a))
     // get java types of accumulators
     val accTypeClasses = aggregates.map { a =>
       a.getClass.getMethod("createAccumulator").getReturnType
     }
     val accTypes = accTypeClasses.map(_.getCanonicalName)
 
-    // get java classes of input fields
-    val javaClasses = physicalInputTypes.map(t => t.getTypeClass)
     // get parameter lists for aggregation functions
-    val parameters = aggFields.map { inFields =>
+    val parametersCode = aggFields.map { inFields =>
       val fields = for (f <- inFields) yield
-        s"(${javaClasses(f).getCanonicalName}) input.getField($f)"
+        s"(${CodeGenUtils.boxedTypeTermForTypeInfo(physicalInputTypes(f))}) 
input.getField($f)"
       fields.mkString(", ")
     }
-    val methodSignaturesList = aggFields.map {
-      inFields => for (f <- inFields) yield javaClasses(f)
+
+    // get method signatures
+    val classes = UserDefinedFunctionUtils.typeInfoToClass(physicalInputTypes)
+    val methodSignaturesList = aggFields.map { inFields =>
+      inFields.map(classes(_))
     }
 
     // check and validate the needed methods
     aggregates.zipWithIndex.map {
-      case (a, i) => {
+      case (a, i) =>
        getUserDefinedMethod(a, "accumulate", Array(accTypeClasses(i)) ++ methodSignaturesList(i))
         .getOrElse(
           throw new CodeGenException(
@@ -159,7 +159,6 @@ class AggregationCodeGenerator(
                 s"aggregate ${a.getClass.getCanonicalName}'.")
           )
         }
-      }
     }
 
     def genSetAggregationResults: String = {
@@ -208,7 +207,7 @@ class AggregationCodeGenerator(
           j"""
              |    ${aggs(i)}.accumulate(
              |      ((${accTypes(i)}) accs.getField($i)),
-             |      ${parameters(i)});""".stripMargin
+             |      ${parametersCode(i)});""".stripMargin
       }.mkString("\n")
 
       j"""$sig {
@@ -229,7 +228,7 @@ class AggregationCodeGenerator(
           j"""
              |    ${aggs(i)}.retract(
              |      ((${accTypes(i)}) accs.getField($i)),
-             |      ${parameters(i)});""".stripMargin
+             |      ${parametersCode(i)});""".stripMargin
       }.mkString("\n")
 
       if (needRetract) {

http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
index 1d8c926..161f9a3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
@@ -28,7 +28,7 @@ import org.apache.flink.api.common.typeinfo.{FractionalTypeInfo, SqlTimeTypeInfo
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.java.typeutils.{PojoTypeInfo, RowTypeInfo, TupleTypeInfo, TypeExtractor}
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
-import org.apache.flink.table.typeutils.{TimeIntervalTypeInfo, TypeCheckUtils}
+import org.apache.flink.table.typeutils.{TimeIndicatorTypeInfo, TimeIntervalTypeInfo, TypeCheckUtils}
 
 object CodeGenUtils {
 
@@ -90,6 +90,9 @@ object CodeGenUtils {
     case BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO => "boolean[]"
     case CHAR_PRIMITIVE_ARRAY_TYPE_INFO => "char[]"
 
+    // time indicators are represented as Long even if they seem to be Timestamp
+    case _: TimeIndicatorTypeInfo => "java.lang.Long"
+
     case _ =>
       tpe.getTypeClass.getCanonicalName
   }
@@ -123,8 +126,10 @@ object CodeGenUtils {
   def qualifyEnum(enum: Enum[_]): String =
     enum.getClass.getCanonicalName + "." + enum.name()
 
-  def internalToTimePointCode(resultType: TypeInformation[_], resultTerm: String) =
+  def internalToTimePointCode(resultType: TypeInformation[_], resultTerm: String): String =
     resultType match {
+      case _: TimeIndicatorTypeInfo =>
+        resultTerm // time indicators are not modified
       case SqlTimeTypeInfo.DATE =>
         s"${qualifyMethod(BuiltInMethod.INTERNAL_TO_DATE.method)}($resultTerm)"
       case SqlTimeTypeInfo.TIME =>
@@ -133,7 +138,7 @@ object CodeGenUtils {
         s"${qualifyMethod(BuiltInMethod.INTERNAL_TO_TIMESTAMP.method)}($resultTerm)"
     }
 
-  def timePointToInternalCode(resultType: TypeInformation[_], resultTerm: String) =
+  def timePointToInternalCode(resultType: TypeInformation[_], resultTerm: String): String =
     resultType match {
       case SqlTimeTypeInfo.DATE =>
         s"${qualifyMethod(BuiltInMethod.DATE_TO_INT.method)}($resultTerm)"
@@ -157,43 +162,43 @@ object CodeGenUtils {
 
   // ----------------------------------------------------------------------------------------------
 
-  def requireNumeric(genExpr: GeneratedExpression) =
+  def requireNumeric(genExpr: GeneratedExpression): Unit =
     if (!TypeCheckUtils.isNumeric(genExpr.resultType)) {
       throw new CodeGenException("Numeric expression type expected, but was " +
         s"'${genExpr.resultType}'.")
     }
 
-  def requireComparable(genExpr: GeneratedExpression) =
+  def requireComparable(genExpr: GeneratedExpression): Unit =
     if (!TypeCheckUtils.isComparable(genExpr.resultType)) {
       throw new CodeGenException(s"Comparable type expected, but was 
'${genExpr.resultType}'.")
     }
 
-  def requireString(genExpr: GeneratedExpression) =
+  def requireString(genExpr: GeneratedExpression): Unit =
     if (!TypeCheckUtils.isString(genExpr.resultType)) {
       throw new CodeGenException("String expression type expected.")
     }
 
-  def requireBoolean(genExpr: GeneratedExpression) =
+  def requireBoolean(genExpr: GeneratedExpression): Unit =
     if (!TypeCheckUtils.isBoolean(genExpr.resultType)) {
       throw new CodeGenException("Boolean expression type expected.")
     }
 
-  def requireTemporal(genExpr: GeneratedExpression) =
+  def requireTemporal(genExpr: GeneratedExpression): Unit =
     if (!TypeCheckUtils.isTemporal(genExpr.resultType)) {
       throw new CodeGenException("Temporal expression type expected.")
     }
 
-  def requireTimeInterval(genExpr: GeneratedExpression) =
+  def requireTimeInterval(genExpr: GeneratedExpression): Unit =
     if (!TypeCheckUtils.isTimeInterval(genExpr.resultType)) {
       throw new CodeGenException("Interval expression type expected.")
     }
 
-  def requireArray(genExpr: GeneratedExpression) =
+  def requireArray(genExpr: GeneratedExpression): Unit =
     if (!TypeCheckUtils.isArray(genExpr.resultType)) {
       throw new CodeGenException("Array expression type expected.")
     }
 
-  def requireInteger(genExpr: GeneratedExpression) =
+  def requireInteger(genExpr: GeneratedExpression): Unit =
     if (!TypeCheckUtils.isInteger(genExpr.resultType)) {
       throw new CodeGenException("Integer expression type expected.")
     }
@@ -243,7 +248,7 @@ object CodeGenUtils {
         val fieldName = pt.getFieldNames()(index)
         getFieldAccessor(pt.getTypeClass, fieldName)
 
-      case _ => throw new CodeGenException(s"Unsupported composite type: '${compType}'")
+      case _ => throw new CodeGenException(s"Unsupported composite type: '$compType'")
     }
   }
 

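The net effect of the CodeGenUtils changes: fields typed as time indicators
are generated as java.lang.Long, and they skip the internal-to-Timestamp
conversion that other SQL time types receive. A toy sketch of the two rules
(illustrative helpers, not the real code generator):

    object TimeTermSketch {
      // rule 1: indicator fields are boxed as java.lang.Long in generated code
      def boxedTypeTerm(isTimeIndicator: Boolean, canonicalName: String): String =
        if (isTimeIndicator) "java.lang.Long" else canonicalName

      // rule 2: indicators skip the internal-to-Timestamp conversion step
      def internalToTimePoint(isTimeIndicator: Boolean, term: String): String =
        if (isTimeIndicator) term // already a long, nothing to convert
        else s"internalToTimestamp($term)"
    }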
http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index be55eac..946c6cd 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -39,9 +39,9 @@ import org.apache.flink.table.codegen.CodeGenUtils._
 import org.apache.flink.table.codegen.GeneratedExpression.{NEVER_NULL, NO_CODE}
 import org.apache.flink.table.codegen.calls.FunctionGenerator
 import org.apache.flink.table.codegen.calls.ScalarOperators._
-import org.apache.flink.table.functions.sql.ScalarSqlFunctions
+import org.apache.flink.table.functions.sql.{ProctimeSqlFunction, ScalarSqlFunctions}
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
-import org.apache.flink.table.functions.{FunctionContext, ProctimeSqlFunction, UserDefinedFunction}
+import org.apache.flink.table.functions.{FunctionContext, UserDefinedFunction}
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
 import org.apache.flink.table.typeutils.TypeCheckUtils._
 
@@ -328,13 +328,13 @@ abstract class CodeGenerator(
     // initial type check
     if (returnType.getArity != fieldExprs.length) {
       throw new CodeGenException(
-        s"Arity[${returnType.getArity}] of result type[$returnType] does not 
match " +
-        s"number[${fieldExprs.length}] of expressions[$fieldExprs].")
+        s"Arity [${returnType.getArity}] of result type [$returnType] does not 
match " +
+        s"number [${fieldExprs.length}] of expressions [$fieldExprs].")
     }
     if (resultFieldNames.length != fieldExprs.length) {
       throw new CodeGenException(
-        s"Arity[${resultFieldNames.length}] of result field 
names[$resultFieldNames] does not " +
-        s"match number[${fieldExprs.length}] of expressions[$fieldExprs].")
+        s"Arity [${resultFieldNames.length}] of result field names 
[$resultFieldNames] does not " +
+        s"match number [${fieldExprs.length}] of expressions [$fieldExprs].")
     }
     // type check
     returnType match {
@@ -342,8 +342,8 @@ abstract class CodeGenerator(
         fieldExprs.zipWithIndex foreach {
          case (fieldExpr, i) if fieldExpr.resultType != pt.getTypeAt(resultFieldNames(i)) =>
             throw new CodeGenException(
-              s"Incompatible types of expression and result type. 
Expression[$fieldExpr] type is " +
-              s"[${fieldExpr.resultType}], result type is 
[${pt.getTypeAt(resultFieldNames(i))}]")
+              s"Incompatible types of expression and result type. Expression 
[$fieldExpr] type is" +
+              s" [${fieldExpr.resultType}], result type is 
[${pt.getTypeAt(resultFieldNames(i))}]")
 
           case _ => // ok
         }
@@ -359,7 +359,7 @@ abstract class CodeGenerator(
 
       case at: AtomicType[_] if at != fieldExprs.head.resultType =>
         throw new CodeGenException(
-          s"Incompatible types of expression and result type. 
Expression[${fieldExprs.head}] " +
+          s"Incompatible types of expression and result type. Expression 
[${fieldExprs.head}] " +
           s"type is [${fieldExprs.head.resultType}], result type is [$at]")
 
       case _ => // ok
@@ -1303,11 +1303,10 @@ abstract class CodeGenerator(
 
   private[flink] def generateProctimeTimestamp(): GeneratedExpression = {
     val resultTerm = newName("result")
-    val resultTypeTerm = primitiveTypeTermForTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
 
     val resultCode =
       s"""
-        |$resultTypeTerm $resultTerm = $contextTerm.timerService().currentProcessingTime();
+        |long $resultTerm = $contextTerm.timerService().currentProcessingTime();
         |""".stripMargin
     GeneratedExpression(resultTerm, NEVER_NULL, resultCode, SqlTimeTypeInfo.TIMESTAMP)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
index 01e9dff..7de7aca 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
@@ -26,7 +26,7 @@ import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo}
 import org.apache.flink.table.codegen.CodeGenUtils._
 import org.apache.flink.table.codegen.calls.CallGenerator.generateCallIfArgsNotNull
 import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, GeneratedExpression}
-import org.apache.flink.table.typeutils.{TimeIntervalTypeInfo, TypeCoercion}
+import org.apache.flink.table.typeutils.{TimeIndicatorTypeInfo, TimeIntervalTypeInfo, TypeCoercion}
 import org.apache.flink.table.typeutils.TypeCheckUtils._
 
 object ScalarOperators {
@@ -543,6 +543,11 @@ object ScalarOperators {
       operand: GeneratedExpression,
       targetType: TypeInformation[_])
     : GeneratedExpression = (operand.resultType, targetType) match {
+
+    // special case: cast from TimeIndicatorTypeInfo to SqlTimeTypeInfo
+    case (ti: TimeIndicatorTypeInfo, SqlTimeTypeInfo.TIMESTAMP) =>
+      operand.copy(resultType = SqlTimeTypeInfo.TIMESTAMP) // just replace the TypeInformation
+
     // identity casting
     case (fromTp, toTp) if fromTp == toTp =>
       operand

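The new cast case above is free at runtime: the generated term is reused and
only its TypeInformation label changes. A simplified sketch of that shape
(string-typed stand-ins for the real type objects):

    object CastSketch {
      final case class Expr(term: String, tpe: String)

      def generateCast(operand: Expr, target: String): Expr =
        (operand.tpe, target) match {
          // value is already in the target physical form: relabel, emit no code
          case ("TIME_INDICATOR", "TIMESTAMP") => operand.copy(tpe = "TIMESTAMP")
          case (from, to) if from == to => operand // identity cast
          case _ => Expr(s"convert(${operand.term})", target)
        }
    }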
http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ProctimeSqlFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ProctimeSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ProctimeSqlFunction.scala
deleted file mode 100644
index 4fb0378..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ProctimeSqlFunction.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.functions
-
-import org.apache.calcite.sql._
-import org.apache.calcite.sql.`type`._
-import org.apache.calcite.sql.validate.SqlMonotonicity
-
-/**
-  * Function that materializes a processing time attribute.
-  * After materialization the result can be used in regular arithmetical calculations.
-  */
-object ProctimeSqlFunction
-  extends SqlFunction(
-    "PROCTIME",
-    SqlKind.OTHER_FUNCTION,
-    ReturnTypes.explicit(SqlTypeName.TIMESTAMP),
-    InferTypes.RETURN_TYPE,
-    OperandTypes.family(SqlTypeFamily.TIMESTAMP),
-    SqlFunctionCategory.SYSTEM) {
-
-  override def getSyntax: SqlSyntax = SqlSyntax.FUNCTION
-
-  override def getMonotonicity(call: SqlOperatorBinding): SqlMonotonicity =
-    SqlMonotonicity.INCREASING
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ProctimeSqlFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ProctimeSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ProctimeSqlFunction.scala
new file mode 100644
index 0000000..f30ad2f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/sql/ProctimeSqlFunction.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.functions.sql
+
+import org.apache.calcite.sql._
+import org.apache.calcite.sql.`type`._
+import org.apache.calcite.sql.validate.SqlMonotonicity
+
+/**
+  * Function that materializes a processing time attribute.
+  * After materialization the result can be used in regular arithmetical calculations.
+  */
+object ProctimeSqlFunction
+  extends SqlFunction(
+    "PROCTIME",
+    SqlKind.OTHER_FUNCTION,
+    ReturnTypes.explicit(SqlTypeName.TIMESTAMP),
+    InferTypes.RETURN_TYPE,
+    OperandTypes.family(SqlTypeFamily.TIMESTAMP),
+    SqlFunctionCategory.SYSTEM) {
+
+  override def getSyntax: SqlSyntax = SqlSyntax.FUNCTION
+
+  override def getMonotonicity(call: SqlOperatorBinding): SqlMonotonicity =
+    SqlMonotonicity.INCREASING
+
+  override def isDeterministic: Boolean = false
+}

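Besides the package move, the relocated object now also overrides
isDeterministic to return false, which keeps the optimizer from
constant-folding PROCTIME() at plan time. A toy sketch of why that flag
matters (hypothetical reducer, not Calcite's actual one):

    object FoldSketch {
      trait SqlFn { def isDeterministic: Boolean; def eval(): Long }

      // a constant-folding pass may pre-evaluate deterministic calls at plan
      // time; PROCTIME() must be re-evaluated per record, so it opts out
      def tryFold(fn: SqlFn): Option[Long] =
        if (fn.isDeterministic) Some(fn.eval()) else None
    }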
http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
index 47469d1..b44c28e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
@@ -175,11 +175,11 @@ object UserDefinedFunctionUtils {
         }
       }) {
       throw new ValidationException(
-        s"Scala-style variable arguments in '${methodName}' methods are not 
supported. Please " +
+        s"Scala-style variable arguments in '$methodName' methods are not 
supported. Please " +
           s"add a @scala.annotation.varargs annotation.")
     } else if (found.length > 1) {
       throw new ValidationException(
-        s"Found multiple '${methodName}' methods which match the signature.")
+        s"Found multiple '$methodName' methods which match the signature.")
     }
     found.headOption
   }
@@ -218,7 +218,7 @@ object UserDefinedFunctionUtils {
     if (methods.isEmpty) {
       throw new ValidationException(
         s"Function class '${function.getClass.getCanonicalName}' does not 
implement at least " +
-          s"one method named '${methodName}' which is public, not abstract and 
" +
+          s"one method named '$methodName' which is public, not abstract and " 
+
           s"(in case of table functions) not static.")
     }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
index ac63be1..0cf86f7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
@@ -38,7 +38,7 @@ import org.apache.flink.table.plan.nodes.CommonAggregate
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.plan.nodes.datastream.DataStreamGroupWindowAggregate._
 import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules
-import org.apache.flink.table.runtime.TimestampSetterProcessFunction
+import org.apache.flink.table.runtime.RowtimeProcessFunction
 import org.apache.flink.table.runtime.aggregate.AggregateUtil._
 import org.apache.flink.table.runtime.aggregate._
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
@@ -149,7 +149,7 @@ class DataStreamGroupWindowAggregate(
 
       inputDS
         .process(
-          new TimestampSetterProcessFunction(timeIdx, CRowTypeInfo(inputSchema.typeInfo)))
+          new RowtimeProcessFunction(timeIdx, CRowTypeInfo(inputSchema.typeInfo)))
         .setParallelism(inputDS.getParallelism)
         .name(s"time attribute: ($timeAttribute)")
     } else {

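RowtimeProcessFunction replaces TimestampSetterProcessFunction here: it reads
the Long rowtime field out of the row and attaches it to the emitted element
as its event-time timestamp. A hedged sketch of that behavior (hypothetical
signature, not the actual ProcessFunction API):

    object RowtimeAttachSketch {
      // the row's rowtime field is already epoch millis; attach it as the
      // element's event-time timestamp when emitting downstream
      def onElement(
          row: Vector[Any],
          timeIdx: Int,
          emitWithTimestamp: (Vector[Any], Long) => Unit): Unit = {
        val rowtime = row(timeIdx).asInstanceOf[Long]
        emitWithTimestamp(row, rowtime)
      }
    }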
http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala
deleted file mode 100644
index 109c6e1..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime
-
-import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.table.codegen.Compiler
-import org.apache.flink.table.runtime.types.CRow
-import org.apache.flink.types.Row
-import org.slf4j.LoggerFactory
-
-/**
-  * MapRunner with [[CRow]] input.
-  */
-class CRowInputMapRunner[OUT](
-    name: String,
-    code: String,
-    @transient var returnType: TypeInformation[OUT])
-  extends RichMapFunction[CRow, OUT]
-  with ResultTypeQueryable[OUT]
-  with Compiler[MapFunction[Row, OUT]] {
-
-  val LOG = LoggerFactory.getLogger(this.getClass)
-
-  private var function: MapFunction[Row, OUT] = _
-
-  override def open(parameters: Configuration): Unit = {
-    LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code")
-    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
-    LOG.debug("Instantiating MapFunction.")
-    function = clazz.newInstance()
-  }
-
-  override def map(in: CRow): OUT = {
-    function.map(in.row)
-  }
-
-  override def getProducedType: TypeInformation[OUT] = returnType
-}

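The deleted runner (like its CRowMapRunner successor in this commit, per the
diffstat) follows the Table API's compile-in-open pattern: the generated
source is compiled once per task in open(), and map() then delegates with a
plain virtual call. A generic sketch of the pattern (toy compiler stand-in,
not Flink's Compiler trait):

    object RunnerSketch {
      trait MapFn[I, O] { def map(in: I): O }

      // toy stand-in: in Flink the code string is compiled against the
      // user-code ClassLoader; here we just close over a prebuilt function
      class CompilingRunner[I, O](compileOnce: () => MapFn[I, O]) {
        private var fn: MapFn[I, O] = _
        def open(): Unit = { fn = compileOnce() } // once per task, not per record
        def map(in: I): O = fn.map(in)            // hot path stays a plain call
      }
    }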
http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala
deleted file mode 100644
index 6b3aa44..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime
-
-import java.lang.{Boolean => JBool}
-import java.sql.Timestamp
-
-import org.apache.calcite.runtime.SqlFunctions
-import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.table.codegen.Compiler
-import org.apache.flink.table.runtime.types.CRow
-import org.apache.flink.types.Row
-import org.slf4j.LoggerFactory
-import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
-
-/**
-  * Convert [[CRow]] to a [[JTuple2]]
-  */
-class CRowInputJavaTupleOutputMapRunner(
-    name: String,
-    code: String,
-    @transient var returnType: TypeInformation[JTuple2[JBool, Any]])
-  extends RichMapFunction[CRow, Any]
-          with ResultTypeQueryable[JTuple2[JBool, Any]]
-          with Compiler[MapFunction[Row, Any]] {
-
-  val LOG = LoggerFactory.getLogger(this.getClass)
-
-  private var function: MapFunction[Row, Any] = _
-  private var tupleWrapper: JTuple2[JBool, Any] = _
-
-  override def open(parameters: Configuration): Unit = {
-    LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code")
-    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
-    LOG.debug("Instantiating MapFunction.")
-    function = clazz.newInstance()
-    tupleWrapper = new JTuple2[JBool, Any]()
-  }
-
-  override def map(in: CRow): JTuple2[JBool, Any] = {
-    tupleWrapper.f0 = in.change
-    tupleWrapper.f1 = function.map(in.row)
-    tupleWrapper
-  }
-
-  override def getProducedType: TypeInformation[JTuple2[JBool, Any]] = returnType
-}
-
-/**
-  * Convert [[CRow]] to a [[Tuple2]]
-  */
-class CRowInputScalaTupleOutputMapRunner(
-  name: String,
-  code: String,
-  @transient var returnType: TypeInformation[(Boolean, Any)])
-  extends RichMapFunction[CRow, (Boolean, Any)]
-    with ResultTypeQueryable[(Boolean, Any)]
-    with Compiler[MapFunction[Row, Any]] {
-
-  val LOG = LoggerFactory.getLogger(this.getClass)
-
-  private var function: MapFunction[Row, Any] = _
-
-  override def open(parameters: Configuration): Unit = {
-    LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code")
-    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
-    LOG.debug("Instantiating MapFunction.")
-    function = clazz.newInstance()
-  }
-
-  override def map(in: CRow): (Boolean, Any) =
-    (in.change, function.map(in.row))
-
-  override def getProducedType: TypeInformation[(Boolean, Any)] = returnType
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowMapRunner.scala
new file mode 100644
index 0000000..9ed9188
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowMapRunner.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.slf4j.LoggerFactory
+
+/**
+  * MapRunner with [[CRow]] input.
+  */
+class CRowMapRunner[OUT](
+    name: String,
+    code: String,
+    @transient var returnType: TypeInformation[OUT])
+  extends RichMapFunction[CRow, OUT]
+  with ResultTypeQueryable[OUT]
+  with Compiler[MapFunction[Row, OUT]] {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  private var function: MapFunction[Row, OUT] = _
+
+  override def open(parameters: Configuration): Unit = {
+    LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code")
+    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
+    LOG.debug("Instantiating MapFunction.")
+    function = clazz.newInstance()
+  }
+
+  override def map(in: CRow): OUT = {
+    function.map(in.row)
+  }
+
+  override def getProducedType: TypeInformation[OUT] = returnType
+}
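
For context, a minimal usage sketch of the renamed runner, assuming a code-generated MapFunction[Row, Row] source string; the class name "RowConverter", the generatedCode value, and the input stream are hypothetical placeholders, not part of this commit:

    // Compile and apply generated row-conversion code on a DataStream[CRow].
    // "RowConverter" and generatedCode are assumed placeholders.
    val runner = new CRowMapRunner[Row](
      "RowConverter",     // name of the generated MapFunction class
      generatedCode,      // source code of a MapFunction[Row, Row]
      rowTypeInfo)        // TypeInformation[Row] of the produced rows
    val rows: DataStream[Row] = crowStream.map(runner)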

http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/OutputRowtimeProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/OutputRowtimeProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/OutputRowtimeProcessFunction.scala
new file mode 100644
index 0000000..3eaeea3
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/OutputRowtimeProcessFunction.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.calcite.runtime.SqlFunctions
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.common.functions.util.FunctionUtils
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.util.Collector
+
+/**
+  * Wraps a [[MapFunction]] and sets the rowtime field of a [[CRow]] as the
+  * [[org.apache.flink.streaming.runtime.streamrecord.StreamRecord]] timestamp.
+  */
+class OutputRowtimeProcessFunction[OUT](
+    function: MapFunction[CRow, OUT],
+    rowtimeIdx: Int)
+  extends ProcessFunction[CRow, OUT] {
+
+  override def open(parameters: Configuration): Unit = {
+    FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
+    FunctionUtils.openFunction(function, parameters)
+  }
+
+  override def processElement(
+      in: CRow,
+      ctx: ProcessFunction[CRow, OUT]#Context,
+      out: Collector[OUT]): Unit = {
+
+    val timestamp = in.row.getField(rowtimeIdx).asInstanceOf[Long]
+    out.asInstanceOf[TimestampedCollector[_]].setAbsoluteTimestamp(timestamp)
+
+    val convertedTimestamp = SqlFunctions.internalToTimestamp(timestamp)
+    in.row.setField(rowtimeIdx, convertedTimestamp)
+
+    out.collect(function.map(in))
+  }
+
+}
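
The function above marks the external boundary: internally the rowtime travels as a Long holding epoch milliseconds, and only on output is it converted back to a SQL Timestamp. A minimal sketch of that conversion, using an assumed example value:

    import java.sql.Timestamp
    import org.apache.calcite.runtime.SqlFunctions

    val internalRowtime: Long = 1502538702000L   // internal representation: epoch millis
    // convert back to the external SQL Timestamp representation
    val external: Timestamp = SqlFunctions.internalToTimestamp(internalRowtime)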

http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/RowtimeProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/RowtimeProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/RowtimeProcessFunction.scala
new file mode 100644
index 0000000..e192b07
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/RowtimeProcessFunction.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.util.Collector
+
+/**
+  * ProcessFunction to copy a timestamp from a [[org.apache.flink.types.Row]] field into the
+  * [[org.apache.flink.streaming.runtime.streamrecord.StreamRecord]].
+  */
+class RowtimeProcessFunction(
+    val rowtimeIdx: Int,
+    @transient var returnType: TypeInformation[CRow])
+  extends ProcessFunction[CRow, CRow]
+  with ResultTypeQueryable[CRow] {
+
+  override def processElement(
+      in: CRow,
+      ctx: ProcessFunction[CRow, CRow]#Context,
+      out: Collector[CRow]): Unit = {
+
+    val timestamp = in.row.getField(rowtimeIdx).asInstanceOf[Long]
+    out.asInstanceOf[TimestampedCollector[CRow]].setAbsoluteTimestamp(timestamp)
+    out.collect(in)
+  }
+
+  override def getProducedType: TypeInformation[CRow] = returnType
+}
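
A sketch of how this function is attached to a stream, assuming a DataStream[CRow] named crowStream with the rowtime in field 0; both names are placeholders:

    // Re-attach the rowtime stored in row field 0 as the StreamRecord timestamp.
    val withTimestamps: DataStream[CRow] =
      crowStream.process(new RowtimeProcessFunction(0, crowTypeInfo))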

http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/TimestampSetterProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/TimestampSetterProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/TimestampSetterProcessFunction.scala
deleted file mode 100644
index 00961f0..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/TimestampSetterProcessFunction.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime
-
-import java.sql.Timestamp
-
-import org.apache.calcite.runtime.SqlFunctions
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable
-import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.streaming.api.operators.TimestampedCollector
-import org.apache.flink.table.runtime.types.CRow
-import org.apache.flink.util.Collector
-
-/**
-  * ProcessFunction to copy a timestamp from a [[org.apache.flink.types.Row]] field into the
-  * [[org.apache.flink.streaming.runtime.streamrecord.StreamRecord]].
-  */
-class TimestampSetterProcessFunction(
-    val rowtimeIdx: Int,
-    @transient var returnType: TypeInformation[CRow])
-  extends ProcessFunction[CRow, CRow]
-  with ResultTypeQueryable[CRow] {
-
-  override def processElement(
-      in: CRow,
-      ctx: ProcessFunction[CRow, CRow]#Context,
-      out: Collector[CRow]): Unit = {
-
-    val timestamp = SqlFunctions.toLong(in.row.getField(rowtimeIdx).asInstanceOf[Timestamp])
-    out.asInstanceOf[TimestampedCollector[CRow]].setAbsoluteTimestamp(timestamp)
-    out.collect(in)
-  }
-
-  override def getProducedType: TypeInformation[CRow] = returnType
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/WrappingTimestampSetterProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/WrappingTimestampSetterProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/WrappingTimestampSetterProcessFunction.scala
deleted file mode 100644
index 8f12c30..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/WrappingTimestampSetterProcessFunction.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime
-
-import java.sql.Timestamp
-
-import org.apache.calcite.runtime.SqlFunctions
-import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.streaming.api.operators.TimestampedCollector
-import org.apache.flink.table.runtime.types.CRow
-import org.apache.flink.util.Collector
-
-/**
-  * Wraps a ProcessFunction and sets a Timestamp field of a CRow as
-  * [[org.apache.flink.streaming.runtime.streamrecord.StreamRecord]] timestamp.
-  */
-class WrappingTimestampSetterProcessFunction[OUT](
-    function: MapFunction[CRow, OUT],
-    rowtimeIdx: Int)
-  extends ProcessFunction[CRow, OUT] {
-
-  override def open(parameters: Configuration): Unit = {
-    super.open(parameters)
-    function match {
-      case f: RichMapFunction[_, _] =>
-        f.setRuntimeContext(getRuntimeContext)
-        f.open(parameters)
-      case _ =>
-    }
-  }
-
-  override def processElement(
-      in: CRow,
-      ctx: ProcessFunction[CRow, OUT]#Context,
-      out: Collector[OUT]): Unit = {
-
-    val timestamp = SqlFunctions.toLong(in.row.getField(rowtimeIdx).asInstanceOf[Timestamp])
-    out.asInstanceOf[TimestampedCollector[_]].setAbsoluteTimestamp(timestamp)
-
-    out.collect(function.map(in))
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index 52105e3..6304dc4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -96,7 +96,6 @@ object AggregateUtil {
 
     val genFunction = generator.generateAggregations(
       "UnboundedProcessingOverAggregateHelper",
-      generator,
       inputFieldTypeInfo,
       aggregates,
       aggFields,
@@ -175,7 +174,6 @@ object AggregateUtil {
 
     val genFunction = generator.generateAggregations(
       "NonWindowedAggregationHelper",
-      generator,
       inputFieldTypes,
       aggregates,
       aggFields,
@@ -240,7 +238,6 @@ object AggregateUtil {
 
     val genFunction = generator.generateAggregations(
       "BoundedOverAggregateHelper",
-      generator,
       inputFieldTypeInfo,
       aggregates,
       aggFields,
@@ -372,7 +369,6 @@ object AggregateUtil {
 
     val genFunction = generator.generateAggregations(
       "DataSetAggregatePrepareMapHelper",
-      generator,
       inputFieldTypeInfo,
       aggregates,
       aggFieldIndexes,
@@ -451,7 +447,6 @@ object AggregateUtil {
         // sliding time-window for partial aggregations
         val genFunction = generator.generateAggregations(
           "DataSetAggregatePrepareMapHelper",
-          generator,
           physicalInputTypes,
           aggregates,
           aggFieldIndexes,
@@ -555,7 +550,6 @@ object AggregateUtil {
 
     val genPreAggFunction = generator.generateAggregations(
       "GroupingWindowAggregateHelper",
-      generator,
       physicalInputTypes,
       aggregates,
       aggFieldIndexes,
@@ -572,7 +566,6 @@ object AggregateUtil {
 
     val genFinalAggFunction = generator.generateAggregations(
       "GroupingWindowAggregateHelper",
-      generator,
       physicalInputTypes,
       aggregates,
       aggFieldIndexes,
@@ -714,7 +707,6 @@ object AggregateUtil {
 
         val genFunction = generator.generateAggregations(
           "GroupingWindowAggregateHelper",
-          generator,
           physicalInputTypes,
           aggregates,
           aggFieldIndexes,
@@ -789,7 +781,6 @@ object AggregateUtil {
 
         val genFunction = generator.generateAggregations(
           "GroupingWindowAggregateHelper",
-          generator,
           physicalInputTypes,
           aggregates,
           aggFieldIndexes,
@@ -870,7 +861,6 @@ object AggregateUtil {
 
       val genPreAggFunction = generator.generateAggregations(
         "DataSetAggregatePrepareMapHelper",
-        generator,
         inputFieldTypeInfo,
         aggregates,
         aggInFields,
@@ -897,7 +887,6 @@ object AggregateUtil {
 
       val genFinalAggFunction = generator.generateAggregations(
         "DataSetAggregateFinalHelper",
-        generator,
         inputFieldTypeInfo,
         aggregates,
         aggInFields,
@@ -921,7 +910,6 @@ object AggregateUtil {
     else {
       val genFunction = generator.generateAggregations(
         "DataSetAggregateHelper",
-        generator,
         inputFieldTypeInfo,
         aggregates,
         aggInFields,
@@ -1019,7 +1007,6 @@ object AggregateUtil {
 
     val genFunction = generator.generateAggregations(
       "GroupingWindowAggregateHelper",
-      generator,
       inputFieldTypeInfo,
       aggregates,
       aggFields,
@@ -1214,7 +1201,7 @@ object AggregateUtil {
               case DECIMAL =>
                 new DecimalSumWithRetractAggFunction
               case sqlType: SqlTypeName =>
-                throw new TableException(s"Sum aggregate does no support type: 
'${sqlType}'")
+                throw new TableException(s"Sum aggregate does no support type: 
'$sqlType'")
             }
           } else {
             aggregates(index) = sqlTypeName match {
@@ -1233,7 +1220,7 @@ object AggregateUtil {
               case DECIMAL =>
                 new DecimalSumAggFunction
               case sqlType: SqlTypeName =>
-                throw new TableException(s"Sum aggregate does no support type: 
'${sqlType}'")
+                throw new TableException(s"Sum aggregate does no support type: 
'$sqlType'")
             }
           }
 
@@ -1255,7 +1242,7 @@ object AggregateUtil {
               case DECIMAL =>
                 new DecimalSum0WithRetractAggFunction
               case sqlType: SqlTypeName =>
-                throw new TableException(s"Sum0 aggregate does no support 
type: '${sqlType}'")
+                throw new TableException(s"Sum0 aggregate does no support 
type: '$sqlType'")
             }
           } else {
             aggregates(index) = sqlTypeName match {
@@ -1274,7 +1261,7 @@ object AggregateUtil {
               case DECIMAL =>
                 new DecimalSum0AggFunction
               case sqlType: SqlTypeName =>
-                throw new TableException(s"Sum0 aggregate does no support 
type: '${sqlType}'")
+                throw new TableException(s"Sum0 aggregate does no support 
type: '$sqlType'")
             }
           }
 
@@ -1295,7 +1282,7 @@ object AggregateUtil {
             case DECIMAL =>
               new DecimalAvgAggFunction
             case sqlType: SqlTypeName =>
-              throw new TableException(s"Avg aggregate does no support type: 
'${sqlType}'")
+              throw new TableException(s"Avg aggregate does no support type: 
'$sqlType'")
           }
 
         case sqlMinMaxFunction: SqlMinMaxAggFunction =>
@@ -1322,7 +1309,7 @@ object AggregateUtil {
                   new StringMinWithRetractAggFunction
                 case sqlType: SqlTypeName =>
                   throw new TableException(
-                    s"Min with retract aggregate does no support type: 
'${sqlType}'")
+                    s"Min with retract aggregate does no support type: 
'$sqlType'")
               }
             } else {
               sqlTypeName match {
@@ -1345,7 +1332,7 @@ object AggregateUtil {
                 case VARCHAR | CHAR =>
                   new StringMinAggFunction
                 case sqlType: SqlTypeName =>
-                  throw new TableException(s"Min aggregate does no support 
type: '${sqlType}'")
+                  throw new TableException(s"Min aggregate does no support 
type: '$sqlType'")
               }
             }
           } else {
@@ -1371,7 +1358,7 @@ object AggregateUtil {
                   new StringMaxWithRetractAggFunction
                 case sqlType: SqlTypeName =>
                   throw new TableException(
-                    s"Max with retract aggregate does no support type: 
'${sqlType}'")
+                    s"Max with retract aggregate does no support type: 
'$sqlType'")
               }
             } else {
               sqlTypeName match {
@@ -1394,7 +1381,7 @@ object AggregateUtil {
                 case VARCHAR | CHAR =>
                   new StringMaxAggFunction
                 case sqlType: SqlTypeName =>
-                  throw new TableException(s"Max aggregate does no support 
type: '${sqlType}'")
+                  throw new TableException(s"Max aggregate does no support 
type: '$sqlType'")
               }
             }
           }
@@ -1463,7 +1450,7 @@ object AggregateUtil {
           relDataType.head.getIndex
         } else {
           throw TableException(
-            s"Encountered more than one time attribute with the same name: 
'${relDataType}'")
+            s"Encountered more than one time attribute with the same name: 
'$relDataType'")
         }
       case e => throw TableException(
         "The time attribute of window in batch environment should be " +

http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
index ceb986d..8a0d682 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
@@ -17,10 +17,8 @@
  */
 package org.apache.flink.table.runtime.aggregate
 
-import java.sql.Timestamp
 import java.util.{ArrayList => JArrayList, List => JList}
 
-import org.apache.calcite.runtime.SqlFunctions
 import org.apache.flink.api.common.state._
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo}
@@ -118,7 +116,7 @@ class RowTimeBoundedRangeOver(
     registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime())
 
     // triggering timestamp for trigger calculation
-    val triggeringTs = SqlFunctions.toLong(input.getField(rowTimeIdx).asInstanceOf[Timestamp])
+    val triggeringTs = input.getField(rowTimeIdx).asInstanceOf[Long]
 
     val lastTriggeringTs = lastTriggeringTsState.value
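
The same simplification recurs in RowTimeBoundedRowsOver and RowTimeSortProcessFunction below: with the rowtime kept as a Long, the per-record Timestamp unboxing and conversion disappear. A before/after sketch, with the row and field index assumed:

    // before: rowtime stored as java.sql.Timestamp, converted on every record
    val tsOld = SqlFunctions.toLong(input.getField(rowTimeIdx).asInstanceOf[java.sql.Timestamp])
    // after: rowtime stored directly as epoch milliseconds
    val tsNew = input.getField(rowTimeIdx).asInstanceOf[Long]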
 

http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
index 678a3b7..ba65846 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
@@ -17,11 +17,9 @@
  */
 package org.apache.flink.table.runtime.aggregate
 
-import java.sql.Timestamp
 import java.util
 import java.util.{List => JList}
 
-import org.apache.calcite.runtime.SqlFunctions
 import org.apache.flink.api.common.state._
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo}
@@ -29,11 +27,11 @@ import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.ProcessFunction
 import org.apache.flink.streaming.api.operators.TimestampedCollector
 import org.apache.flink.table.api.StreamQueryConfig
-import org.apache.flink.types.Row
-import org.apache.flink.util.{Collector, Preconditions}
 import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
-import org.slf4j.LoggerFactory
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+import org.slf4j.{Logger, LoggerFactory}
 
 /**
  * Process Function for ROWS clause event-time bounded OVER window
@@ -56,6 +54,8 @@ class RowTimeBoundedRowsOver(
   Preconditions.checkNotNull(aggregationStateType)
   Preconditions.checkNotNull(precedingOffset)
 
+  val LOG: Logger = LoggerFactory.getLogger(this.getClass)
+
   private var output: CRow = _
 
   // the state which keeps the last triggering timestamp
@@ -73,7 +73,6 @@ class RowTimeBoundedRowsOver(
   // to this time stamp.
   private var dataState: MapState[Long, JList[Row]] = _
 
-  val LOG = LoggerFactory.getLogger(this.getClass)
   private var function: GeneratedAggregations = _
 
   override def open(config: Configuration) {
@@ -127,7 +126,7 @@ class RowTimeBoundedRowsOver(
     registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime())
 
     // triggering timestamp for trigger calculation
-    val triggeringTs = SqlFunctions.toLong(input.getField(rowTimeIdx).asInstanceOf[Timestamp])
+    val triggeringTs = input.getField(rowTimeIdx).asInstanceOf[Long]
 
     val lastTriggeringTs = lastTriggeringTsState.value
     // check if the data is expired, if not, save the data and register event time timer

http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeSortProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeSortProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeSortProcessFunction.scala
index fd58678..0d69355 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeSortProcessFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeSortProcessFunction.scala
@@ -17,24 +17,17 @@
  */
 package org.apache.flink.table.runtime.aggregate
 
-import java.sql.Timestamp
+import java.util.{Collections, ArrayList => JArrayList, List => JList}
 
-import org.apache.flink.api.common.state.ValueState
-import org.apache.flink.api.common.state.ValueStateDescriptor
-import org.apache.flink.api.common.state.MapState
-import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.state.{MapState, MapStateDescriptor, ValueState, ValueStateDescriptor}
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils.ListTypeInfo
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.operators.TimestampedCollector
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.types.Row
 import org.apache.flink.util.{Collector, Preconditions}
-import java.util.Collections
-import java.util.{ArrayList => JArrayList, List => JList}
-
-import org.apache.calcite.runtime.SqlFunctions
-import org.apache.flink.streaming.api.operators.TimestampedCollector
 
 /**
  * ProcessFunction to sort on event-time and possibly additional secondary sort attributes.
@@ -90,7 +83,7 @@ class RowTimeSortProcessFunction(
     val input = inputC.row
     
     // timestamp of the processed row
-    val rowtime = SqlFunctions.toLong(input.getField(rowtimeIdx).asInstanceOf[Timestamp])
+    val rowtime = input.getField(rowtimeIdx).asInstanceOf[Long]
 
     val lastTriggeringTs = lastTriggeringTsState.value
 
