http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
index 8cad64f..ba044be 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
@@ -17,14 +17,15 @@
  */
 package org.apache.flink.table.runtime.harness
 
-import java.lang.{Integer => JInt, Long => JLong}
+import java.lang.{Long => JLong}
 import java.util.concurrent.ConcurrentLinkedQueue
 
+import org.apache.calcite.runtime.SqlFunctions.{internalToTimestamp => toTS}
 import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
 import org.apache.flink.streaming.api.operators.KeyedProcessOperator
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
-import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
 import org.apache.flink.table.runtime.aggregate._
 import org.apache.flink.table.runtime.harness.HarnessTestBase._
 import org.apache.flink.table.runtime.types.CRow
@@ -48,8 +49,10 @@ class OverWindowHarnessTest extends HarnessTestBase{
         queryConfig))
 
     val testHarness =
-      createHarnessTester(processFunction,new 
TupleRowKeySelector[Integer](0),BasicTypeInfo
-        .INT_TYPE_INFO)
+      createHarnessTester(
+        processFunction,
+        new TupleRowKeySelector[String](1),
+        Types.STRING)
 
     testHarness.open()
 
@@ -57,91 +60,77 @@ class OverWindowHarnessTest extends HarnessTestBase{
     testHarness.setProcessingTime(1)
 
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 1))
+      CRow(Row.of(toTS(1), "aaa", 1L: JLong), true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), true), 1))
+      CRow(Row.of(toTS(1), "bbb", 10L: JLong), true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 1))
+      CRow(Row.of(toTS(1), "aaa", 2L: JLong), true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 1))
+      CRow(Row.of(toTS(1), "aaa", 3L: JLong), true)))
 
     // register cleanup timer with 4100
     testHarness.setProcessingTime(1100)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong), true), 1))
+      CRow(Row.of(toTS(1), "bbb", 20L: JLong), true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong), true), 1))
+      CRow(Row.of(toTS(1), "aaa", 4L: JLong), true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), true), 1))
+      CRow(Row.of(toTS(1), "aaa", 5L: JLong), true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), true), 1))
+      CRow(Row.of(toTS(1), "aaa", 6L: JLong), true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 1))
+      CRow(Row.of(toTS(1), "bbb", 30L: JLong), true)))
 
     // register cleanup timer with 6001
     testHarness.setProcessingTime(3001)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 2))
+      CRow(Row.of(toTS(2), "aaa", 7L: JLong), true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 2))
+      CRow(Row.of(toTS(2), "aaa", 8L: JLong), true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 2))
+      CRow(Row.of(toTS(2), "aaa", 9L: JLong), true)))
 
     // trigger cleanup timer and register cleanup timer with 9002
     testHarness.setProcessingTime(6002)
     testHarness.processElement(new StreamRecord(
-        CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 
2))
+        CRow(Row.of(toTS(2), "aaa", 10L: JLong), true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 2))
+      CRow(Row.of(toTS(2), "bbb", 40L: JLong), true)))
 
     val result = testHarness.getOutput
 
     val expectedOutput = new ConcurrentLinkedQueue[Object]()
 
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: 
JLong), true), 1))
+      CRow(Row.of(toTS(1), "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 
10L: JLong), true), 1))
+      CRow(Row.of(toTS(1), "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 2L: 
JLong), true), 1))
+      CRow(Row.of(toTS(1), "aaa", 2L: JLong, 1L: JLong, 2L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 2L: JLong, 3L: 
JLong), true), 1))
+      CRow(Row.of(toTS(1), "aaa", 3L: JLong, 2L: JLong, 3L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 10L: JLong, 
20L: JLong), true), 1))
+      CRow(Row.of(toTS(1), "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 3L: JLong, 4L: 
JLong), true), 1))
+      CRow(Row.of(toTS(1), "aaa", 4L: JLong, 3L: JLong, 4L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 4L: JLong, 5L: 
JLong), true), 1))
+      CRow(Row.of(toTS(1), "aaa", 5L: JLong, 4L: JLong, 5L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 5L: JLong, 6L: 
JLong), true), 1))
+      CRow(Row.of(toTS(1), "aaa", 6L: JLong, 5L: JLong, 6L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 20L: JLong, 
30L: JLong), true), 1))
+      CRow(Row.of(toTS(1), "bbb", 30L: JLong, 20L: JLong, 30L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 6L: JLong, 7L: 
JLong), true), 2))
+      CRow(Row.of(toTS(2), "aaa", 7L: JLong, 6L: JLong, 7L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 7L: JLong, 8L: 
JLong), true), 2))
+      CRow(Row.of(toTS(2), "aaa", 8L: JLong, 7L: JLong, 8L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 8L: JLong, 9L: 
JLong), true), 2))
+      CRow(Row.of(toTS(2), "aaa", 9L: JLong, 8L: JLong, 9L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 10L: JLong, 
10L: JLong), true), 2))
+      CRow(Row.of(toTS(2), "aaa", 10L: JLong, 10L: JLong, 10L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 40L: JLong, 
40L: JLong), true), 2))
+      CRow(Row.of(toTS(2), "bbb", 40L: JLong, 40L: JLong, 40L: JLong), true)))
 
-    verify(expectedOutput, result, new RowResultSortComparator(6))
+    verify(expectedOutput, result, new RowResultSortComparator())
 
     testHarness.close()
   }
@@ -163,59 +152,59 @@ class OverWindowHarnessTest extends HarnessTestBase{
     val testHarness =
       createHarnessTester(
         processFunction,
-        new TupleRowKeySelector[Integer](0),
-        BasicTypeInfo.INT_TYPE_INFO)
+        new TupleRowKeySelector[String](1),
+        Types.STRING)
 
     testHarness.open()
 
     // register cleanup timer with 3003
     testHarness.setProcessingTime(3)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 0))
+      CRow(Row.of(toTS(0), "aaa", 1L: JLong), true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), true), 0))
+      CRow(Row.of(toTS(0), "bbb", 10L: JLong), true)))
 
     testHarness.setProcessingTime(4)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 0))
+      CRow(Row.of(toTS(0), "aaa", 2L: JLong), true)))
 
     // trigger cleanup timer and register cleanup timer with 6003
     testHarness.setProcessingTime(3003)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 0))
+      CRow(Row.of(toTS(0), "aaa", 3L: JLong), true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong), true), 0))
+      CRow(Row.of(toTS(0), "bbb", 20L: JLong), true)))
 
     testHarness.setProcessingTime(5)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong), true), 0))
+      CRow(Row.of(toTS(0), "aaa", 4L: JLong), true)))
 
     // register cleanup timer with 9002
     testHarness.setProcessingTime(6002)
 
     testHarness.setProcessingTime(7002)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), true), 0))
+      CRow(Row.of(toTS(0), "aaa", 5L: JLong), true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), true), 0))
+      CRow(Row.of(toTS(0), "aaa", 6L: JLong), true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 0))
+      CRow(Row.of(toTS(0), "bbb", 30L: JLong), true)))
 
     // register cleanup timer with 14002
     testHarness.setProcessingTime(11002)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 0))
+      CRow(Row.of(toTS(0), "aaa", 7L: JLong), true)))
 
     testHarness.setProcessingTime(11004)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 0))
+      CRow(Row.of(toTS(0), "aaa", 8L: JLong), true)))
 
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 0))
+      CRow(Row.of(toTS(0), "aaa", 9L: JLong), true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 0))
+      CRow(Row.of(toTS(0), "aaa", 10L: JLong), true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 0))
+      CRow(Row.of(toTS(0), "bbb", 40L: JLong), true)))
 
     testHarness.setProcessingTime(11006)
 
