http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala
index 910cbf2..9da2c44 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala
@@ -18,11 +18,12 @@
 
 package org.apache.flink.table.api.scala.stream.table
 
+import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase}
-import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment}
 import org.apache.flink.table.api.scala.stream.utils.StreamITCase.RetractingSink
 import org.apache.flink.types.Row
 import org.junit.Assert.assertEquals
@@ -34,6 +35,8 @@ import scala.collection.mutable
   * Tests of groupby (without window) aggregations
   */
 class GroupAggregationsITCase extends StreamingWithStateTestBase {
+  private val queryConfig = new StreamQueryConfig()
+  queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2))
 
   @Test
   def testNonKeyedGroupAggregate(): Unit = {
@@ -45,7 +48,7 @@ class GroupAggregationsITCase extends StreamingWithStateTestBase {
     val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
             .select('a.sum, 'b.sum)
 
-    val results = t.toRetractStream[Row]
+    val results = t.toRetractStream[Row](queryConfig)
     results.addSink(new StreamITCase.RetractingSink).setParallelism(1)
     env.execute()
 
@@ -64,7 +67,7 @@ class GroupAggregationsITCase extends StreamingWithStateTestBase {
       .groupBy('b)
       .select('b, 'a.sum)
 
-    val results = t.toRetractStream[Row]
+    val results = t.toRetractStream[Row](queryConfig)
     results.addSink(new StreamITCase.RetractingSink)
     env.execute()
 
@@ -85,7 +88,7 @@ class GroupAggregationsITCase extends StreamingWithStateTestBase {
       .groupBy('cnt)
       .select('cnt, 'b.count as 'freq)
 
-    val results = t.toRetractStream[Row]
+    val results = t.toRetractStream[Row](queryConfig)
 
     results.addSink(new RetractingSink)
     env.execute()
@@ -104,7 +107,7 @@ class GroupAggregationsITCase extends StreamingWithStateTestBase {
       .groupBy('e, 'b % 3)
       .select('c.min, 'e, 'a.avg, 'd.count)
 
-    val results = t.toRetractStream[Row]
+    val results = t.toRetractStream[Row](queryConfig)
     results.addSink(new RetractingSink)
     env.execute()
 

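Note: the change above threads a StreamQueryConfig into toRetractStream so that
non-windowed group aggregations can expire idle per-key state. As a rough,
self-contained sketch of the same pattern (the example job and its input are
illustrative, not part of this commit):

    import org.apache.flink.api.common.time.Time
    import org.apache.flink.api.scala._
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment}
    import org.apache.flink.table.api.scala._
    import org.apache.flink.types.Row

    object IdleStateRetentionExample {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val tEnv = TableEnvironment.getTableEnvironment(env)

        // Keep per-key aggregation state for at least 1h of inactivity and
        // drop it after at most 2h, matching the config used in the test.
        val queryConfig = new StreamQueryConfig()
        queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2))

        val t = env.fromElements((1, 1L, "a"), (2, 2L, "a"), (3, 3L, "b"))
          .toTable(tEnv, 'a, 'b, 'c)
          .groupBy('c)
          .select('c, 'a.sum)

        // The config is applied when the Table is emitted, not when it is defined.
        t.toRetractStream[Row](queryConfig).print()
        env.execute()
      }
    }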
http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala
deleted file mode 100644
index eadcfc8..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala
+++ /dev/null
@@ -1,336 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.runtime.aggregate
-
-import java.util.Comparator
-import java.util.concurrent.ConcurrentLinkedQueue
-import java.lang.{Integer => JInt, Long => JLong}
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.java.functions.KeySelector
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.streaming.api.operators.KeyedProcessOperator
-import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
-import org.apache.flink.streaming.util.{KeyedOneInputStreamOperatorTestHarness, TestHarnessUtil}
-import org.apache.flink.table.codegen.GeneratedAggregationsFunction
-import org.apache.flink.table.functions.AggregateFunction
-import org.apache.flink.table.functions.aggfunctions.{LongMaxWithRetractAggFunction, LongMinWithRetractAggFunction}
-import org.apache.flink.table.runtime.aggregate.BoundedProcessingOverRangeProcessFunctionTest._
-import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
-import org.apache.flink.types.Row
-import org.junit.Test
-
-class BoundedProcessingOverRangeProcessFunctionTest {
-
-  @Test
-  def testProcTimePartitionedOverRange(): Unit = {
-
-    val rT =  new CRowTypeInfo(new RowTypeInfo(Array[TypeInformation[_]](
-      INT_TYPE_INFO,
-      LONG_TYPE_INFO,
-      INT_TYPE_INFO,
-      STRING_TYPE_INFO,
-      LONG_TYPE_INFO),
-      Array("a", "b", "c", "d", "e")))
-
-    val aggregates =
-      Array(new LongMinWithRetractAggFunction,
-            new LongMaxWithRetractAggFunction).asInstanceOf[Array[AggregateFunction[_, _]]]
-    val aggregationStateType: RowTypeInfo = AggregateUtil.createAccumulatorRowType(aggregates)
-
-    val funcCode =
-      """
-        |public class BoundedOverAggregateHelper$33
-        |  extends org.apache.flink.table.runtime.aggregate.GeneratedAggregations {
-        |
-        |  transient org.apache.flink.table.functions.aggfunctions.LongMinWithRetractAggFunction
-        |    fmin = null;
-        |
-        |  transient org.apache.flink.table.functions.aggfunctions.LongMaxWithRetractAggFunction
-        |    fmax = null;
-        |
-        |  public BoundedOverAggregateHelper$33() throws Exception {
-        |
-        |    fmin = (org.apache.flink.table.functions.aggfunctions.LongMinWithRetractAggFunction)
-        |    org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
-        |    .deserialize("rO0ABXNyAEtvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbn" +
-        |    "MuTG9uZ01pbldpdGhSZXRyYWN0QWdnRnVuY3Rpb26oIdX_DaMPxQIAAHhyAEdvcmcuYXBhY2hlLmZsaW5rL" +
-        |    "nRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbnMuTWluV2l0aFJldHJhY3RBZ2dGdW5jdGlvbq_ZGuzxtA_S" +
-        |    "AgABTAADb3JkdAAVTHNjYWxhL21hdGgvT3JkZXJpbmc7eHIAMm9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnV" +
-        |    "uY3Rpb25zLkFnZ3JlZ2F0ZUZ1bmN0aW9uTcYVPtJjNfwCAAB4cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS" +
-        |    "5mdW5jdGlvbnMuVXNlckRlZmluZWRGdW5jdGlvbi0B91QxuAyTAgAAeHBzcgAZc2NhbGEubWF0aC5PcmRlc" +
-        |    "mluZyRMb25nJOda0iCPo2ukAgAAeHA");
-        |
-        |    fmax = (org.apache.flink.table.functions.aggfunctions.LongMaxWithRetractAggFunction)
-        |    org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
-        |    .deserialize("rO0ABXNyAEtvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbn" +
-        |    "MuTG9uZ01heFdpdGhSZXRyYWN0QWdnRnVuY3Rpb25RmsI8azNGXwIAAHhyAEdvcmcuYXBhY2hlLmZsaW5rL" +
-        |    "nRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbnMuTWF4V2l0aFJldHJhY3RBZ2dGdW5jdGlvbvnwowlX0_Qf" +
-        |    "AgABTAADb3JkdAAVTHNjYWxhL21hdGgvT3JkZXJpbmc7eHIAMm9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnV" +
-        |    "uY3Rpb25zLkFnZ3JlZ2F0ZUZ1bmN0aW9uTcYVPtJjNfwCAAB4cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS" +
-        |    "5mdW5jdGlvbnMuVXNlckRlZmluZWRGdW5jdGlvbi0B91QxuAyTAgAAeHBzcgAZc2NhbGEubWF0aC5PcmRlc" +
-        |    "mluZyRMb25nJOda0iCPo2ukAgAAeHA");
-        |  }
-        |
-        |  public void setAggregationResults(
-        |    org.apache.flink.types.Row accs,
-        |    org.apache.flink.types.Row output) {
-        |
-        |    org.apache.flink.table.functions.AggregateFunction baseClass0 =
-        |      (org.apache.flink.table.functions.AggregateFunction) fmin;
-        |    output.setField(5, baseClass0.getValue(
-        |      (org.apache.flink.table.functions.aggfunctions.MinWithRetractAccumulator)
-        |      accs.getField(0)));
-        |
-        |    org.apache.flink.table.functions.AggregateFunction baseClass1 =
-        |      (org.apache.flink.table.functions.AggregateFunction) fmax;
-        |    output.setField(6, baseClass1.getValue(
-        |      (org.apache.flink.table.functions.aggfunctions.MaxWithRetractAccumulator)
-        |      accs.getField(1)));
-        |  }
-        |
-        |  public void accumulate(
-        |    org.apache.flink.types.Row accs,
-        |    org.apache.flink.types.Row input) {
-        |
-        |    fmin.accumulate(
-        |      ((org.apache.flink.table.functions.aggfunctions.MinWithRetractAccumulator)
-        |      accs.getField(0)),
-        |      (java.lang.Long) input.getField(4));
-        |
-        |    fmax.accumulate(
-        |      ((org.apache.flink.table.functions.aggfunctions.MaxWithRetractAccumulator)
-        |      accs.getField(1)),
-        |      (java.lang.Long) input.getField(4));
-        |  }
-        |
-        |  public void retract(
-        |    org.apache.flink.types.Row accs,
-        |    org.apache.flink.types.Row input) {
-        |
-        |    fmin.retract(
-        |      ((org.apache.flink.table.functions.aggfunctions.MinWithRetractAccumulator)
-        |      accs.getField(0)),
-        |      (java.lang.Long) input.getField(4));
-        |
-        |    fmax.retract(
-        |      ((org.apache.flink.table.functions.aggfunctions.MaxWithRetractAccumulator)
-        |      accs.getField(1)),
-        |      (java.lang.Long) input.getField(4));
-        |  }
-        |
-        |  public org.apache.flink.types.Row createAccumulators() {
-        |
-        |    org.apache.flink.types.Row accs = new org.apache.flink.types.Row(2);
-        |
-        |    accs.setField(
-        |      0,
-        |      fmin.createAccumulator());
-        |
-        |    accs.setField(
-        |      1,
-        |      fmax.createAccumulator());
-        |
-        |      return accs;
-        |  }
-        |
-        |  public void setForwardedFields(
-        |    org.apache.flink.types.Row input,
-        |    org.apache.flink.types.Row output) {
-        |
-        |    output.setField(0, input.getField(0));
-        |    output.setField(1, input.getField(1));
-        |    output.setField(2, input.getField(2));
-        |    output.setField(3, input.getField(3));
-        |    output.setField(4, input.getField(4));
-        |  }
-        |
-        |  public org.apache.flink.types.Row createOutputRow() {
-        |    return new org.apache.flink.types.Row(7);
-        |  }
-        |
-        |/*******  This test does not use the following methods  *******/
-        |  public org.apache.flink.types.Row mergeAccumulatorsPair(
-        |    org.apache.flink.types.Row a,
-        |    org.apache.flink.types.Row b) {
-        |    return null;
-        |  }
-        |
-        |  public void resetAccumulator(org.apache.flink.types.Row accs) {
-        |  }
-        |
-        |  public void setConstantFlags(org.apache.flink.types.Row output) {
-        |  }
-        |}
-      """.stripMargin
-
-    val funcName = "BoundedOverAggregateHelper$33"
-
-    val genAggFunction = GeneratedAggregationsFunction(funcName, funcCode)
-    val processFunction = new KeyedProcessOperator[String, CRow, CRow](
-      new ProcTimeBoundedRangeOver(
-        genAggFunction,
-        1000,
-        aggregationStateType,
-        rT))
-
-    val testHarness = new KeyedOneInputStreamOperatorTestHarness[JInt, CRow, CRow](
-      processFunction,
-      new TupleRowSelector(0),
-      BasicTypeInfo.INT_TYPE_INFO)
-
-    testHarness.open()
-
-    // Time = 3
-    testHarness.setProcessingTime(3)
-    // key = 1
-    testHarness.processElement(new StreamRecord(
-      new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 0))
-    // key = 2
-    testHarness.processElement(new StreamRecord(
-      new CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), true), 0))
-
-    // Time = 4
-    testHarness.setProcessingTime(4)
-    // key = 1
-    testHarness.processElement(new StreamRecord(
-      new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 0))
-    testHarness.processElement(new StreamRecord(
-      new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 0))
-    // key = 2
-    testHarness.processElement(new StreamRecord(
-      new CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong), true), 0))
-
-    // Time = 5
-    testHarness.setProcessingTime(5)
-    testHarness.processElement(new StreamRecord(
-      new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong), true), 0))
-
-    // Time = 6
-    testHarness.setProcessingTime(6)
-
-    // Time = 1002
-    testHarness.setProcessingTime(1002)
-    // key = 1
-    testHarness.processElement(new StreamRecord(
-      new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), true), 0))
-    testHarness.processElement(new StreamRecord(
-      new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), true), 0))
-    // key = 2
-    testHarness.processElement(new StreamRecord(
-      new CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 0))
-
-    // Time = 1003
-    testHarness.setProcessingTime(1003)
-    testHarness.processElement(new StreamRecord(
-      new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 0))
-
-    // Time = 1004
-    testHarness.setProcessingTime(1004)
-    testHarness.processElement(new StreamRecord(
-      new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 0))
-
-    // Time = 1005
-    testHarness.setProcessingTime(1005)
-    // key = 1
-    testHarness.processElement(new StreamRecord(
-      new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 0))
-    testHarness.processElement(new StreamRecord(
-      new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 0))
-    // key = 2
-    testHarness.processElement(new StreamRecord(
-      new CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 0))
-
-    testHarness.setProcessingTime(1006)
-
-    val result = testHarness.getOutput
-
-    val expectedOutput = new ConcurrentLinkedQueue[Object]()
-
-    // all elements at the same proc timestamp have the same value
-    expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true), 4))
-    expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true), 4))
-    expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 3L: JLong), true), 5))
-    expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true), 5))
-    expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true), 5))
-    expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 1L: JLong, 4L: JLong), true), 6))
-    expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 1L: JLong, 6L: JLong), true), 1003))
-    expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 1L: JLong, 6L: JLong), true), 1003))
-    expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), true), 1003))
-    expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 1L: JLong, 7L: JLong), true), 1004))
-    expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 2L: JLong, 8L: JLong), true), 1005))
-    expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 4L: JLong, 10L: JLong), true), 1006))
-    expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 4L: JLong, 10L: JLong), true), 1006))
-    expectedOutput.add(new StreamRecord(new CRow(
-      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 30L: JLong, 40L: JLong), true), 1006))
-
-    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.",
-        expectedOutput, result, new RowResultSortComparator(6))
-
-    testHarness.close()
-
-  }
-}
-
-object BoundedProcessingOverRangeProcessFunctionTest {
-
-/**
- * Return 0 for equal CRows and non zero for different CRows
- */
-class RowResultSortComparator(indexCounter: Int) extends Comparator[Object] with Serializable {
-
-    override def compare(o1: Object, o2: Object):Int = {
-
-      if (o1.isInstanceOf[Watermark] || o2.isInstanceOf[Watermark]) {
-        // watermark is not expected
-         -1
-       } else {
-        val row1 = o1.asInstanceOf[StreamRecord[CRow]].getValue
-        val row2 = o2.asInstanceOf[StreamRecord[CRow]].getValue
-        row1.toString.compareTo(row2.toString)
-      }
-   }
-}
-
-/**
- * Simple test class that returns a specified field as the selector function
- */
-class TupleRowSelector(
-    private val selectorField:Int) extends KeySelector[CRow, Integer] {
-
-  override def getKey(value: CRow): Integer = {
-    value.row.getField(selectorField).asInstanceOf[Integer]
-  }
-}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
index eb5acd5b..77798f9 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
@@ -19,15 +19,294 @@ package org.apache.flink.table.runtime.harness
 
 import java.util.{Comparator, Queue => JQueue}
 
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{INT_TYPE_INFO, LONG_TYPE_INFO, STRING_TYPE_INFO}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator
 import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
 import org.apache.flink.streaming.util.{KeyedOneInputStreamOperatorTestHarness, TestHarnessUtil}
-import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.codegen.GeneratedAggregationsFunction
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.flink.table.functions.aggfunctions.{LongMaxWithRetractAggFunction, LongMinWithRetractAggFunction, IntSumWithRetractAggFunction}
+import org.apache.flink.table.runtime.aggregate.AggregateUtil
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 
 class HarnessTestBase {
+
+  protected val MinMaxRowType = new RowTypeInfo(Array[TypeInformation[_]](
+    INT_TYPE_INFO,
+    LONG_TYPE_INFO,
+    INT_TYPE_INFO,
+    STRING_TYPE_INFO,
+    LONG_TYPE_INFO),
+    Array("a", "b", "c", "d", "e"))
+
+  protected val SumRowType = new RowTypeInfo(Array[TypeInformation[_]](
+    LONG_TYPE_INFO,
+    INT_TYPE_INFO,
+    STRING_TYPE_INFO),
+    Array("a", "b", "c"))
+
+  protected val minMaxCRowType = new CRowTypeInfo(MinMaxRowType)
+  protected val sumCRowType = new CRowTypeInfo(SumRowType)
+
+  protected val minMaxAggregates =
+    Array(new LongMinWithRetractAggFunction,
+          new LongMaxWithRetractAggFunction).asInstanceOf[Array[AggregateFunction[_, _]]]
+
+  protected val sumAggregates =
+    Array(new IntSumWithRetractAggFunction).asInstanceOf[Array[AggregateFunction[_, _]]]
+
+  protected val minMaxAggregationStateType: RowTypeInfo =
+    AggregateUtil.createAccumulatorRowType(minMaxAggregates)
+
+  protected val sumAggregationStateType: RowTypeInfo =
+    AggregateUtil.createAccumulatorRowType(sumAggregates)
+
+  val minMaxCode: String =
+    """
+      |public class MinMaxAggregateHelper
+      |  extends org.apache.flink.table.runtime.aggregate.GeneratedAggregations {
+      |
+      |  transient org.apache.flink.table.functions.aggfunctions.LongMinWithRetractAggFunction
+      |    fmin = null;
+      |
+      |  transient org.apache.flink.table.functions.aggfunctions.LongMaxWithRetractAggFunction
+      |    fmax = null;
+      |
+      |  public MinMaxAggregateHelper() throws Exception {
+      |
+      |    fmin = (org.apache.flink.table.functions.aggfunctions.LongMinWithRetractAggFunction)
+      |    org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
+      |    .deserialize("rO0ABXNyAEtvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbn" +
+      |    "MuTG9uZ01pbldpdGhSZXRyYWN0QWdnRnVuY3Rpb26oIdX_DaMPxQIAAHhyAEdvcmcuYXBhY2hlLmZsaW5rL" +
+      |    "nRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbnMuTWluV2l0aFJldHJhY3RBZ2dGdW5jdGlvbq_ZGuzxtA_S" +
+      |    "AgABTAADb3JkdAAVTHNjYWxhL21hdGgvT3JkZXJpbmc7eHIAMm9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnV" +
+      |    "uY3Rpb25zLkFnZ3JlZ2F0ZUZ1bmN0aW9uTcYVPtJjNfwCAAB4cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS" +
+      |    "5mdW5jdGlvbnMuVXNlckRlZmluZWRGdW5jdGlvbi0B91QxuAyTAgAAeHBzcgAZc2NhbGEubWF0aC5PcmRlc" +
+      |    "mluZyRMb25nJOda0iCPo2ukAgAAeHA");
+      |
+      |    fmax = (org.apache.flink.table.functions.aggfunctions.LongMaxWithRetractAggFunction)
+      |    org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
+      |    .deserialize("rO0ABXNyAEtvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbn" +
+      |    "MuTG9uZ01heFdpdGhSZXRyYWN0QWdnRnVuY3Rpb25RmsI8azNGXwIAAHhyAEdvcmcuYXBhY2hlLmZsaW5rL" +
+      |    "nRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbnMuTWF4V2l0aFJldHJhY3RBZ2dGdW5jdGlvbvnwowlX0_Qf" +
+      |    "AgABTAADb3JkdAAVTHNjYWxhL21hdGgvT3JkZXJpbmc7eHIAMm9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnV" +
+      |    "uY3Rpb25zLkFnZ3JlZ2F0ZUZ1bmN0aW9uTcYVPtJjNfwCAAB4cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS" +
+      |    "5mdW5jdGlvbnMuVXNlckRlZmluZWRGdW5jdGlvbi0B91QxuAyTAgAAeHBzcgAZc2NhbGEubWF0aC5PcmRlc" +
+      |    "mluZyRMb25nJOda0iCPo2ukAgAAeHA");
+      |  }
+      |
+      |  public void setAggregationResults(
+      |    org.apache.flink.types.Row accs,
+      |    org.apache.flink.types.Row output) {
+      |
+      |    org.apache.flink.table.functions.AggregateFunction baseClass0 =
+      |      (org.apache.flink.table.functions.AggregateFunction) fmin;
+      |    output.setField(5, baseClass0.getValue(
+      |      (org.apache.flink.table.functions.aggfunctions.MinWithRetractAccumulator)
+      |      accs.getField(0)));
+      |
+      |    org.apache.flink.table.functions.AggregateFunction baseClass1 =
+      |      (org.apache.flink.table.functions.AggregateFunction) fmax;
+      |    output.setField(6, baseClass1.getValue(
+      |      (org.apache.flink.table.functions.aggfunctions.MaxWithRetractAccumulator)
+      |      accs.getField(1)));
+      |  }
+      |
+      |  public void accumulate(
+      |    org.apache.flink.types.Row accs,
+      |    org.apache.flink.types.Row input) {
+      |
+      |    fmin.accumulate(
+      |      ((org.apache.flink.table.functions.aggfunctions.MinWithRetractAccumulator)
+      |      accs.getField(0)),
+      |      (java.lang.Long) input.getField(4));
+      |
+      |    fmax.accumulate(
+      |      ((org.apache.flink.table.functions.aggfunctions.MaxWithRetractAccumulator)
+      |      accs.getField(1)),
+      |      (java.lang.Long) input.getField(4));
+      |  }
+      |
+      |  public void retract(
+      |    org.apache.flink.types.Row accs,
+      |    org.apache.flink.types.Row input) {
+      |
+      |    fmin.retract(
+      |      ((org.apache.flink.table.functions.aggfunctions.MinWithRetractAccumulator)
+      |      accs.getField(0)),
+      |      (java.lang.Long) input.getField(4));
+      |
+      |    fmax.retract(
+      |      ((org.apache.flink.table.functions.aggfunctions.MaxWithRetractAccumulator)
+      |      accs.getField(1)),
+      |      (java.lang.Long) input.getField(4));
+      |  }
+      |
+      |  public org.apache.flink.types.Row createAccumulators() {
+      |
+      |    org.apache.flink.types.Row accs = new org.apache.flink.types.Row(2);
+      |
+      |    accs.setField(
+      |      0,
+      |      fmin.createAccumulator());
+      |
+      |    accs.setField(
+      |      1,
+      |      fmax.createAccumulator());
+      |
+      |      return accs;
+      |  }
+      |
+      |  public void setForwardedFields(
+      |    org.apache.flink.types.Row input,
+      |    org.apache.flink.types.Row output) {
+      |
+      |    output.setField(0, input.getField(0));
+      |    output.setField(1, input.getField(1));
+      |    output.setField(2, input.getField(2));
+      |    output.setField(3, input.getField(3));
+      |    output.setField(4, input.getField(4));
+      |  }
+      |
+      |  public org.apache.flink.types.Row createOutputRow() {
+      |    return new org.apache.flink.types.Row(7);
+      |  }
+      |
+      |/*******  This test does not use the following methods  *******/
+      |  public org.apache.flink.types.Row mergeAccumulatorsPair(
+      |    org.apache.flink.types.Row a,
+      |    org.apache.flink.types.Row b) {
+      |    return null;
+      |  }
+      |
+      |  public void resetAccumulator(org.apache.flink.types.Row accs) {
+      |  }
+      |
+      |  public void setConstantFlags(org.apache.flink.types.Row output) {
+      |  }
+      |}
+    """.stripMargin
+
+  val sumAggCode: String =
+    """
+      |public final class SumAggregationHelper
+      |  extends org.apache.flink.table.runtime.aggregate.GeneratedAggregations {
+      |
+      |
+      |transient org.apache.flink.table.functions.aggfunctions.IntSumWithRetractAggFunction
+      |sum = null;
+      |private final org.apache.flink.table.runtime.aggregate.SingleElementIterable<org.apache
+      |    .flink.table.functions.aggfunctions.SumWithRetractAccumulator> accIt0 =
+      |      new org.apache.flink.table.runtime.aggregate.SingleElementIterable<org.apache.flink
+      |      .table
+      |      .functions.aggfunctions.SumWithRetractAccumulator>();
+      |
+      |  public SumAggregationHelper() throws Exception {
+      |
+      |sum = (org.apache.flink.table.functions.aggfunctions.IntSumWithRetractAggFunction)
+      |org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
+      |.deserialize
+      |("rO0ABXNyAEpvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbnMuSW50U3VtV2l0a" +
+      |"FJldHJhY3RBZ2dGdW5jdGlvblkfWkeNZDeDAgAAeHIAR29yZy5hcGFjaGUuZmxpbmsudGFibGUuZnVuY3Rpb25" +
+      |"zLmFnZ2Z1bmN0aW9ucy5TdW1XaXRoUmV0cmFjdEFnZ0Z1bmN0aW9ut2oWrOsLrs0CAAFMAAdudW1lcmljdAAUT" +
+      |"HNjYWxhL21hdGgvTnVtZXJpYzt4cgAyb3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuQWdncmVnYXR" +
+      |"lRnVuY3Rpb25NxhU-0mM1_AIAAHhyADRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5Vc2VyRGVma" +
+      |"W5lZEZ1bmN0aW9uLQH3VDG4DJMCAAB4cHNyACFzY2FsYS5tYXRoLk51bWVyaWMkSW50SXNJbnRlZ3JhbCTw6XA" +
+      |"59sPAzAIAAHhw");
+      |
+      |
+      |  }
+      |
+      |  public final void setAggregationResults(
+      |    org.apache.flink.types.Row accs,
+      |    org.apache.flink.types.Row output) {
+      |
+      |    org.apache.flink.table.functions.AggregateFunction baseClass0 =
+      |      (org.apache.flink.table.functions.AggregateFunction)
+      |      sum;
+      |
+      |    output.setField(
+      |      1,
+      |      baseClass0.getValue((org.apache.flink.table.functions.aggfunctions
+      |      .SumWithRetractAccumulator) accs.getField(0)));
+      |  }
+      |
+      |  public final void accumulate(
+      |    org.apache.flink.types.Row accs,
+      |    org.apache.flink.types.Row input) {
+      |
+      |    sum.accumulate(
+      |      ((org.apache.flink.table.functions.aggfunctions.SumWithRetractAccumulator) accs
+      |      .getField
+      |      (0)),
+      |      (java.lang.Integer) input.getField(1));
+      |  }
+      |
+      |
+      |  public final void retract(
+      |    org.apache.flink.types.Row accs,
+      |    org.apache.flink.types.Row input) {
+      |  }
+      |
+      |  public final org.apache.flink.types.Row createAccumulators()
+      |     {
+      |
+      |      org.apache.flink.types.Row accs =
+      |          new org.apache.flink.types.Row(1);
+      |
+      |    accs.setField(
+      |      0,
+      |      sum.createAccumulator());
+      |
+      |      return accs;
+      |  }
+      |
+      |  public final void setForwardedFields(
+      |    org.apache.flink.types.Row input,
+      |    org.apache.flink.types.Row output)
+      |     {
+      |
+      |    output.setField(
+      |      0,
+      |      input.getField(0));
+      |  }
+      |
+      |  public final void setConstantFlags(org.apache.flink.types.Row output)
+      |     {
+      |
+      |  }
+      |
+      |  public final org.apache.flink.types.Row createOutputRow() {
+      |    return new org.apache.flink.types.Row(2);
+      |  }
+      |
+      |
+      |  public final org.apache.flink.types.Row mergeAccumulatorsPair(
+      |    org.apache.flink.types.Row a,
+      |    org.apache.flink.types.Row b)
+      |            {
+      |
+      |      return a;
+      |
+      |  }
+      |
+      |  public final void resetAccumulator(
+      |    org.apache.flink.types.Row accs) {
+      |  }
+      |}
+      |""".stripMargin
+
+
+  protected val minMaxFuncName = "MinMaxAggregateHelper"
+  protected val sumFuncName = "SumAggregationHelper"
+
+  protected val genMinMaxAggFunction = GeneratedAggregationsFunction(minMaxFuncName, minMaxCode)
+  protected val genSumAggFunction = GeneratedAggregationsFunction(sumFuncName, sumAggCode)
+
   def createHarnessTester[IN, OUT, KEY](
     operator: OneInputStreamOperator[IN, OUT],
     keySelector: KeySelector[IN, KEY],

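Note: the fixtures above (row/accumulator type infos and the pre-generated
GeneratedAggregations source) are shared by the harness tests that follow. A
condensed sketch of how a subclass drives the harness, using only names
introduced in this commit (queryConfig is defined in the subclasses, not in
this base class):

    // Wrap the process function under test in a keyed operator and key the
    // CRow records by their third field (the String grouping key).
    val operator = new KeyedProcessOperator[String, CRow, CRow](
      new GroupAggProcessFunction(genSumAggFunction, sumAggregationStateType, false, queryConfig))

    val testHarness = createHarnessTester(
      operator,
      new TupleRowKeySelector[String](2),
      BasicTypeInfo.STRING_TYPE_INFO)

    testHarness.open()
    testHarness.setProcessingTime(1)    // processing time is advanced by hand
    testHarness.processElement(new StreamRecord(
      CRow(Row.of(1L: JLong, 1: JInt, "aaa"), true), 1))
    val result = testHarness.getOutput  // StreamRecord[CRow]s, checked with verify(...)
    testHarness.close()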
http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala
new file mode 100644
index 0000000..04214f9
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/NonWindowHarnessTest.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.harness
+
+import java.lang.{Integer => JInt, Long => JLong}
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.table.runtime.harness.HarnessTestBase._
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.junit.Test
+
+class NonWindowHarnessTest extends HarnessTestBase {
+
+  protected var queryConfig =
+    new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(2), Time.seconds(3))
+
+  @Test
+  def testProcTimeNonWindow(): Unit = {
+
+    val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+      new GroupAggProcessFunction(
+        genSumAggFunction,
+        sumAggregationStateType,
+        false,
+        queryConfig))
+
+    val testHarness =
+      createHarnessTester(
+        processFunction,
+        new TupleRowKeySelector[String](2),
+        BasicTypeInfo.STRING_TYPE_INFO)
+
+    testHarness.open()
+
+    // register cleanup timer with 3001
+    testHarness.setProcessingTime(1)
+
+    testHarness.processElement(new StreamRecord(CRow(Row.of(1L: JLong, 1: JInt, "aaa"), true), 1))
+    testHarness.processElement(new StreamRecord(CRow(Row.of(2L: JLong, 1: JInt, "bbb"), true), 1))
+    // reuse timer 3001
+    testHarness.setProcessingTime(1000)
+    testHarness.processElement(new StreamRecord(CRow(Row.of(3L: JLong, 2: JInt, "aaa"), true), 1))
+    testHarness.processElement(new StreamRecord(CRow(Row.of(4L: JLong, 3: JInt, "aaa"), true), 1))
+
+    // register cleanup timer with 4002
+    testHarness.setProcessingTime(1002)
+    testHarness.processElement(new StreamRecord(CRow(Row.of(5L: JLong, 4: JInt, "aaa"), true), 1))
+    testHarness.processElement(new StreamRecord(CRow(Row.of(6L: JLong, 2: JInt, "bbb"), true), 1))
+
+    // trigger cleanup timer and register cleanup timer with 7003
+    testHarness.setProcessingTime(4003)
+    testHarness.processElement(new StreamRecord(CRow(Row.of(7L: JLong, 5: JInt, "aaa"), true), 1))
+    testHarness.processElement(new StreamRecord(CRow(Row.of(8L: JLong, 6: JInt, "aaa"), true), 1))
+    testHarness.processElement(new StreamRecord(CRow(Row.of(9L: JLong, 7: JInt, "aaa"), true), 1))
+    testHarness.processElement(new StreamRecord(CRow(Row.of(10L: JLong, 3: JInt, "bbb"), true), 1))
+
+    val result = testHarness.getOutput
+
+    val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+    expectedOutput.add(new StreamRecord(CRow(Row.of(1L: JLong, 1: JInt), true), 1))
+    expectedOutput.add(new StreamRecord(CRow(Row.of(2L: JLong, 1: JInt), true), 1))
+    expectedOutput.add(new StreamRecord(CRow(Row.of(3L: JLong, 3: JInt), true), 1))
+    expectedOutput.add(new StreamRecord(CRow(Row.of(4L: JLong, 6: JInt), true), 1))
+    expectedOutput.add(new StreamRecord(CRow(Row.of(5L: JLong, 10: JInt), true), 1))
+    expectedOutput.add(new StreamRecord(CRow(Row.of(6L: JLong, 3: JInt), true), 1))
+    expectedOutput.add(new StreamRecord(CRow(Row.of(7L: JLong, 5: JInt), true), 1))
+    expectedOutput.add(new StreamRecord(CRow(Row.of(8L: JLong, 11: JInt), true), 1))
+    expectedOutput.add(new StreamRecord(CRow(Row.of(9L: JLong, 18: JInt), true), 1))
+    expectedOutput.add(new StreamRecord(CRow(Row.of(10L: JLong, 3: JInt), true), 1))
+
+    verify(expectedOutput, result, new RowResultSortComparator(6))
+
+    testHarness.close()
+  }
+
+  @Test
+  def testProcTimeNonWindowWithRetract(): Unit = {
+
+    val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+      new GroupAggProcessFunction(
+        genSumAggFunction,
+        sumAggregationStateType,
+        true,
+        queryConfig))
+
+    val testHarness =
+      createHarnessTester(
+        processFunction,
+        new TupleRowKeySelector[String](2),
+        BasicTypeInfo.STRING_TYPE_INFO)
+
+    testHarness.open()
+
+    // register cleanup timer with 3001
+    testHarness.setProcessingTime(1)
+
+    testHarness.processElement(new StreamRecord(CRow(Row.of(1L: JLong, 1: JInt, "aaa"), true), 1))
+    testHarness.processElement(new StreamRecord(CRow(Row.of(2L: JLong, 1: JInt, "bbb"), true), 2))
+    testHarness.processElement(new StreamRecord(CRow(Row.of(3L: JLong, 2: JInt, "aaa"), true), 3))
+    testHarness.processElement(new StreamRecord(CRow(Row.of(4L: JLong, 3: JInt, "ccc"), true), 4))
+
+    // trigger cleanup timer and register cleanup timer with 6002
+    testHarness.setProcessingTime(3002)
+    testHarness.processElement(new StreamRecord(CRow(Row.of(5L: JLong, 4: JInt, "aaa"), true), 5))
+    testHarness.processElement(new StreamRecord(CRow(Row.of(6L: JLong, 2: JInt, "bbb"), true), 6))
+    testHarness.processElement(new StreamRecord(CRow(Row.of(7L: JLong, 5: JInt, "aaa"), true), 7))
+    testHarness.processElement(new StreamRecord(CRow(Row.of(8L: JLong, 6: JInt, "eee"), true), 8))
+    testHarness.processElement(new StreamRecord(CRow(Row.of(9L: JLong, 7: JInt, "aaa"), true), 9))
+    testHarness.processElement(new StreamRecord(CRow(Row.of(10L: JLong, 3: JInt, "bbb"), true), 10))
+
+    val result = testHarness.getOutput
+
+    val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+    expectedOutput.add(new StreamRecord(CRow(Row.of(1L: JLong, 1: JInt), true), 1))
+    expectedOutput.add(new StreamRecord(CRow(Row.of(2L: JLong, 1: JInt), true), 2))
+    expectedOutput.add(new StreamRecord(CRow(Row.of(3L: JLong, 1: JInt), false), 3))
+    expectedOutput.add(new StreamRecord(CRow(Row.of(3L: JLong, 3: JInt), true), 3))
+    expectedOutput.add(new StreamRecord(CRow(Row.of(4L: JLong, 3: JInt), true), 4))
+    expectedOutput.add(new StreamRecord(CRow(Row.of(5L: JLong, 4: JInt), true), 5))
+    expectedOutput.add(new StreamRecord(CRow(Row.of(6L: JLong, 2: JInt), true), 6))
+    expectedOutput.add(new StreamRecord(CRow(Row.of(7L: JLong, 4: JInt), false), 7))
+    expectedOutput.add(new StreamRecord(CRow(Row.of(7L: JLong, 9: JInt), true), 7))
+    expectedOutput.add(new StreamRecord(CRow(Row.of(8L: JLong, 6: JInt), true), 8))
+    expectedOutput.add(new StreamRecord(CRow(Row.of(9L: JLong, 9: JInt), false), 9))
+    expectedOutput.add(new StreamRecord(CRow(Row.of(9L: JLong, 16: JInt), true), 9))
+    expectedOutput.add(new StreamRecord(CRow(Row.of(10L: JLong, 2: JInt), false), 10))
+    expectedOutput.add(new StreamRecord(CRow(Row.of(10L: JLong, 5: JInt), true), 10))
+
+    verify(expectedOutput, result, new RowResultSortComparator(0))
+
+    testHarness.close()
+  }
+}

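Note: the cleanup-timer timestamps in the comments above follow from the
configured retention interval (min 2s, max 3s): a fresh timer targets
now + max, and an existing timer is kept while it still lies at least min in
the future. A sketch of that rule and the arithmetic it produces (the actual
bookkeeping lives in the process functions' cleanup-state handling):

    // With minRetention = 2000 ms and maxRetention = 3000 ms:
    //   now = 1    -> no timer yet        -> register 1 + 3000 = 3001
    //   now = 1000 -> 1000 + 2000 <= 3001 -> reuse timer 3001
    //   now = 1002 -> 1002 + 2000 >  3001 -> register 1002 + 3000 = 4002
    def cleanupTime(now: Long, current: Option[Long],
        minRetention: Long, maxRetention: Long): Long = current match {
      case Some(t) if now + minRetention <= t => t
      case _ => now + maxRetention
    }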
http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
index 56ca85c..786a843 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
@@ -18,180 +18,34 @@
 package org.apache.flink.table.runtime.harness
 
 import java.lang.{Integer => JInt, Long => JLong}
-import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.{ConcurrentLinkedQueue}
 
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
 import org.apache.flink.streaming.api.operators.KeyedProcessOperator
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
-import org.apache.flink.table.codegen.GeneratedAggregationsFunction
-import org.apache.flink.table.functions.AggregateFunction
-import org.apache.flink.table.functions.aggfunctions.{LongMaxWithRetractAggFunction, LongMinWithRetractAggFunction}
+import org.apache.flink.table.api.StreamQueryConfig
 import org.apache.flink.table.runtime.aggregate._
 import org.apache.flink.table.runtime.harness.HarnessTestBase._
-import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.types.Row
 import org.junit.Test
 
 class OverWindowHarnessTest extends HarnessTestBase{
 
-  private val rT = new RowTypeInfo(Array[TypeInformation[_]](
-    INT_TYPE_INFO,
-    LONG_TYPE_INFO,
-    INT_TYPE_INFO,
-    STRING_TYPE_INFO,
-    LONG_TYPE_INFO),
-    Array("a", "b", "c", "d", "e"))
-
-  private val cRT = new CRowTypeInfo(rT)
-
-  private val aggregates =
-    Array(new LongMinWithRetractAggFunction,
-      new LongMaxWithRetractAggFunction).asInstanceOf[Array[AggregateFunction[_, _]]]
-  private val aggregationStateType: RowTypeInfo = AggregateUtil.createAccumulatorRowType(aggregates)
-
-  val funcCode: String =
-    """
-      |public class BoundedOverAggregateHelper
-      |  extends org.apache.flink.table.runtime.aggregate.GeneratedAggregations {
-      |
-      |  transient org.apache.flink.table.functions.aggfunctions.LongMinWithRetractAggFunction
-      |    fmin = null;
-      |
-      |  transient org.apache.flink.table.functions.aggfunctions.LongMaxWithRetractAggFunction
-      |    fmax = null;
-      |
-      |  public BoundedOverAggregateHelper() throws Exception {
-      |
-      |    fmin = (org.apache.flink.table.functions.aggfunctions.LongMinWithRetractAggFunction)
-      |    org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
-      |    .deserialize("rO0ABXNyAEtvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbn" +
-      |    "MuTG9uZ01pbldpdGhSZXRyYWN0QWdnRnVuY3Rpb26oIdX_DaMPxQIAAHhyAEdvcmcuYXBhY2hlLmZsaW5rL" +
-      |    "nRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbnMuTWluV2l0aFJldHJhY3RBZ2dGdW5jdGlvbq_ZGuzxtA_S" +
-      |    "AgABTAADb3JkdAAVTHNjYWxhL21hdGgvT3JkZXJpbmc7eHIAMm9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnV" +
-      |    "uY3Rpb25zLkFnZ3JlZ2F0ZUZ1bmN0aW9uTcYVPtJjNfwCAAB4cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS" +
-      |    "5mdW5jdGlvbnMuVXNlckRlZmluZWRGdW5jdGlvbi0B91QxuAyTAgAAeHBzcgAZc2NhbGEubWF0aC5PcmRlc" +
-      |    "mluZyRMb25nJOda0iCPo2ukAgAAeHA");
-      |
-      |    fmax = (org.apache.flink.table.functions.aggfunctions.LongMaxWithRetractAggFunction)
-      |    org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
-      |    .deserialize("rO0ABXNyAEtvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbn" +
-      |    "MuTG9uZ01heFdpdGhSZXRyYWN0QWdnRnVuY3Rpb25RmsI8azNGXwIAAHhyAEdvcmcuYXBhY2hlLmZsaW5rL" +
-      |    "nRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbnMuTWF4V2l0aFJldHJhY3RBZ2dGdW5jdGlvbvnwowlX0_Qf" +
-      |    "AgABTAADb3JkdAAVTHNjYWxhL21hdGgvT3JkZXJpbmc7eHIAMm9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnV" +
-      |    "uY3Rpb25zLkFnZ3JlZ2F0ZUZ1bmN0aW9uTcYVPtJjNfwCAAB4cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS" +
-      |    "5mdW5jdGlvbnMuVXNlckRlZmluZWRGdW5jdGlvbi0B91QxuAyTAgAAeHBzcgAZc2NhbGEubWF0aC5PcmRlc" +
-      |    "mluZyRMb25nJOda0iCPo2ukAgAAeHA");
-      |  }
-      |
-      |  public void setAggregationResults(
-      |    org.apache.flink.types.Row accs,
-      |    org.apache.flink.types.Row output) {
-      |
-      |    org.apache.flink.table.functions.AggregateFunction baseClass0 =
-      |      (org.apache.flink.table.functions.AggregateFunction) fmin;
-      |    output.setField(5, baseClass0.getValue(
-      |      (org.apache.flink.table.functions.aggfunctions.MinWithRetractAccumulator)
-      |      accs.getField(0)));
-      |
-      |    org.apache.flink.table.functions.AggregateFunction baseClass1 =
-      |      (org.apache.flink.table.functions.AggregateFunction) fmax;
-      |    output.setField(6, baseClass1.getValue(
-      |      (org.apache.flink.table.functions.aggfunctions.MaxWithRetractAccumulator)
-      |      accs.getField(1)));
-      |  }
-      |
-      |  public void accumulate(
-      |    org.apache.flink.types.Row accs,
-      |    org.apache.flink.types.Row input) {
-      |
-      |    fmin.accumulate(
-      |      ((org.apache.flink.table.functions.aggfunctions.MinWithRetractAccumulator)
-      |      accs.getField(0)),
-      |      (java.lang.Long) input.getField(4));
-      |
-      |    fmax.accumulate(
-      |      ((org.apache.flink.table.functions.aggfunctions.MaxWithRetractAccumulator)
-      |      accs.getField(1)),
-      |      (java.lang.Long) input.getField(4));
-      |  }
-      |
-      |  public void retract(
-      |    org.apache.flink.types.Row accs,
-      |    org.apache.flink.types.Row input) {
-      |
-      |    fmin.retract(
-      |      ((org.apache.flink.table.functions.aggfunctions.MinWithRetractAccumulator)
-      |      accs.getField(0)),
-      |      (java.lang.Long) input.getField(4));
-      |
-      |    fmax.retract(
-      |      ((org.apache.flink.table.functions.aggfunctions.MaxWithRetractAccumulator)
-      |      accs.getField(1)),
-      |      (java.lang.Long) input.getField(4));
-      |  }
-      |
-      |  public org.apache.flink.types.Row createAccumulators() {
-      |
-      |    org.apache.flink.types.Row accs = new org.apache.flink.types.Row(2);
-      |
-      |    accs.setField(
-      |      0,
-      |      fmin.createAccumulator());
-      |
-      |    accs.setField(
-      |      1,
-      |      fmax.createAccumulator());
-      |
-      |      return accs;
-      |  }
-      |
-      |  public void setForwardedFields(
-      |    org.apache.flink.types.Row input,
-      |    org.apache.flink.types.Row output) {
-      |
-      |    output.setField(0, input.getField(0));
-      |    output.setField(1, input.getField(1));
-      |    output.setField(2, input.getField(2));
-      |    output.setField(3, input.getField(3));
-      |    output.setField(4, input.getField(4));
-      |  }
-      |
-      |  public org.apache.flink.types.Row createOutputRow() {
-      |    return new org.apache.flink.types.Row(7);
-      |  }
-      |
-      |/*******  This test does not use the following methods  *******/
-      |  public org.apache.flink.types.Row mergeAccumulatorsPair(
-      |    org.apache.flink.types.Row a,
-      |    org.apache.flink.types.Row b) {
-      |    return null;
-      |  }
-      |
-      |  public void resetAccumulator(org.apache.flink.types.Row accs) {
-      |  }
-      |
-      |  public void setConstantFlags(org.apache.flink.types.Row output) {
-      |  }
-      |}
-    """.stripMargin
-
-
-  private val funcName = "BoundedOverAggregateHelper"
-
-  private val genAggFunction = GeneratedAggregationsFunction(funcName, funcCode)
-
+  protected var queryConfig =
+    new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(2), Time.seconds(3))
 
   @Test
   def testProcTimeBoundedRowsOver(): Unit = {
 
     val processFunction = new KeyedProcessOperator[String, CRow, CRow](
       new ProcTimeBoundedRowsOver(
-        genAggFunction,
+        genMinMaxAggFunction,
         2,
-        aggregationStateType,
-        cRT))
+        minMaxAggregationStateType,
+        minMaxCRowType,
+        queryConfig))
 
     val testHarness =
      createHarnessTester(processFunction,new TupleRowKeySelector[Integer](0),BasicTypeInfo
@@ -199,6 +53,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
 
     testHarness.open()
 
+    // register cleanup timer with 3001
     testHarness.setProcessingTime(1)
 
     testHarness.processElement(new StreamRecord(
@@ -209,6 +64,9 @@ class OverWindowHarnessTest extends HarnessTestBase{
       CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 1))
     testHarness.processElement(new StreamRecord(
       CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 1))
+
+    // register cleanup timer with 4100
+    testHarness.setProcessingTime(1100)
     testHarness.processElement(new StreamRecord(
       CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong), true), 1))
     testHarness.processElement(new StreamRecord(
@@ -220,15 +78,19 @@ class OverWindowHarnessTest extends HarnessTestBase{
     testHarness.processElement(new StreamRecord(
       CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 1))
 
-    testHarness.setProcessingTime(2)
+    // register cleanup timer with 6001
+    testHarness.setProcessingTime(3001)
     testHarness.processElement(new StreamRecord(
       CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 2))
     testHarness.processElement(new StreamRecord(
       CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 2))
     testHarness.processElement(new StreamRecord(
       CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 2))
+
+    // trigger cleanup timer and register cleanup timer with 9002
+    testHarness.setProcessingTime(6002)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 2))
+        CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 2))
     testHarness.processElement(new StreamRecord(
       CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 2))
 
@@ -274,10 +136,10 @@ class OverWindowHarnessTest extends HarnessTestBase{
         Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 8L: JLong, 9L: JLong), true), 2))
     expectedOutput.add(new StreamRecord(
       CRow(
-        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 9L: JLong, 10L: JLong), true), 2))
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 10L: JLong, 10L: JLong), true), 2))
     expectedOutput.add(new StreamRecord(
       CRow(
-        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 30L: JLong, 40L: JLong), true), 2))
+        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 40L: JLong, 40L: JLong), true), 2))
 
     verify(expectedOutput, result, new RowResultSortComparator(6))
 
@@ -292,10 +154,11 @@ class OverWindowHarnessTest extends HarnessTestBase{
 
     val processFunction = new KeyedProcessOperator[String, CRow, CRow](
       new ProcTimeBoundedRangeOver(
-        genAggFunction,
-        1000,
-        aggregationStateType,
-        cRT))
+        genMinMaxAggFunction,
+        4000,
+        minMaxAggregationStateType,
+        minMaxCRowType,
+        queryConfig))
 
     val testHarness =
       createHarnessTester(
@@ -305,6 +168,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
 
     testHarness.open()
 
+    // register cleanup timer with 3003
     testHarness.setProcessingTime(3)
     testHarness.processElement(new StreamRecord(
       CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 0))
@@ -314,6 +178,9 @@ class OverWindowHarnessTest extends HarnessTestBase{
     testHarness.setProcessingTime(4)
     testHarness.processElement(new StreamRecord(
       CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 0))
+
+    // trigger cleanup timer and register cleanup timer with 6003
+    testHarness.setProcessingTime(3003)
     testHarness.processElement(new StreamRecord(
       CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 0))
     testHarness.processElement(new StreamRecord(
@@ -323,9 +190,10 @@ class OverWindowHarnessTest extends HarnessTestBase{
     testHarness.processElement(new StreamRecord(
       CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong), true), 0))
 
-    testHarness.setProcessingTime(6)
+    // register cleanup timer with 9002
+    testHarness.setProcessingTime(6002)
 
-    testHarness.setProcessingTime(1002)
+    testHarness.setProcessingTime(7002)
     testHarness.processElement(new StreamRecord(
       CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), true), 0))
     testHarness.processElement(new StreamRecord(
@@ -333,15 +201,15 @@ class OverWindowHarnessTest extends HarnessTestBase{
     testHarness.processElement(new StreamRecord(
       CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 0))
 
-    testHarness.setProcessingTime(1003)
+    // register cleanup timer with 14002
+    testHarness.setProcessingTime(11002)
     testHarness.processElement(new StreamRecord(
       CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 0))
 
-    testHarness.setProcessingTime(1004)
+    testHarness.setProcessingTime(11004)
     testHarness.processElement(new StreamRecord(
       CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 0))
 
-    testHarness.setProcessingTime(1005)
     testHarness.processElement(new StreamRecord(
       CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 0))
     testHarness.processElement(new StreamRecord(
@@ -349,7 +217,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
     testHarness.processElement(new StreamRecord(
       CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 0))
 
-    testHarness.setProcessingTime(1006)
+    testHarness.setProcessingTime(11006)
 
     val result = testHarness.getOutput
 
@@ -364,40 +232,40 @@ class OverWindowHarnessTest extends HarnessTestBase{
         Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true), 4))
     expectedOutput.add(new StreamRecord(
       CRow(
-        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 3L: JLong), true), 5))
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 2L: JLong), true), 5))
     expectedOutput.add(new StreamRecord(
       CRow(
-        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true), 5))
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 3L: JLong, 4L: JLong), true), 3004))
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true), 5))
+      CRow(Row.of(
+        2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 20L: JLong, 20L: JLong), true), 3004))
     expectedOutput.add(new StreamRecord(
       CRow(
-        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 1L: JLong, 4L: JLong), true), 6))
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 4L: JLong, 4L: JLong), true), 6))
     expectedOutput.add(new StreamRecord(
       CRow(
-        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 1L: JLong, 6L: JLong), true), 1003))
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 5L: JLong, 6L: JLong), true), 7003))
     expectedOutput.add(new StreamRecord(
       CRow(
-        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 1L: JLong, 6L: JLong), true), 1003))
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 5L: JLong, 6L: JLong), true), 7003))
     expectedOutput.add(new StreamRecord(
       CRow(
-      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), true), 1003))
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 30L: JLong, 30L: JLong), true), 7003))
     expectedOutput.add(new StreamRecord(
       CRow(
-        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 1L: JLong, 7L: 
JLong), true), 1004))
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 7L: JLong, 7L: 
JLong), true), 11003))
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 2L: JLong, 8L: 
JLong), true), 1005))
+      CRow(Row.of(
+        1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 7L: JLong, 10L: 
JLong), true), 11005))
     expectedOutput.add(new StreamRecord(
-      CRow(
-        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 4L: JLong, 10L: 
JLong), true), 1006))
+      CRow(Row.of(
+        1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 7L: JLong, 10L: 
JLong), true), 11005))
     expectedOutput.add(new StreamRecord(
       CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 4L: JLong, 10L: 
JLong), true), 1006))
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 7L: JLong, 10L: 
JLong), true), 11005))
     expectedOutput.add(new StreamRecord(
       CRow(
-      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 30L: JLong, 40L: 
JLong), true), 1006))
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 40L: JLong, 40L: 
JLong), true), 11005))
 
     verify(expectedOutput, result, new RowResultSortComparator(6))
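
The renumbered processing times above (6002, 7002, 11002, ...) make room for
the idle-state cleanup timers called out in the comments: each registration
lands at now + 3000 ms ("register cleanup timer at 9002" at time 6002, "at
14002" at time 11002). A minimal plain-Scala model of the re-arming rule those
comments imply -- not the operator's code; only the 3 s maximum retention is
visible here, the 1 s minimum below is an assumption:

    // A timer is (re)registered at now + maxRetention, but only when
    // now + minRetention would outlive the timer currently set for the key.
    object CleanupTimerModel extends App {
      def rearm(now: Long, current: Option[Long],
          minRetention: Long, maxRetention: Long): Option[Long] =
        if (current.forall(now + minRetention > _)) Some(now + maxRetention)
        else current

      assert(rearm(6002L, None, 1000L, 3000L).contains(9002L))          // "at 9002"
      assert(rearm(7002L, Some(9002L), 1000L, 3000L).contains(9002L))   // unchanged
      assert(rearm(11002L, Some(9002L), 1000L, 3000L).contains(14002L)) // "at 14002"
    }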
 
@@ -409,8 +277,9 @@ class OverWindowHarnessTest extends HarnessTestBase{
 
     val processFunction = new KeyedProcessOperator[String, CRow, CRow](
       new ProcTimeUnboundedPartitionedOver(
-        genAggFunction,
-        aggregationStateType))
+        genMinMaxAggFunction,
+        minMaxAggregationStateType,
+        queryConfig))
 
     val testHarness =
       createHarnessTester(
@@ -420,6 +289,9 @@ class OverWindowHarnessTest extends HarnessTestBase{
 
     testHarness.open()
 
+    // register cleanup timer at 4003
+    testHarness.setProcessingTime(1003)
+
     testHarness.processElement(new StreamRecord(
       CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 0))
     testHarness.processElement(new StreamRecord(
@@ -438,18 +310,19 @@ class OverWindowHarnessTest extends HarnessTestBase{
       CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), true), 0))
     testHarness.processElement(new StreamRecord(
       CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 0))
-
-    testHarness.setProcessingTime(1003)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 
1003))
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 0))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 
1003))
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 0))
+
+    // trigger cleanup timer and register new cleanup timer at 8003
+    testHarness.setProcessingTime(5003)
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 
1003))
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 
5003))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 
1003))
+      CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 
5003))
     testHarness.processElement(new StreamRecord(
-      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 
1003))
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 
5003))
 
     val result = testHarness.getOutput
 
@@ -484,19 +357,19 @@ class OverWindowHarnessTest extends HarnessTestBase{
         Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 10L: JLong, 
30L: JLong), true), 0))
     expectedOutput.add(new StreamRecord(
       CRow(
-        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 1L: JLong, 7L: 
JLong), true), 1003))
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 1L: JLong, 7L: 
JLong), true), 0))
     expectedOutput.add(new StreamRecord(
       CRow(
-        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 1L: JLong, 8L: 
JLong), true), 1003))
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 1L: JLong, 8L: 
JLong), true), 0))
     expectedOutput.add(new StreamRecord(
       CRow(
-        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 1L: JLong, 9L: 
JLong), true), 1003))
+        Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 9L: JLong, 9L: 
JLong), true), 5003))
     expectedOutput.add(new StreamRecord(
       CRow(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 1L: JLong, 10L: 
JLong), true), 1003))
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 9L: JLong, 10L: 
JLong), true), 5003))
     expectedOutput.add(new StreamRecord(
       CRow(
-      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 10L: JLong, 40L: 
JLong), true), 1003))
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 40L: JLong, 40L: 
JLong), true), 5003))
 
     verify(expectedOutput, result, new RowResultSortComparator(6))
     testHarness.close()
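
ProcTimeUnboundedPartitionedOver now receives the StreamQueryConfig so that it
can expire idle per-key state with processing-time timers. A hedged sketch of
the general pattern using the vanilla DataStream API -- the class name, state
name, and types below are illustrative, not the patch's implementation:

    import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.ProcessFunction
    import org.apache.flink.util.Collector

    // Illustrative keyed function that drops its state after an idle period,
    // following the min/max retention pattern these harness tests assert.
    class IdleStateCleanupFunction(minRetention: Long, maxRetention: Long)
        extends ProcessFunction[String, String] {

      // processing time at which the currently registered cleanup timer fires
      private var cleanupTime: ValueState[java.lang.Long] = _

      override def open(parameters: Configuration): Unit = {
        cleanupTime = getRuntimeContext.getState(new ValueStateDescriptor[java.lang.Long](
          "cleanup-time", classOf[java.lang.Long]))
      }

      override def processElement(
          value: String,
          ctx: ProcessFunction[String, String]#Context,
          out: Collector[String]): Unit = {
        val now = ctx.timerService().currentProcessingTime()
        val current = cleanupTime.value()
        // re-register only when the earliest allowed cleanup would outlive
        // the timer that is already set; this keeps the timer count bounded
        if (current == null || now + minRetention > current) {
          ctx.timerService().registerProcessingTimeTimer(now + maxRetention)
          cleanupTime.update(now + maxRetention)
        }
        out.collect(value)
      }

      override def onTimer(
          timestamp: Long,
          ctx: ProcessFunction[String, String]#OnTimerContext,
          out: Collector[String]): Unit = {
        val current = cleanupTime.value()
        if (current != null && timestamp == current.longValue()) {
          cleanupTime.clear() // a real operator would clear its aggregate state too
        }
      }
    }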
@@ -510,10 +383,11 @@ class OverWindowHarnessTest extends HarnessTestBase{
 
     val processFunction = new KeyedProcessOperator[String, CRow, CRow](
       new RowTimeBoundedRangeOver(
-        genAggFunction,
-        aggregationStateType,
-        cRT,
-        4000))
+        genMinMaxAggFunction,
+        minMaxAggregationStateType,
+        minMaxCRowType,
+        4000,
+        new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(1), 
Time.seconds(2))))
 
     val testHarness =
       createHarnessTester(
@@ -573,6 +447,40 @@ class OverWindowHarnessTest extends HarnessTestBase{
 
     testHarness.processWatermark(19000)
 
+    // test cleanup
+    testHarness.setProcessingTime(1000)
+    testHarness.processWatermark(20000)
+
+    // check that state is removed after max retention time
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 0L: JLong, 0: JInt, "ccc", 1L: JLong), true), 
20001)) // clean-up 3000
+    testHarness.setProcessingTime(2500)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "ccc", 2L: JLong), true), 
20002)) // clean-up 4500
+    testHarness.processWatermark(20010) // compute output
+
+    assert(testHarness.numKeyedStateEntries() > 0) // check that we have state
+    testHarness.setProcessingTime(4499)
+    assert(testHarness.numKeyedStateEntries() > 0) // check that we have state
+    testHarness.setProcessingTime(4500)
+    assert(testHarness.numKeyedStateEntries() == 0) // check that all state is 
gone
+
+    // check that state is only removed if all data was processed
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(3: JInt, 0L: JLong, 0: JInt, "ccc", 3L: JLong), true), 
20011)) // clean-up 6500
+
+    assert(testHarness.numKeyedStateEntries() > 0) // check that we have state
+    testHarness.setProcessingTime(6500) // clean-up attempt but rescheduled to 
8500
+    assert(testHarness.numKeyedStateEntries() > 0) // check that we have state
+
+    testHarness.processWatermark(20020) // schedule emission
+
+    assert(testHarness.numKeyedStateEntries() > 0) // check that we have state
+    testHarness.setProcessingTime(8499) // not yet: clean-up fires at 8500
+    assert(testHarness.numKeyedStateEntries() > 0) // check that we have state
+    testHarness.setProcessingTime(8500) // clean-up
+    assert(testHarness.numKeyedStateEntries() == 0) // check that all state is 
gone
+
     val result = testHarness.getOutput
 
     val expectedOutput = new ConcurrentLinkedQueue[Object]()
@@ -621,6 +529,16 @@ class OverWindowHarnessTest extends HarnessTestBase{
       CRow(
       Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 40L: JLong, 40L: 
JLong), true), 12001))
 
+    expectedOutput.add(new StreamRecord(
+      CRow(
+      Row.of(1: JInt, 0L: JLong, 0: JInt, "ccc", 1L: JLong, 1L: JLong, 1L: 
JLong), true), 20001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(2: JInt, 0L: JLong, 0: JInt, "ccc", 2L: JLong, 1L: JLong, 2L: 
JLong), true), 20002))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(3: JInt, 0L: JLong, 0: JInt, "ccc", 3L: JLong, 3L: JLong, 3L: 
JLong), true), 20011))
+
     verify(expectedOutput, result, new RowResultSortComparator(6))
     testHarness.close()
   }
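
Note the second half of the cleanup block: a cleanup timer that fires while a
row is still waiting for its watermark must not drop state, it re-arms itself
instead ("clean-up attempt but rescheduled to 8500"). A small plain-Scala
model of that decision, using the test's numbers; the names are mine:

    object DeferredCleanupModel extends App {
      final case class CleanupDecision(clearState: Boolean, nextTimer: Option[Long])

      def onCleanupTimer(now: Long, pendingRows: Int, maxRetention: Long): CleanupDecision =
        if (pendingRows == 0) CleanupDecision(clearState = true, nextTimer = None)
        else CleanupDecision(clearState = false, nextTimer = Some(now + maxRetention))

      // "clean-up attempt but rescheduled to 8500" (max retention = 2000 ms):
      assert(onCleanupTimer(6500L, pendingRows = 1, maxRetention = 2000L)
        == CleanupDecision(clearState = false, nextTimer = Some(8500L)))
      // after the watermark has flushed the row, the 8500 timer clears everything:
      assert(onCleanupTimer(8500L, pendingRows = 0, maxRetention = 2000L).clearState)
    }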
@@ -630,10 +548,11 @@ class OverWindowHarnessTest extends HarnessTestBase{
 
     val processFunction = new KeyedProcessOperator[String, CRow, CRow](
       new RowTimeBoundedRowsOver(
-        genAggFunction,
-        aggregationStateType,
-        cRT,
-        3))
+        genMinMaxAggFunction,
+        minMaxAggregationStateType,
+        minMaxCRowType,
+        3,
+        new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(1), 
Time.seconds(2))))
 
     val testHarness =
       createHarnessTester(
@@ -689,6 +608,41 @@ class OverWindowHarnessTest extends HarnessTestBase{
 
     testHarness.processWatermark(19000)
 
+    // test cleanup
+    testHarness.setProcessingTime(1000)
+    testHarness.processWatermark(20000)
+
+    // check that state is removed after max retention time
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 0L: JLong, 0: JInt, "ccc", 1L: JLong), true), 
20001)) // clean-up 3000
+    testHarness.setProcessingTime(2500)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "ccc", 2L: JLong), true), 
20002)) // clean-up 4500
+    testHarness.processWatermark(20010) // compute output
+
+    assert(testHarness.numKeyedStateEntries() > 0) // check that we have state
+    testHarness.setProcessingTime(4499)
+    assert(testHarness.numKeyedStateEntries() > 0) // check that we have state
+    testHarness.setProcessingTime(4500)
+    assert(testHarness.numKeyedStateEntries() == 0) // check that all state is 
gone
+
+    // check that state is only removed if all data was processed
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(3: JInt, 0L: JLong, 0: JInt, "ccc", 3L: JLong), true), 
20011)) // clean-up 6500
+
+    assert(testHarness.numKeyedStateEntries() > 0) // check that we have state
+    testHarness.setProcessingTime(6500) // clean-up attempt but rescheduled to 
8500
+    assert(testHarness.numKeyedStateEntries() > 0) // check that we have state
+
+    testHarness.processWatermark(20020) // schedule emission
+
+    assert(testHarness.numKeyedStateEntries() > 0) // check that we have state
+    testHarness.setProcessingTime(8499) // not yet: clean-up fires at 8500
+    assert(testHarness.numKeyedStateEntries() > 0) // check that we have state
+    testHarness.setProcessingTime(8500) // clean-up
+    assert(testHarness.numKeyedStateEntries() == 0) // check that all state is 
gone
+
     val result = testHarness.getOutput
 
     val expectedOutput = new ConcurrentLinkedQueue[Object]()
@@ -736,6 +690,16 @@ class OverWindowHarnessTest extends HarnessTestBase{
       CRow(
       Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 20L: JLong, 40L: 
JLong), true), 12001))
 
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 0L: JLong, 0: JInt, "ccc", 1L: JLong, 1L: JLong, 1L: 
JLong), true), 20001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(2: JInt, 0L: JLong, 0: JInt, "ccc", 2L: JLong, 1L: JLong, 2L: 
JLong), true), 20002))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(3: JInt, 0L: JLong, 0: JInt, "ccc", 3L: JLong, 3L: JLong, 3L: 
JLong), true), 20011))
+
     verify(expectedOutput, result, new RowResultSortComparator(6))
     testHarness.close()
   }
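
This ROWS variant repeats the RANGE test's cleanup choreography line for line.
A possible refactor (not part of the patch) that factors the advance-and-assert
pair into a helper, handed only the two harness operations it touches:

    object HarnessAssertions {
      // `setProcessingTime` and `numKeyedStateEntries` are the harness methods
      // already used above, passed in as functions.
      def assertStateClearedExactlyAt(
          setProcessingTime: Long => Unit,
          numKeyedStateEntries: () => Int,
          cleanupTime: Long): Unit = {
        setProcessingTime(cleanupTime - 1)
        assert(numKeyedStateEntries() > 0, s"state dropped before $cleanupTime")
        setProcessingTime(cleanupTime)
        assert(numKeyedStateEntries() == 0, s"state still present at $cleanupTime")
      }
    }

    // e.g. the 8499/8500 steps above would shrink to:
    //   HarnessAssertions.assertStateClearedExactlyAt(
    //     testHarness.setProcessingTime, () => testHarness.numKeyedStateEntries(), 8500L)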
@@ -748,9 +712,10 @@ class OverWindowHarnessTest extends HarnessTestBase{
 
     val processFunction = new KeyedProcessOperator[String, CRow, CRow](
       new RowTimeUnboundedRangeOver(
-        genAggFunction,
-        aggregationStateType,
-        cRT))
+        genMinMaxAggFunction,
+        minMaxAggregationStateType,
+        minMaxCRowType,
+        new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(1), 
Time.seconds(2))))
 
     val testHarness =
       createHarnessTester(
@@ -760,6 +725,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
 
     testHarness.open()
 
+    testHarness.setProcessingTime(1000)
     testHarness.processWatermark(800)
     testHarness.processElement(new StreamRecord(
       CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 801))
@@ -806,6 +772,30 @@ class OverWindowHarnessTest extends HarnessTestBase{
 
     testHarness.processWatermark(19000)
 
+    // test cleanup
+    assert(testHarness.numKeyedStateEntries() > 0)
+    testHarness.setProcessingTime(2999) // clean up timer is 3000, so nothing 
should happen
+    assert(testHarness.numKeyedStateEntries() > 0)
+    testHarness.setProcessingTime(3000) // clean up is triggered
+    assert(testHarness.numKeyedStateEntries() == 0)
+
+    testHarness.processWatermark(20000)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 0L: JLong, 0: JInt, "ccc", 1L: JLong), true), 
20001)) // clean-up 5000
+    testHarness.setProcessingTime(2500)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "ccc", 2L: JLong), true), 
20002)) // clean-up 5000
+
+    assert(testHarness.numKeyedStateEntries() > 0)
+    testHarness.setProcessingTime(5000) // does not clean up: rows are still 
pending. New timer at 7000
+    testHarness.processWatermark(20010) // compute output
+
+    assert(testHarness.numKeyedStateEntries() > 0)
+    testHarness.setProcessingTime(6999) // clean up timer is 7000, so nothing 
should happen
+    assert(testHarness.numKeyedStateEntries() > 0)
+    testHarness.setProcessingTime(7000) // clean up is triggered
+    assert(testHarness.numKeyedStateEntries() == 0)
+
     val result = testHarness.getOutput
 
     val expectedOutput = new ConcurrentLinkedQueue[Object]()
@@ -854,6 +844,13 @@ class OverWindowHarnessTest extends HarnessTestBase{
       CRow(
       Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 10L: JLong, 40L: 
JLong), true), 12001))
 
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 0L: JLong, 0: JInt, "ccc", 1L: JLong, 1L: JLong, 1L: 
JLong), true), 20001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(2: JInt, 0L: JLong, 0: JInt, "ccc", 2L: JLong, 1L: JLong, 2L: 
JLong), true), 20002))
+
     verify(expectedOutput, result, new RowResultSortComparator(6))
     testHarness.close()
   }
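
The processing-time bookkeeping in this test follows directly from the
withIdleStateRetentionTime(Time.seconds(1), Time.seconds(2)) config above;
spelled out as worked arithmetic:

    object RetentionTimerArithmetic extends App {
      val minRetention = 1000L // Time.seconds(1)
      val maxRetention = 2000L // Time.seconds(2)
      assert(1000L + maxRetention == 3000L) // element at pt 1000 -> "clean up timer is 3000"
      assert(3000L + maxRetention == 5000L) // re-registered after the 3000 cleanup -> "clean-up 5000"
      assert(2500L + minRetention <= 5000L) // the pt-2500 element leaves the 5000 timer in place
      assert(5000L + maxRetention == 7000L) // rows still pending at 5000 -> "New timer 7000"
    }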
@@ -863,9 +860,10 @@ class OverWindowHarnessTest extends HarnessTestBase{
 
     val processFunction = new KeyedProcessOperator[String, CRow, CRow](
       new RowTimeUnboundedRowsOver(
-        genAggFunction,
-        aggregationStateType,
-        cRT))
+        genMinMaxAggFunction,
+        minMaxAggregationStateType,
+        minMaxCRowType,
+        new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(1), 
Time.seconds(2))))
 
     val testHarness =
       createHarnessTester(
@@ -875,6 +873,7 @@ class OverWindowHarnessTest extends HarnessTestBase{
 
     testHarness.open()
 
+    testHarness.setProcessingTime(1000)
     testHarness.processWatermark(800)
     testHarness.processElement(new StreamRecord(
       CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 801))
@@ -921,6 +920,30 @@ class OverWindowHarnessTest extends HarnessTestBase{
 
     testHarness.processWatermark(19000)
 
+    // test cleanup
+    assert(testHarness.numKeyedStateEntries() > 0)
+    testHarness.setProcessingTime(2999) // clean up timer is 3000, so nothing 
should happen
+    assert(testHarness.numKeyedStateEntries() > 0)
+    testHarness.setProcessingTime(3000) // clean up is triggered
+    assert(testHarness.numKeyedStateEntries() == 0)
+
+    testHarness.processWatermark(20000)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 0L: JLong, 0: JInt, "ccc", 1L: JLong), true), 
20001)) // clean-up 5000
+    testHarness.setProcessingTime(2500)
+    testHarness.processElement(new StreamRecord(
+      CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "ccc", 2L: JLong), true), 
20002)) // clean-up 5000
+
+    assert(testHarness.numKeyedStateEntries() > 0)
+    testHarness.setProcessingTime(5000) // does not clean up: rows are still 
pending. New timer at 7000
+    testHarness.processWatermark(20010) // compute output
+
+    assert(testHarness.numKeyedStateEntries() > 0)
+    testHarness.setProcessingTime(6999) // clean up timer is 7000, so nothing 
should happen
+    assert(testHarness.numKeyedStateEntries() > 0)
+    testHarness.setProcessingTime(7000) // clean up is triggered
+    assert(testHarness.numKeyedStateEntries() == 0)
+
     val result = testHarness.getOutput
 
     val expectedOutput = new ConcurrentLinkedQueue[Object]()
@@ -968,6 +991,13 @@ class OverWindowHarnessTest extends HarnessTestBase{
       CRow(
       Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 10L: JLong, 40L: 
JLong), true), 12001))
 
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(1: JInt, 0L: JLong, 0: JInt, "ccc", 1L: JLong, 1L: JLong, 1L: 
JLong), true), 20001))
+    expectedOutput.add(new StreamRecord(
+      CRow(
+        Row.of(2: JInt, 0L: JLong, 0: JInt, "ccc", 2L: JLong, 1L: JLong, 2L: 
JLong), true), 20002))
+
     verify(expectedOutput, result, new RowResultSortComparator(6))
     testHarness.close()
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/24808871/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
index 3d79e22..c4e2433 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
@@ -28,7 +28,7 @@ class MockTableEnvironment extends TableEnvironment(new 
TableConfig) {
   override private[flink] def writeToSink[T](
       table: Table,
       sink: TableSink[T],
-      qConfig: QueryConfig): Unit = ???
+      queryConfig: QueryConfig): Unit = ???
 
   override protected def checkValidTableName(name: String): Unit = ???
 