@@ -225,49 +214,35 @@ class OverWindowHarnessTest extends HarnessTestBase{
 
     // all elements at the same proc timestamp have the same value per key
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: 
JLong), true), 4))
+      CRow(Row.of(toTS(0), "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 
10L: JLong), true), 4))
+      CRow(Row.of(toTS(0), "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 2L: 
JLong), true), 5))
+      CRow(Row.of(toTS(0), "aaa", 2L: JLong, 1L: JLong, 2L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 3L: JLong, 4L: 
JLong), true), 3004))
+      CRow(Row.of(toTS(0), "aaa", 3L: JLong, 3L: JLong, 4L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(
-        2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 20L: JLong, 20L: 
JLong), true), 3004))
+      CRow(Row.of(toTS(0), "bbb", 20L: JLong, 20L: JLong, 20L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 4L: JLong, 4L: 
JLong), true), 6))
+      CRow(Row.of(toTS(0), "aaa", 4L: JLong, 4L: JLong, 4L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 5L: JLong, 6L: 
JLong), true), 7003))
+      CRow(Row.of(toTS(0), "aaa", 5L: JLong, 5L: JLong, 6L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 5L: JLong, 6L: 
JLong), true), 7003))
+      CRow(Row.of(toTS(0), "aaa", 6L: JLong, 5L: JLong, 6L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 30L: JLong, 30L: 
JLong), true), 7003))
+      CRow(Row.of(toTS(0), "bbb", 30L: JLong, 30L: JLong, 30L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 7L: JLong, 7L: 
JLong), true), 11003))
+      CRow(Row.of(toTS(0), "aaa", 7L: JLong, 7L: JLong, 7L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(
-        1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 7L: JLong, 10L: 
JLong), true), 11005))
+      CRow(Row.of(toTS(0), "aaa", 8L: JLong, 7L: JLong, 10L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(Row.of(
-        1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 7L: JLong, 10L: 
JLong), true), 11005))
+      CRow(Row.of(toTS(0), "aaa", 9L: JLong, 7L: JLong, 10L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 7L: JLong, 10L: 
JLong), true), 11005))
+      CRow(Row.of(toTS(0), "aaa", 10L: JLong, 7L: JLong, 10L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 40L: JLong, 40L: 
JLong), true), 11005))
+      CRow(Row.of(toTS(0), "bbb", 40L: JLong, 40L: JLong, 40L: JLong), true)))
 
-    verify(expectedOutput, result, new RowResultSortComparator(6))
+    verify(expectedOutput, result, new RowResultSortComparator())
 
     testHarness.close()
   }
@@ -284,8 +259,8 @@ class OverWindowHarnessTest extends HarnessTestBase{
     val testHarness =
       createHarnessTester(
         processFunction,
-        new TupleRowKeySelector[Integer](0),
-        BasicTypeInfo.INT_TYPE_INFO)
+        new TupleRowKeySelector[String](1),
+        Types.STRING)
 
     testHarness.open()
 
@@ -293,85 +268,71 @@ class OverWindowHarnessTest extends HarnessTestBase{
     testHarness.setProcessingTime(1003)
 
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 0))
+      CRow(Row.of(toTS(0), "aaa", 1L: JLong), true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), true), 0))
+      CRow(Row.of(toTS(0), "bbb", 10L: JLong), true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 0))
+      CRow(Row.of(toTS(0), "aaa", 2L: JLong), true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 0))
+      CRow(Row.of(toTS(0), "aaa", 3L: JLong), true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong), true), 0))
+      CRow(Row.of(toTS(0), "bbb", 20L: JLong), true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong), true), 0))
+      CRow(Row.of(toTS(0), "aaa", 4L: JLong), true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), true), 0))
+      CRow(Row.of(toTS(0), "aaa", 5L: JLong), true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), true), 0))
+      CRow(Row.of(toTS(0), "aaa", 6L: JLong), true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 0))
+      CRow(Row.of(toTS(0), "bbb", 30L: JLong), true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 0))
+      CRow(Row.of(toTS(0), "aaa", 7L: JLong), true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 0))
+      CRow(Row.of(toTS(0), "aaa", 8L: JLong), true)))
 
     // trigger cleanup timer and register cleanup timer with 8003
     testHarness.setProcessingTime(5003)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 
5003))
+      CRow(Row.of(toTS(0), "aaa", 9L: JLong), true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 
5003))
+      CRow(Row.of(toTS(0), "aaa", 10L: JLong), true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 
5003))
+      CRow(Row.of(toTS(0), "bbb", 40L: JLong), true)))
 
     val result = testHarness.getOutput
 
     val expectedOutput = new ConcurrentLinkedQueue[Object]()
 
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: 
JLong), true), 0))
+      CRow(Row.of(toTS(0), "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 
10L: JLong), true), 0))
+      CRow(Row.of(toTS(0), "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 2L: 
JLong), true), 0))
+      CRow(Row.of(toTS(0), "aaa", 2L: JLong, 1L: JLong, 2L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 1L: JLong, 3L: 
JLong), true), 0))
+      CRow(Row.of(toTS(0), "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 10L: JLong, 
20L: JLong), true), 0))
+      CRow(Row.of(toTS(0), "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 1L: JLong, 4L: 
JLong), true), 0))
+      CRow(Row.of(toTS(0), "aaa", 4L: JLong, 1L: JLong, 4L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 1L: JLong, 5L: 
JLong), true), 0))
+      CRow(Row.of(toTS(0), "aaa", 5L: JLong, 1L: JLong, 5L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 1L: JLong, 6L: 
JLong), true), 0))
+      CRow(Row.of(toTS(0), "aaa", 6L: JLong, 1L: JLong, 6L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 10L: JLong, 
30L: JLong), true), 0))
+      CRow(Row.of(toTS(0), "bbb", 30L: JLong, 10L: JLong, 30L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 1L: JLong, 7L: 
JLong), true), 0))
+      CRow(Row.of(toTS(0), "aaa", 7L: JLong, 1L: JLong, 7L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 1L: JLong, 8L: 
JLong), true), 0))
+      CRow(Row.of(toTS(0), "aaa", 8L: JLong, 1L: JLong, 8L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 9L: JLong, 9L: 
JLong), true), 5003))
+      CRow(Row.of(toTS(0), "aaa", 9L: JLong, 9L: JLong, 9L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 9L: JLong, 10L: 
JLong), true), 5003))
+      CRow(Row.of(toTS(0), "aaa", 10L: JLong, 9L: JLong, 10L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 40L: JLong, 40L: 
JLong), true), 5003))
+      CRow(Row.of(toTS(0), "bbb", 40L: JLong, 40L: JLong, 40L: JLong), true)))
 
-    verify(expectedOutput, result, new RowResultSortComparator(6))
+    verify(expectedOutput, result, new RowResultSortComparator())
     testHarness.close()
   }
 
@@ -387,63 +348,64 @@ class OverWindowHarnessTest extends HarnessTestBase{
         minMaxAggregationStateType,
         minMaxCRowType,
         4000,
+        0,
         new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(1), Time.seconds(2))))
 
     val testHarness =
       createHarnessTester(
         processFunction,
-        new TupleRowKeySelector[String](3),
+        new TupleRowKeySelector[String](1),
         BasicTypeInfo.STRING_TYPE_INFO)
 
     testHarness.open()
 
     testHarness.processWatermark(1)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 2))
+      CRow(Row.of(toTS(2), "aaa", 1L: JLong), true)))
 
     testHarness.processWatermark(2)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), true), 3))
+      CRow(Row.of(toTS(3), "bbb", 10L: JLong), true)))
 
     testHarness.processWatermark(4000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 
4001))
+      CRow(Row.of(toTS(4001), "aaa", 2L: JLong), true)))
 
     testHarness.processWatermark(4001)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 
4002))
+      CRow(Row.of(toTS(4002), "aaa", 3L: JLong), true)))
 
     testHarness.processWatermark(4002)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 0L: JLong, 0: JInt, "aaa", 4L: JLong), true), 4003))
+      CRow(Row.of(toTS(4003), "aaa", 4L: JLong), true)))
 
     testHarness.processWatermark(4800)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(2: JInt, 11L: JLong, 1: JInt, "bbb", 25L: JLong), true), 
4801))
+      CRow(Row.of(toTS(4801), "bbb", 25L: JLong), true)))
 
     testHarness.processWatermark(6500)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), true), 
6501))
+      CRow(Row.of(toTS(6501), "aaa", 5L: JLong), true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), true), 
6501))
+      CRow(Row.of(toTS(6501), "aaa", 6L: JLong), true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 
6501))
+      CRow(Row.of(toTS(6501), "bbb", 30L: JLong), true)))
 
     testHarness.processWatermark(7000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 
7001))
+      CRow(Row.of(toTS(7001), "aaa", 7L: JLong), true)))
 
     testHarness.processWatermark(8000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 
8001))
+      CRow(Row.of(toTS(8001), "aaa", 8L: JLong), true)))
 
     testHarness.processWatermark(12000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 
12001))
+      CRow(Row.of(toTS(12001), "aaa", 9L: JLong), true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 
12001))
+      CRow(Row.of(toTS(12001), "aaa", 10L: JLong), true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 
12001))
+      CRow(Row.of(toTS(12001), "bbb", 40L: JLong), true)))
 
     testHarness.processWatermark(19000)
 
@@ -453,21 +415,22 @@ class OverWindowHarnessTest extends HarnessTestBase{
 
     // check that state is removed after max retention time
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 0L: JLong, 0: JInt, "ccc", 1L: JLong), true), 
20001)) // clean-up 3000
+      CRow(Row.of(toTS(20001), "ccc", 1L: JLong), true))) // clean-up 3000
     testHarness.setProcessingTime(2500)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "ccc", 2L: JLong), true), 
20002)) // clean-up 4500
+      CRow(Row.of(toTS(20002), "ccc", 2L: JLong), true))) // clean-up 4500
     testHarness.processWatermark(20010) // compute output
 
     assert(testHarness.numKeyedStateEntries() > 0) // check that we have state
     testHarness.setProcessingTime(4499)
     assert(testHarness.numKeyedStateEntries() > 0) // check that we have state
     testHarness.setProcessingTime(4500)
+    val x = testHarness.numKeyedStateEntries()
     assert(testHarness.numKeyedStateEntries() == 0) // check that all state is gone
 
     // check that state is only removed if all data was processed
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(3: JInt, 0L: JLong, 0: JInt, "ccc", 3L: JLong), true), 
20011)) // clean-up 6500
+      CRow(Row.of(toTS(20011), "ccc", 3L: JLong), true))) // clean-up 6500
 
     assert(testHarness.numKeyedStateEntries() > 0) // check that we have state
     testHarness.setProcessingTime(6500) // clean-up attempt but rescheduled to 8500
@@ -487,59 +450,42 @@ class OverWindowHarnessTest extends HarnessTestBase{
 
     // all elements at the same row-time have the same value per key
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: 
JLong), true), 2))
+      CRow(Row.of(toTS(2), "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 
10L: JLong), true), 3))
+      CRow(Row.of(toTS(3), "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 2L: 
JLong), true), 4001))
+      CRow(Row.of(toTS(4001), "aaa", 2L: JLong, 1L: JLong, 2L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 1L: JLong, 3L: 
JLong), true), 4002))
+      CRow(Row.of(toTS(4002), "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(1: JInt, 0L: JLong, 0: JInt, "aaa", 4L: JLong, 2L: JLong, 4L: 
JLong), true), 4003))
+      CRow(Row.of(toTS(4003), "aaa", 4L: JLong, 2L: JLong, 4L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-      Row.of(2: JInt, 11L: JLong, 1: JInt, "bbb", 25L: JLong, 25L: JLong, 25L: 
JLong), true), 4801))
+      CRow(Row.of(toTS(4801), "bbb", 25L: JLong, 25L: JLong, 25L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 2L: JLong, 6L: 
JLong), true), 6501))
+      CRow(Row.of(toTS(6501), "aaa", 5L: JLong, 2L: JLong, 6L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 2L: JLong, 6L: 
JLong), true), 6501))
+      CRow(Row.of(toTS(6501), "aaa", 6L: JLong, 2L: JLong, 6L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 2L: JLong, 7L: 
JLong), true), 7001))
+      CRow(Row.of(toTS(7001), "aaa", 7L: JLong, 2L: JLong, 7L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 2L: JLong, 8L: 
JLong), true), 8001))
+      CRow(Row.of(toTS(8001), "aaa", 8L: JLong, 2L: JLong, 8L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 25L: JLong, 30L: 
JLong), true), 6501))
+      CRow(Row.of(toTS(6501), "bbb", 30L: JLong, 25L: JLong, 30L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 8L: JLong, 10L: 
JLong), true), 12001))
+      CRow(Row.of(toTS(12001), "aaa", 9L: JLong, 8L: JLong, 10L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 8L: JLong, 10L: 
JLong), true), 12001))
+      CRow(Row.of(toTS(12001), "aaa", 10L: JLong, 8L: JLong, 10L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 40L: JLong, 40L: 
JLong), true), 12001))
+      CRow(Row.of(toTS(12001), "bbb", 40L: JLong, 40L: JLong, 40L: JLong), true)))
 
     expectedOutput.add(new StreamRecord(
-      CRow(
-      Row.of(1: JInt, 0L: JLong, 0: JInt, "ccc", 1L: JLong, 1L: JLong, 1L: 
JLong), true), 20001))
+      CRow(Row.of(toTS(20001), "ccc", 1L: JLong, 1L: JLong, 1L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(2: JInt, 0L: JLong, 0: JInt, "ccc", 2L: JLong, 1L: JLong, 2L: 
JLong), true), 20002))
+      CRow(Row.of(toTS(20002), "ccc", 2L: JLong, 1L: JLong, 2L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(3: JInt, 0L: JLong, 0: JInt, "ccc", 3L: JLong, 3L: JLong, 3L: 
JLong), true), 20011))
+      CRow(Row.of(toTS(20011), "ccc", 3L: JLong, 3L: JLong, 3L: JLong), true)))
 
-    verify(expectedOutput, result, new RowResultSortComparator(6))
+    verify(expectedOutput, result, new RowResultSortComparator())
     testHarness.close()
   }
 
@@ -552,59 +498,60 @@ class OverWindowHarnessTest extends HarnessTestBase{
         minMaxAggregationStateType,
         minMaxCRowType,
         3,
+        0,
         new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(1), 
Time.seconds(2))))
 
     val testHarness =
       createHarnessTester(
         processFunction,
-        new TupleRowKeySelector[String](3),
+        new TupleRowKeySelector[String](1),
         BasicTypeInfo.STRING_TYPE_INFO)
 
     testHarness.open()
 
     testHarness.processWatermark(800)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 801))
+      CRow(Row.of(toTS(801), "aaa", 1L: JLong), true)))
 
     testHarness.processWatermark(2500)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), true), 
2501))
+      CRow(Row.of(toTS(2501), "bbb", 10L: JLong), true)))
 
     testHarness.processWatermark(4000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 
4001))
+      CRow(Row.of(toTS(4001), "aaa", 2L: JLong), true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 
4001))
+      CRow(Row.of(toTS(4001), "aaa", 3L: JLong), true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong), true), 
4001))
+      CRow(Row.of(toTS(4001), "bbb", 20L: JLong), true)))
 
     testHarness.processWatermark(4800)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong), true), 
4801))
+      CRow(Row.of(toTS(4801), "aaa", 4L: JLong), true)))
 
     testHarness.processWatermark(6500)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), true), 
6501))
+      CRow(Row.of(toTS(6501), "aaa", 5L: JLong), true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), true), 
6501))
+      CRow(Row.of(toTS(6501), "aaa", 6L: JLong), true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 
6501))
+      CRow(Row.of(toTS(6501), "bbb", 30L: JLong), true)))
 
     testHarness.processWatermark(7000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 
7001))
+      CRow(Row.of(toTS(7001), "aaa", 7L: JLong), true)))
 
     testHarness.processWatermark(8000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 
8001))
+      CRow(Row.of(toTS(8001), "aaa", 8L: JLong), true)))
 
     testHarness.processWatermark(12000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 
12001))
+      CRow(Row.of(toTS(12001), "aaa", 9L: JLong), true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 
12001))
+      CRow(Row.of(toTS(12001), "aaa", 10L: JLong), true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 
12001))
+      CRow(Row.of(toTS(12001), "bbb", 40L: JLong), true)))
 
     testHarness.processWatermark(19000)
 
@@ -614,10 +561,10 @@ class OverWindowHarnessTest extends HarnessTestBase{
 
     // check that state is removed after max retention time
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 0L: JLong, 0: JInt, "ccc", 1L: JLong), true), 
20001)) // clean-up 3000
+      CRow(Row.of(toTS(20001), "ccc", 1L: JLong), true))) // clean-up 3000
     testHarness.setProcessingTime(2500)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "ccc", 2L: JLong), true), 
20002)) // clean-up 4500
+      CRow(Row.of(toTS(20002), "ccc", 2L: JLong), true))) // clean-up 4500
     testHarness.processWatermark(20010) // compute output
 
     assert(testHarness.numKeyedStateEntries() > 0) // check that we have state
@@ -628,7 +575,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
 
     // check that state is only removed if all data was processed
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(3: JInt, 0L: JLong, 0: JInt, "ccc", 3L: JLong), true), 
20011)) // clean-up 6500
+      CRow(Row.of(toTS(20011), "ccc", 3L: JLong), true))) // clean-up 6500
 
     assert(testHarness.numKeyedStateEntries() > 0) // check that we have state
     testHarness.setProcessingTime(6500) // clean-up attempt but rescheduled to 8500
@@ -648,59 +595,42 @@ class OverWindowHarnessTest extends HarnessTestBase{
     val expectedOutput = new ConcurrentLinkedQueue[Object]()
 
     expectedOutput.add(new StreamRecord(
-      CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: 
JLong), true), 801))
+      CRow(Row.of(toTS(801), "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 10L: 
JLong), true), 2501))
+      CRow(Row.of(toTS(2501), "bbb", 10L: JLong, 10L: JLong, 10L: JLong), 
true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 2L: 
JLong), true), 4001))
+      CRow(Row.of(toTS(4001), "aaa", 2L: JLong, 1L: JLong, 2L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 1L: JLong, 3L: 
JLong), true), 4001))
+      CRow(Row.of(toTS(4001), "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 10L: JLong, 20L: 
JLong), true), 4001))
+      CRow(Row.of(toTS(4001), "bbb", 20L: JLong, 10L: JLong, 20L: JLong), 
true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 2L: JLong, 4L: 
JLong), true), 4801))
+      CRow(Row.of(toTS(4801), "aaa", 4L: JLong, 2L: JLong, 4L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 3L: JLong, 5L: 
JLong), true), 6501))
+      CRow(Row.of(toTS(6501), "aaa", 5L: JLong, 3L: JLong, 5L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 4L: JLong, 6L: 
JLong), true), 6501))
+      CRow(Row.of(toTS(6501), "aaa", 6L: JLong, 4L: JLong, 6L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 10L: JLong, 30L: 
JLong), true), 6501))
+      CRow(Row.of(toTS(6501), "bbb", 30L: JLong, 10L: JLong, 30L: JLong), 
true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 5L: JLong, 7L: 
JLong), true), 7001))
+      CRow(Row.of(toTS(7001), "aaa", 7L: JLong, 5L: JLong, 7L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 6L: JLong, 8L: 
JLong), true), 8001))
+      CRow(Row.of(toTS(8001), "aaa", 8L: JLong, 6L: JLong, 8L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 7L: JLong, 9L: 
JLong), true), 12001))
+      CRow(Row.of(toTS(12001), "aaa", 9L: JLong, 7L: JLong, 9L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 8L: JLong, 10L: 
JLong), true), 12001))
+      CRow(Row.of(toTS(12001), "aaa", 10L: JLong, 8L: JLong, 10L: JLong), 
true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 20L: JLong, 40L: 
JLong), true), 12001))
+      CRow(Row.of(toTS(12001), "bbb", 40L: JLong, 20L: JLong, 40L: JLong), 
true)))
 
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(1: JInt, 0L: JLong, 0: JInt, "ccc", 1L: JLong, 1L: JLong, 1L: 
JLong), true), 20001))
+      CRow(Row.of(toTS(20001), "ccc", 1L: JLong, 1L: JLong, 1L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(2: JInt, 0L: JLong, 0: JInt, "ccc", 2L: JLong, 1L: JLong, 2L: 
JLong), true), 20002))
+      CRow(Row.of(toTS(20002), "ccc", 2L: JLong, 1L: JLong, 2L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(3: JInt, 0L: JLong, 0: JInt, "ccc", 3L: JLong, 3L: JLong, 3L: 
JLong), true), 20011))
+      CRow(Row.of(toTS(20011), "ccc", 3L: JLong, 3L: JLong, 3L: JLong), true)))
 
-    verify(expectedOutput, result, new RowResultSortComparator(6))
+    verify(expectedOutput, result, new RowResultSortComparator())
     testHarness.close()
   }
 
@@ -715,12 +645,13 @@ class OverWindowHarnessTest extends HarnessTestBase{
         genMinMaxAggFunction,
         minMaxAggregationStateType,
         minMaxCRowType,
+        0,
         new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(1), 
Time.seconds(2))))
 
     val testHarness =
       createHarnessTester(
         processFunction,
-        new TupleRowKeySelector[String](3),
+        new TupleRowKeySelector[String](1),
         BasicTypeInfo.STRING_TYPE_INFO)
 
     testHarness.open()
@@ -728,47 +659,47 @@ class OverWindowHarnessTest extends HarnessTestBase{
     testHarness.setProcessingTime(1000)
     testHarness.processWatermark(800)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 801))
+      CRow(Row.of(toTS(801), "aaa", 1L: JLong), true)))
 
     testHarness.processWatermark(2500)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), true), 
2501))
+      CRow(Row.of(toTS(2501), "bbb", 10L: JLong), true)))
 
     testHarness.processWatermark(4000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 
4001))
+      CRow(Row.of(toTS(4001), "aaa", 2L: JLong), true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 
4001))
+      CRow(Row.of(toTS(4001), "aaa", 3L: JLong), true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong), true), 
4001))
+      CRow(Row.of(toTS(4001), "bbb", 20L: JLong), true)))
 
     testHarness.processWatermark(4800)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong), true), 
4801))
+      CRow(Row.of(toTS(4801), "aaa", 4L: JLong), true)))
 
     testHarness.processWatermark(6500)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), true), 
6501))
+      CRow(Row.of(toTS(6501), "aaa", 5L: JLong), true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), true), 
6501))
+      CRow(Row.of(toTS(6501), "aaa", 6L: JLong), true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 
6501))
+      CRow(Row.of(toTS(6501), "bbb", 30L: JLong), true)))
 
     testHarness.processWatermark(7000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 
7001))
+      CRow(Row.of(toTS(7001), "aaa", 7L: JLong), true)))
 
     testHarness.processWatermark(8000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 
8001))
+      CRow(Row.of(toTS(8001), "aaa", 8L: JLong), true)))
 
     testHarness.processWatermark(12000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 
12001))
+      CRow(Row.of(toTS(12001), "aaa", 9L: JLong), true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 
12001))
+      CRow(Row.of(toTS(12001), "aaa", 10L: JLong), true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 
12001))
+      CRow(Row.of(toTS(12001), "bbb", 40L: JLong), true)))
 
     testHarness.processWatermark(19000)
 
@@ -781,10 +712,10 @@ class OverWindowHarnessTest extends HarnessTestBase{
 
     testHarness.processWatermark(20000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 0L: JLong, 0: JInt, "ccc", 1L: JLong), true), 
20001)) // clean-up 5000
+      CRow(Row.of(toTS(20001), "ccc", 1L: JLong), true))) // clean-up 5000
     testHarness.setProcessingTime(2500)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "ccc", 2L: JLong), true), 
20002)) // clean-up 5000
+      CRow(Row.of(toTS(20002), "ccc", 2L: JLong), true))) // clean-up 5000
 
     assert(testHarness.numKeyedStateEntries() > 0)
     testHarness.setProcessingTime(5000) // does not clean up, because data left. New timer 7000
@@ -802,56 +733,40 @@ class OverWindowHarnessTest extends HarnessTestBase{
 
     // all elements at the same row-time have the same value per key
     expectedOutput.add(new StreamRecord(
-      CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: 
JLong), true), 801))
+      CRow(Row.of(toTS(801), "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 10L: 
JLong), true), 2501))
+      CRow(Row.of(toTS(2501), "bbb", 10L: JLong, 10L: JLong, 10L: JLong), 
true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 3L: 
JLong), true), 4001))
+      CRow(Row.of(toTS(4001), "aaa", 2L: JLong, 1L: JLong, 3L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 1L: JLong, 3L: 
JLong), true), 4001))
+      CRow(Row.of(toTS(4001), "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 10L: JLong, 20L: 
JLong), true), 4001))
+      CRow(Row.of(toTS(4001), "bbb", 20L: JLong, 10L: JLong, 20L: JLong), 
true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 1L: JLong, 4L: 
JLong), true), 4801))
+      CRow(Row.of(toTS(4801), "aaa", 4L: JLong, 1L: JLong, 4L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 1L: JLong, 6L: 
JLong), true), 6501))
+      CRow(Row.of(toTS(6501), "aaa", 5L: JLong, 1L: JLong, 6L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 1L: JLong, 6L: 
JLong), true), 6501))
+      CRow(Row.of(toTS(6501), "aaa", 6L: JLong, 1L: JLong, 6L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 10L: JLong, 30L: 
JLong), true), 6501))
+      CRow(Row.of(toTS(6501), "bbb", 30L: JLong, 10L: JLong, 30L: JLong), 
true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 1L: JLong, 7L: 
JLong), true), 7001))
+      CRow(Row.of(toTS(7001), "aaa", 7L: JLong, 1L: JLong, 7L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 1L: JLong, 8L: 
JLong), true), 8001))
+      CRow(Row.of(toTS(8001), "aaa", 8L: JLong, 1L: JLong, 8L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 1L: JLong, 10L: 
JLong), true), 12001))
+      CRow(Row.of(toTS(12001), "aaa", 9L: JLong, 1L: JLong, 10L: JLong), 
true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 1L: JLong, 10L: 
JLong), true), 12001))
+      CRow(Row.of(toTS(12001), "aaa", 10L: JLong, 1L: JLong, 10L: JLong), 
true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 10L: JLong, 40L: 
JLong), true), 12001))
+      CRow(Row.of(toTS(12001), "bbb", 40L: JLong, 10L: JLong, 40L: JLong), 
true)))
 
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(1: JInt, 0L: JLong, 0: JInt, "ccc", 1L: JLong, 1L: JLong, 1L: 
JLong), true), 20001))
+      CRow(Row.of(toTS(20001), "ccc", 1L: JLong, 1L: JLong, 1L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(2: JInt, 0L: JLong, 0: JInt, "ccc", 2L: JLong, 1L: JLong, 2L: 
JLong), true), 20002))
+      CRow(Row.of(toTS(20002), "ccc", 2L: JLong, 1L: JLong, 2L: JLong), true)))
 
-    verify(expectedOutput, result, new RowResultSortComparator(6))
+    verify(expectedOutput, result, new RowResultSortComparator())
     testHarness.close()
   }
 
@@ -863,12 +778,13 @@ class OverWindowHarnessTest extends HarnessTestBase{
         genMinMaxAggFunction,
         minMaxAggregationStateType,
         minMaxCRowType,
+        0,
         new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(1), 
Time.seconds(2))))
 
     val testHarness =
       createHarnessTester(
         processFunction,
-        new TupleRowKeySelector[String](3),
+        new TupleRowKeySelector[String](1),
         BasicTypeInfo.STRING_TYPE_INFO)
 
     testHarness.open()
@@ -876,47 +792,47 @@ class OverWindowHarnessTest extends HarnessTestBase{
     testHarness.setProcessingTime(1000)
     testHarness.processWatermark(800)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 801))
+      CRow(Row.of(toTS(801), "aaa", 1L: JLong), true)))
 
     testHarness.processWatermark(2500)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), true), 
2501))
+      CRow(Row.of(toTS(2501), "bbb", 10L: JLong), true)))
 
     testHarness.processWatermark(4000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 
4001))
+      CRow(Row.of(toTS(4001), "aaa", 2L: JLong), true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 
4001))
+      CRow(Row.of(toTS(4001), "aaa", 3L: JLong), true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong), true), 
4001))
+      CRow(Row.of(toTS(4001), "bbb", 20L: JLong), true)))
 
     testHarness.processWatermark(4800)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong), true), 
4801))
+      CRow(Row.of(toTS(4801), "aaa", 4L: JLong), true)))
 
     testHarness.processWatermark(6500)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), true), 
6501))
+      CRow(Row.of(toTS(6501), "aaa", 5L: JLong), true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), true), 
6501))
+      CRow(Row.of(toTS(6501), "aaa", 6L: JLong), true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 
6501))
+      CRow(Row.of(toTS(6501), "bbb", 30L: JLong), true)))
 
     testHarness.processWatermark(7000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 
7001))
+      CRow(Row.of(toTS(7001), "aaa", 7L: JLong), true)))
 
     testHarness.processWatermark(8000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 
8001))
+      CRow(Row.of(toTS(8001), "aaa", 8L: JLong), true)))
 
     testHarness.processWatermark(12000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 
12001))
+      CRow(Row.of(toTS(12001), "aaa", 9L: JLong), true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 
12001))
+      CRow(Row.of(toTS(12001), "aaa", 10L: JLong), true)))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 
12001))
+      CRow(Row.of(toTS(12001), "bbb", 40L: JLong), true)))
 
     testHarness.processWatermark(19000)
 
@@ -929,10 +845,10 @@ class OverWindowHarnessTest extends HarnessTestBase{
 
     testHarness.processWatermark(20000)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 0L: JLong, 0: JInt, "ccc", 1L: JLong), true), 
20001)) // clean-up 5000
+      CRow(Row.of(toTS(20001), "ccc", 1L: JLong), true))) // clean-up 5000
     testHarness.setProcessingTime(2500)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "ccc", 2L: JLong), true), 
20002)) // clean-up 5000
+      CRow(Row.of(toTS(20002), "ccc", 2L: JLong), true))) // clean-up 5000
 
     assert(testHarness.numKeyedStateEntries() > 0)
     testHarness.setProcessingTime(5000) // does not clean up, because data left. New timer 7000
@@ -949,56 +865,40 @@ class OverWindowHarnessTest extends HarnessTestBase{
     val expectedOutput = new ConcurrentLinkedQueue[Object]()
 
     expectedOutput.add(new StreamRecord(
-      CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: 
JLong), true), 801))
+      CRow(Row.of(toTS(801), "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 10L: 
JLong), true), 2501))
+      CRow(Row.of(toTS(2501), "bbb", 10L: JLong, 10L: JLong, 10L: JLong), 
true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 2L: 
JLong), true), 4001))
+      CRow(Row.of(toTS(4001), "aaa", 2L: JLong, 1L: JLong, 2L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 1L: JLong, 3L: 
JLong), true), 4001))
+      CRow(Row.of(toTS(4001), "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 10L: JLong, 20L: 
JLong), true), 4001))
+      CRow(Row.of(toTS(4001), "bbb", 20L: JLong, 10L: JLong, 20L: JLong), 
true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 1L: JLong, 4L: 
JLong), true), 4801))
+      CRow(Row.of(toTS(4801), "aaa", 4L: JLong, 1L: JLong, 4L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 1L: JLong, 5L: 
JLong), true), 6501))
+      CRow(Row.of(toTS(6501), "aaa", 5L: JLong, 1L: JLong, 5L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 1L: JLong, 6L: 
JLong), true), 6501))
+      CRow(Row.of(toTS(6501), "aaa", 6L: JLong, 1L: JLong, 6L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 10L: JLong, 30L: 
JLong), true), 6501))
+      CRow(Row.of(toTS(6501), "bbb", 30L: JLong, 10L: JLong, 30L: JLong), 
true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 1L: JLong, 7L: 
JLong), true), 7001))
+      CRow(Row.of(toTS(7001), "aaa", 7L: JLong, 1L: JLong, 7L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 1L: JLong, 8L: 
JLong), true), 8001))
+      CRow(Row.of(toTS(8001), "aaa", 8L: JLong, 1L: JLong, 8L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 1L: JLong, 9L: 
JLong), true), 12001))
+      CRow(Row.of(toTS(12001), "aaa", 9L: JLong, 1L: JLong, 9L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 1L: JLong, 10L: 
JLong), true), 12001))
+      CRow(Row.of(toTS(12001), "aaa", 10L: JLong, 1L: JLong, 10L: JLong), 
true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 10L: JLong, 40L: 
JLong), true), 12001))
+      CRow(Row.of(toTS(12001), "bbb", 40L: JLong, 10L: JLong, 40L: JLong), 
true)))
 
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(1: JInt, 0L: JLong, 0: JInt, "ccc", 1L: JLong, 1L: JLong, 1L: 
JLong), true), 20001))
+      CRow(Row.of(toTS(20001), "ccc", 1L: JLong, 1L: JLong, 1L: JLong), true)))
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(2: JInt, 0L: JLong, 0: JInt, "ccc", 2L: JLong, 1L: JLong, 2L: 
JLong), true), 20002))
+      CRow(Row.of(toTS(20002), "ccc", 2L: JLong, 1L: JLong, 2L: JLong), true)))
 
-    verify(expectedOutput, result, new RowResultSortComparator(6))
+    verify(expectedOutput, result, new RowResultSortComparator())
     testHarness.close()
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/SortProcessFunctionHarnessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/SortProcessFunctionHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/SortProcessFunctionHarnessTest.scala
index 0451534..18ba6bb 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/SortProcessFunctionHarnessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/SortProcessFunctionHarnessTest.scala
@@ -21,6 +21,7 @@ package org.apache.flink.table.runtime.harness
 import java.lang.{Integer => JInt, Long => JLong}
 import java.util.concurrent.ConcurrentLinkedQueue
 
+import org.apache.calcite.runtime.SqlFunctions.{internalToTimestamp => toTS}
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
@@ -35,6 +36,7 @@ import org.apache.flink.streaming.util.{KeyedOneInputStreamOperatorTestHarness,
 import org.apache.flink.table.runtime.aggregate.{CollectionRowComparator, ProcTimeSortProcessFunction, RowTimeSortProcessFunction}
 import org.apache.flink.table.runtime.harness.SortProcessFunctionHarnessTest.TupleRowSelector
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
 import org.apache.flink.types.Row
 import org.junit.Test
 
@@ -75,7 +77,7 @@ class SortProcessFunctionHarnessTest {
         inputCRowType,
         collectionRowComparator))
   
-   val testHarness = new KeyedOneInputStreamOperatorTestHarness[Integer,CRow,CRow](
+   val testHarness = new KeyedOneInputStreamOperatorTestHarness[Integer, CRow, CRow](
       processFunction, 
       new TupleRowSelector(0), 
       BasicTypeInfo.INT_TYPE_INFO)
@@ -86,77 +88,77 @@ class SortProcessFunctionHarnessTest {
 
       // timestamp is ignored in processing time
     testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 11L: JLong), true), 1001))
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 11L: JLong), true)))
     testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 12L: JLong, 1: JInt, "aaa", 11L: JLong), true), 2002))
+      Row.of(1: JInt, 12L: JLong, 1: JInt, "aaa", 11L: JLong), true)))
     testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 12L: JLong, 2: JInt, "aaa", 11L: JLong), true), 2003))
+      Row.of(1: JInt, 12L: JLong, 2: JInt, "aaa", 11L: JLong), true)))
     testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 12L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2004))
+      Row.of(1: JInt, 12L: JLong, 0: JInt, "aaa", 11L: JLong), true)))
     testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2006))
+      Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", 11L: JLong), true)))
 
     //move the timestamp to ensure the execution
     testHarness.setProcessingTime(1005)
-    
+
     testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 1L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2007))
+      Row.of(1: JInt, 1L: JLong, 0: JInt, "aaa", 11L: JLong), true)))
     testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 3L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2007))
+      Row.of(1: JInt, 3L: JLong, 0: JInt, "aaa", 11L: JLong), true)))
     testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 2L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2007))
-    
+      Row.of(1: JInt, 2L: JLong, 0: JInt, "aaa", 11L: JLong), true)))
+
     testHarness.setProcessingTime(1008)
-    
+
     val result = testHarness.getOutput
-    
+
     val expectedOutput = new ConcurrentLinkedQueue[Object]()
-    
+
     // all elements at the same proc timestamp have the same value
     // elements should be sorted ascending on field 1 and descending on field 2
     // (10,0) (11,1) (12,2) (12,1) (12,0)
     // (1,0) (2,0)
-    
+
      expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", 11L: JLong),true), 4))
+      Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", 11L: JLong),true)))
      expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 11L: JLong),true), 4))
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 11L: JLong),true)))
     expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 12L: JLong, 2: JInt, "aaa", 11L: JLong),true), 4))
+      Row.of(1: JInt, 12L: JLong, 2: JInt, "aaa", 11L: JLong),true)))
     expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 12L: JLong, 1: JInt, "aaa", 11L: JLong),true), 4))
+      Row.of(1: JInt, 12L: JLong, 1: JInt, "aaa", 11L: JLong),true)))
     expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 12L: JLong, 0: JInt, "aaa", 11L: JLong),true), 4))
-    
+      Row.of(1: JInt, 12L: JLong, 0: JInt, "aaa", 11L: JLong),true)))
+
     expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 1L: JLong, 0: JInt, "aaa", 11L: JLong),true), 1006))
+      Row.of(1: JInt, 1L: JLong, 0: JInt, "aaa", 11L: JLong),true)))
     expectedOutput.add(new StreamRecord(new CRow(
-        Row.of(1: JInt, 2L: JLong, 0: JInt, "aaa", 11L: JLong),true), 1006))
+        Row.of(1: JInt, 2L: JLong, 0: JInt, "aaa", 11L: JLong),true)))
     expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 3L: JLong, 0: JInt, "aaa", 11L: JLong),true), 1006))
+      Row.of(1: JInt, 3L: JLong, 0: JInt, "aaa", 11L: JLong),true)))
 
     TestHarnessUtil.assertOutputEquals("Output was not correctly sorted.", 
expectedOutput, result)
-    
+
     testHarness.close()
   }
-  
+
   @Test
   def testSortRowTimeHarnessPartitioned(): Unit = {
-    
+
     val rT =  new RowTypeInfo(Array[TypeInformation[_]](
       INT_TYPE_INFO,
       LONG_TYPE_INFO,
       INT_TYPE_INFO,
       STRING_TYPE_INFO,
-      LONG_TYPE_INFO),
+      TimeIndicatorTypeInfo.ROWTIME_INDICATOR),
       Array("a", "b", "c", "d", "e"))
 
     val indexes = Array(1, 2)
-      
+
     val fieldComps = Array[TypeComparator[AnyRef]](
       LONG_TYPE_INFO.createComparator(true, 
null).asInstanceOf[TypeComparator[AnyRef]],
       INT_TYPE_INFO.createComparator(false, 
null).asInstanceOf[TypeComparator[AnyRef]] )
-    val booleanOrders = Array(true, false)    
+    val booleanOrders = Array(true, false)
 
     val rowComp = new RowComparator(
       rT.getTotalFields,
@@ -164,21 +166,22 @@ class SortProcessFunctionHarnessTest {
       fieldComps,
       new Array[TypeSerializer[AnyRef]](0), //used only for serialized 
comparisons
       booleanOrders)
-    
+
     val collectionRowComparator = new CollectionRowComparator(rowComp)
-    
+
     val inputCRowType = CRowTypeInfo(rT)
-    
+
     val processFunction = new KeyedProcessOperator[Integer,CRow,CRow](
       new RowTimeSortProcessFunction(
         inputCRowType,
+        4,
         Some(collectionRowComparator)))
-  
+
    val testHarness = new KeyedOneInputStreamOperatorTestHarness[Integer, CRow, 
CRow](
-      processFunction, 
-      new TupleRowSelector(0), 
+      processFunction,
+      new TupleRowSelector(0),
       BasicTypeInfo.INT_TYPE_INFO)
-    
+
    testHarness.open()
 
    testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime)
@@ -186,71 +189,71 @@ class SortProcessFunctionHarnessTest {
 
       // timestamp is ignored in processing time
     testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 11L: JLong), true), 1001))
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", toTS(1001)), true)))
     testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 12L: JLong, 1: JInt, "aaa", 11L: JLong), true), 2002))
+      Row.of(1: JInt, 12L: JLong, 1: JInt, "aaa", toTS(2002)), true)))
     testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 13L: JLong, 2: JInt, "aaa", 11L: JLong), true), 2002))
+      Row.of(1: JInt, 13L: JLong, 2: JInt, "aaa", toTS(2002)), true)))
     testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", 11L: JLong), true), 2002))
+      Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", toTS(2002)), true)))
     testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 14L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2002))
+      Row.of(1: JInt, 14L: JLong, 0: JInt, "aaa", toTS(2002)), true)))
     testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", 11L: JLong), true), 2004))
+      Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", toTS(2004)), true)))
     testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2006))
+      Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", toTS(2006)), true)))
 
     // move watermark forward
     testHarness.processWatermark(2007)
 
     testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 20L: JLong, 1: JInt, "aaa", 11L: JLong), true), 2008))
+      Row.of(1: JInt, 20L: JLong, 1: JInt, "aaa", toTS(2008)), true)))
     testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 14L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2002)) 
// too late
+      Row.of(1: JInt, 14L: JLong, 0: JInt, "aaa", toTS(2002)), true))) // too 
late
     testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", 11L: JLong), true), 2019)) 
// too early
+      Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", toTS(2019)), true))) // too 
early
     testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 20L: JLong, 2: JInt, "aaa", 11L: JLong), true), 2008))
+      Row.of(1: JInt, 20L: JLong, 2: JInt, "aaa", toTS(2008)), true)))
     testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2010))
+      Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", toTS(2010)), true)))
     testHarness.processElement(new StreamRecord(new CRow(
-      Row.of(1: JInt, 19L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2008))
+      Row.of(1: JInt, 19L: JLong, 0: JInt, "aaa", toTS(2008)), true)))
 
     // move watermark forward
     testHarness.processWatermark(2012)
 
     val result = testHarness.getOutput
-    
+
     val expectedOutput = new ConcurrentLinkedQueue[Object]()
-    
+
     // all elements at the same proc timestamp have the same value
     // elements should be sorted ascending on field 1 and descending on field 2
     // (10,0) (11,1) (12,2) (12,1) (12,0)
     expectedOutput.add(new Watermark(3))
     expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 11L: JLong),true), 1001))
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", toTS(1001)), true)))
     expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", 11L: JLong),true), 2002))
+      Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", toTS(2002)), true)))
     expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 12L: JLong, 1: JInt, "aaa", 11L: JLong),true), 2002))
+      Row.of(1: JInt, 12L: JLong, 1: JInt, "aaa", toTS(2002)), true)))
     expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 13L: JLong, 2: JInt, "aaa", 11L: JLong),true), 2002))
+      Row.of(1: JInt, 13L: JLong, 2: JInt, "aaa", toTS(2002)), true)))
     expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 14L: JLong, 0: JInt, "aaa", 11L: JLong),true), 2002))
+      Row.of(1: JInt, 14L: JLong, 0: JInt, "aaa", toTS(2002)), true)))
     expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", 11L: JLong),true), 2004))
+      Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", toTS(2004)), true)))
     expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", 11L: JLong),true), 2006))
+      Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", toTS(2006)), true)))
     expectedOutput.add(new Watermark(2007))
 
     expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 19L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2008))
+      Row.of(1: JInt, 19L: JLong, 0: JInt, "aaa", toTS(2008)), true)))
     expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 20L: JLong, 2: JInt, "aaa", 11L: JLong), true), 2008))
+      Row.of(1: JInt, 20L: JLong, 2: JInt, "aaa", toTS(2008)), true)))
     expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 20L: JLong, 1: JInt, "aaa", 11L: JLong), true), 2008))
+      Row.of(1: JInt, 20L: JLong, 1: JInt, "aaa", toTS(2008)), true)))
     expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", 11L: JLong), true), 2010))
+      Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", toTS(2010)), true)))
 
     expectedOutput.add(new Watermark(2012))
 

http://git-wip-us.apache.org/repos/asf/flink/blob/93d0ae4a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
index 4121754..82ed81c 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.runtime.stream.table
 
 import java.io.File
 import java.lang.{Boolean => JBool}
+import java.sql.Timestamp
 
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -28,19 +29,22 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.ProcessFunction
 import org.apache.flink.streaming.api.functions.sink.SinkFunction
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.runtime.utils.StreamTestData
+import org.apache.flink.table.api.{TableEnvironment, TableException, Types}
+import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData}
 import org.apache.flink.table.sinks._
 import org.apache.flink.test.util.TestBaseUtils
 import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
 import org.junit.Assert._
 import org.junit.Test
 
 import scala.collection.mutable
+import scala.collection.JavaConverters._
 
 class TableSinkITCase extends StreamingMultipleProgramsTestBase {
 
@@ -199,8 +203,6 @@ class TableSinkITCase extends 
StreamingMultipleProgramsTestBase {
 
   }
 
-
-
   @Test
   def testUpsertSinkOnAppendingTableWithFullKey1(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
@@ -349,6 +351,136 @@ class TableSinkITCase extends 
StreamingMultipleProgramsTestBase {
     assertEquals(expected, retracted)
   }
 
+  @Test // each record emitted by toAppendStream must carry its window rowtime as timestamp
+  def testToAppendStreamRowtime(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear // presumably resets the shared result buffer -- confirm in StreamITCase
+
+    val t = StreamTestData.get3TupleDataStream(env) // NOTE(review): data set is defined elsewhere
+      .assignAscendingTimestamps(_._1.toLong) // first tuple field doubles as the event timestamp
+      .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime)
+
+    val r = t // tumbling 5 ms event-time windows per num; the window rowtime is selected twice
+      .window(Tumble over 5.milli on 'rowtime as 'w)
+      .groupBy('num, 'w)
+      .select('num, 'w.rowtime, 'w.rowtime.cast(Types.LONG))
+
+    r.toAppendStream[Row] // conversion under test
+      .process(new ProcessFunction[Row, Row] {
+        override def processElement(
+          row: Row,
+          ctx: ProcessFunction[Row, Row]#Context,
+          out: Collector[Row]): Unit = {
+
+          val rowTS: Long = row.getField(2).asInstanceOf[Long] // field 2: rowtime cast to LONG
+          if (ctx.timestamp() == rowTS) { // forward only rows whose record timestamp matches
+            out.collect(row)
+          }
+        }
+      }).addSink(new StreamITCase.StringSink[Row]) // presumably fills StreamITCase.testResults
+
+    env.execute()
+
+    val expected = List( // per row: num, window rowtime as timestamp, same rowtime as long
+      "1,1970-01-01 00:00:00.004,4",
+      "2,1970-01-01 00:00:00.004,4",
+      "3,1970-01-01 00:00:00.004,4",
+      "3,1970-01-01 00:00:00.009,9",
+      "4,1970-01-01 00:00:00.009,9",
+      "4,1970-01-01 00:00:00.014,14",
+      "5,1970-01-01 00:00:00.014,14",
+      "5,1970-01-01 00:00:00.019,19",
+      "6,1970-01-01 00:00:00.019,19",
+      "6,1970-01-01 00:00:00.024,24")
+
+    assertEquals(expected, StreamITCase.testResults.sorted) // sorted: emission order is irrelevant
+  }
+
+  @Test // each record emitted by toRetractStream must carry its window rowtime as timestamp
+  def testToRetractStreamRowtime(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear // presumably resets the shared result buffer -- confirm in StreamITCase
+
+    val t = StreamTestData.get3TupleDataStream(env) // NOTE(review): data set is defined elsewhere
+      .assignAscendingTimestamps(_._1.toLong) // first tuple field doubles as the event timestamp
+      .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime)
+
+    val r = t // tumbling 5 ms event-time windows per num; the window rowtime is selected twice
+      .window(Tumble over 5.milli on 'rowtime as 'w)
+      .groupBy('num, 'w)
+      .select('num, 'w.rowtime, 'w.rowtime.cast(Types.LONG))
+
+    r.toRetractStream[Row] // conversion under test; elements are (Boolean, Row) pairs
+      .process(new ProcessFunction[(Boolean, Row), Row] {
+        override def processElement(
+          row: (Boolean, Row),
+          ctx: ProcessFunction[(Boolean, Row), Row]#Context,
+          out: Collector[Row]): Unit = {
+
+          val rowTS: Long = row._2.getField(2).asInstanceOf[Long] // field 2: rowtime cast to LONG
+          if (ctx.timestamp() == rowTS) { // forward only rows whose record timestamp matches
+            out.collect(row._2) // the Boolean flag is dropped, only the Row is kept
+          }
+        }
+      }).addSink(new StreamITCase.StringSink[Row]) // presumably fills StreamITCase.testResults
+
+    env.execute()
+
+    val expected = List( // per row: num, window rowtime as timestamp, same rowtime as long
+      "1,1970-01-01 00:00:00.004,4",
+      "2,1970-01-01 00:00:00.004,4",
+      "3,1970-01-01 00:00:00.004,4",
+      "3,1970-01-01 00:00:00.009,9",
+      "4,1970-01-01 00:00:00.009,9",
+      "4,1970-01-01 00:00:00.014,14",
+      "5,1970-01-01 00:00:00.014,14",
+      "5,1970-01-01 00:00:00.019,19",
+      "6,1970-01-01 00:00:00.019,19",
+      "6,1970-01-01 00:00:00.024,24")
+
+    assertEquals(expected, StreamITCase.testResults.sorted) // sorted: emission order is irrelevant
+  }
+
+  @Test(expected = classOf[TableException]) // passes only if the body throws TableException
+  def testToAppendStreamMultiRowtime(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = StreamTestData.get3TupleDataStream(env) // NOTE(review): data set is defined elsewhere
+      .assignAscendingTimestamps(_._1.toLong) // first tuple field doubles as the event timestamp
+      .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime)
+
+    val r = t // selects the window rowtime twice, the second copy aliased as rowtime2
+      .window(Tumble over 5.milli on 'rowtime as 'w)
+      .groupBy('num, 'w)
+      .select('num, 'w.rowtime, 'w.rowtime as 'rowtime2)
+
+    r.toAppendStream[Row] // presumably the call that throws; job is never executed -- TODO confirm
+  }
+
+  @Test(expected = classOf[TableException]) // passes only if the body throws TableException
+  def testToRetractStreamMultiRowtime(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = StreamTestData.get3TupleDataStream(env) // NOTE(review): data set is defined elsewhere
+      .assignAscendingTimestamps(_._1.toLong) // first tuple field doubles as the event timestamp
+      .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime)
+
+    val r = t // selects the window rowtime twice, the second copy aliased as rowtime2
+      .window(Tumble over 5.milli on 'rowtime as 'w)
+      .groupBy('num, 'w)
+      .select('num, 'w.rowtime, 'w.rowtime as 'rowtime2)
+
+    r.toRetractStream[Row] // presumably the call that throws; job is never executed -- TODO confirm
+  }
+
   /** Converts a list of retraction messages into a list of final results. */
   private def restractResults(results: List[JTuple2[JBool, Row]]): 
List[String] = {
 

Reply via email to