This is an automated email from the ASF dual-hosted git repository.

jincheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d1d0bd1  [FLINK-11091][table] Clear the use of deprecated `process(ProcessFunction)` of KeyedStream in table operators.
d1d0bd1 is described below

commit d1d0bd1b85fc501c67498acaf8dc528b15aa1686
Author: sunjincheng121 <[email protected]>
AuthorDate: Tue Feb 26 17:32:20 2019 +0800

    [FLINK-11091][table] Clear the use of deprecated `process(ProcessFunction)` of KeyedStream in table operators.
    
    This closes #7837
---
 .../flink/table/api/BatchTableEnvironment.scala    |  1 -
 .../datastream/DataStreamGroupAggregate.scala      | 30 ++++++----
 .../nodes/datastream/DataStreamOverAggregate.scala | 66 ++++++++++++----------
 .../plan/nodes/datastream/DataStreamSort.scala     | 13 +++--
 .../table/runtime/aggregate/AggregateUtil.scala    | 30 +++++-----
 .../aggregate/GroupAggProcessFunction.scala        | 10 ++--
 .../aggregate/ProcTimeBoundedRangeOver.scala       | 10 ++--
 .../aggregate/ProcTimeBoundedRowsOver.scala        | 10 ++--
 .../aggregate/ProcTimeSortProcessFunction.scala    | 10 ++--
 .../runtime/aggregate/ProcTimeUnboundedOver.scala  | 10 ++--
 .../ProcessFunctionWithCleanupState.scala          |  8 +--
 .../aggregate/RowTimeBoundedRangeOver.scala        | 10 ++--
 .../runtime/aggregate/RowTimeBoundedRowsOver.scala | 10 ++--
 .../aggregate/RowTimeSortProcessFunction.scala     | 10 ++--
 .../runtime/aggregate/RowTimeUnboundedOver.scala   | 18 +++---
 .../flink/table/runtime/aggregate/SortUtil.scala   | 10 ++--
 .../runtime/harness/AggFunctionHarnessTest.scala   |  6 +-
 .../harness/GroupAggregateHarnessTest.scala        | 14 ++---
 .../runtime/harness/OverWindowHarnessTest.scala    | 30 +++++-----
 .../harness/SortProcessFunctionHarnessTest.scala   | 10 ++--
 .../ProcessFunctionWithCleanupStateTest.scala      | 16 +++---
 21 files changed, 174 insertions(+), 158 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index 99e9d7e..1aa203b 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -21,7 +21,6 @@ package org.apache.flink.table.api
 import _root_.java.util.concurrent.atomic.AtomicInteger
 
 import org.apache.calcite.plan.RelOptUtil
-import org.apache.calcite.plan.hep.HepMatchOrder
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.sql2rel.RelDecorrelator
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
index 5f4b186c..d48b682 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
@@ -17,11 +17,14 @@
  */
 package org.apache.flink.table.plan.nodes.datastream
 
+import java.lang.{Byte => JByte}
+
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.core.AggregateCall
 import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
 import org.apache.flink.api.java.functions.NullByteKeySelector
 import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
 import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
 import org.apache.flink.table.codegen.AggregationCodeGenerator
 import org.apache.flink.table.plan.nodes.CommonAggregate
@@ -32,6 +35,7 @@ import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair
 import org.apache.flink.table.runtime.aggregate._
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
 
 /**
   *
@@ -131,23 +135,25 @@ class DataStreamGroupAggregate(
       s"select: ($aggString)"
     val nonKeyedAggOpName = s"select: ($aggString)"
 
-    val processFunction = AggregateUtil.createGroupAggregateFunction(
-      generator,
-      namedAggregates,
-      inputSchema.relDataType,
-      inputSchema.fieldTypeInfos,
-      groupings,
-      queryConfig,
-      tableEnv.getConfig,
-      DataStreamRetractionRules.isAccRetract(this),
-      DataStreamRetractionRules.isAccRetract(getInput))
+    def createKeyedProcessFunction[K]: KeyedProcessFunction[K, CRow, CRow] = {
+      AggregateUtil.createGroupAggregateFunction[K](
+        generator,
+        namedAggregates,
+        inputSchema.relDataType,
+        inputSchema.fieldTypeInfos,
+        groupings,
+        queryConfig,
+        tableEnv.getConfig,
+        DataStreamRetractionRules.isAccRetract(this),
+        DataStreamRetractionRules.isAccRetract(getInput))
+    }
 
     val result: DataStream[CRow] =
     // grouped / keyed aggregation
       if (groupings.nonEmpty) {
         inputDS
         .keyBy(new CRowKeySelector(groupings, inputSchema.projectedTypeInfo(groupings)))
-        .process(processFunction)
+        .process(createKeyedProcessFunction[Row])
         .returns(outRowType)
         .name(keyedAggOpName)
         .asInstanceOf[DataStream[CRow]]
@@ -156,7 +162,7 @@ class DataStreamGroupAggregate(
       else {
         inputDS
         .keyBy(new NullByteKeySelector[CRow])
-        .process(processFunction)
+        .process(createKeyedProcessFunction[JByte])
         .setParallelism(1)
         .setMaxParallelism(1)
         .returns(outRowType)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
index c1693d9..a22b4cc 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.table.plan.nodes.datastream
 
+import java.lang.{Byte => JByte}
 import java.util.{List => JList}
 
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
@@ -28,6 +29,7 @@ import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
 import org.apache.calcite.rex.RexLiteral
 import org.apache.flink.api.java.functions.NullByteKeySelector
 import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
 import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableConfig, TableException}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.AggregationCodeGenerator
@@ -39,6 +41,7 @@ import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair
 import org.apache.flink.table.runtime.aggregate._
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
 
 import scala.collection.JavaConverters._
 
@@ -214,25 +217,27 @@ class DataStreamOverAggregate(
     // get the output types
     val returnTypeInfo = CRowTypeInfo(schema.typeInfo)
 
-    val processFunction = AggregateUtil.createUnboundedOverProcessFunction(
-      generator,
-      namedAggregates,
-      aggregateInputType,
-      inputSchema.relDataType,
-      inputSchema.typeInfo,
-      inputSchema.fieldTypeInfos,
-      queryConfig,
-      tableConfig,
-      rowTimeIdx,
-      partitionKeys.nonEmpty,
-      isRowsClause)
+    def createKeyedProcessFunction[K]: KeyedProcessFunction[K, CRow, CRow] = {
+      AggregateUtil.createUnboundedOverProcessFunction[K](
+        generator,
+        namedAggregates,
+        aggregateInputType,
+        inputSchema.relDataType,
+        inputSchema.typeInfo,
+        inputSchema.fieldTypeInfos,
+        queryConfig,
+        tableConfig,
+        rowTimeIdx,
+        partitionKeys.nonEmpty,
+        isRowsClause)
+    }
 
     val result: DataStream[CRow] =
     // partitioned aggregation
       if (partitionKeys.nonEmpty) {
         inputDS
           .keyBy(new CRowKeySelector(partitionKeys, inputSchema.projectedTypeInfo(partitionKeys)))
-          .process(processFunction)
+          .process(createKeyedProcessFunction[Row])
           .returns(returnTypeInfo)
           .name(aggOpName)
           .asInstanceOf[DataStream[CRow]]
@@ -240,7 +245,7 @@ class DataStreamOverAggregate(
       // non-partitioned aggregation
       else {
         inputDS.keyBy(new NullByteKeySelector[CRow])
-          .process(processFunction).setParallelism(1).setMaxParallelism(1)
+          .process(createKeyedProcessFunction[JByte]).setParallelism(1).setMaxParallelism(1)
           .returns(returnTypeInfo)
           .name(aggOpName)
       }
@@ -268,25 +273,28 @@ class DataStreamOverAggregate(
     // get the output types
     val returnTypeInfo = CRowTypeInfo(schema.typeInfo)
 
-    val processFunction = AggregateUtil.createBoundedOverProcessFunction(
-      generator,
-      namedAggregates,
-      aggregateInputType,
-      inputSchema.relDataType,
-      inputSchema.typeInfo,
-      inputSchema.fieldTypeInfos,
-      precedingOffset,
-      queryConfig,
-      tableConfig,
-      isRowsClause,
-      rowTimeIdx
-    )
+    def createKeyedProcessFunction[K]: KeyedProcessFunction[K, CRow, CRow] = {
+      AggregateUtil.createBoundedOverProcessFunction[K](
+        generator,
+        namedAggregates,
+        aggregateInputType,
+        inputSchema.relDataType,
+        inputSchema.typeInfo,
+        inputSchema.fieldTypeInfos,
+        precedingOffset,
+        queryConfig,
+        tableConfig,
+        isRowsClause,
+        rowTimeIdx
+      )
+    }
+
     val result: DataStream[CRow] =
     // partitioned aggregation
       if (partitionKeys.nonEmpty) {
         inputDS
           .keyBy(new CRowKeySelector(partitionKeys, inputSchema.projectedTypeInfo(partitionKeys)))
-          .process(processFunction)
+          .process(createKeyedProcessFunction[Row])
           .returns(returnTypeInfo)
           .name(aggOpName)
       }
@@ -294,7 +302,7 @@ class DataStreamOverAggregate(
       else {
         inputDS
           .keyBy(new NullByteKeySelector[CRow])
-          .process(processFunction).setParallelism(1).setMaxParallelism(1)
+          .process(createKeyedProcessFunction[JByte]).setParallelism(1).setMaxParallelism(1)
           .returns(returnTypeInfo)
           .name(aggOpName)
       }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala
index 8f9942f..936bbb2 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.plan.nodes.datastream
 
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.{ RelNode, RelWriter }
+import org.apache.calcite.rel.{RelNode, RelWriter}
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.runtime.aggregate._
@@ -34,6 +34,7 @@ import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, Ta
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.plan.nodes.CommonSort
 import org.apache.calcite.rel.core.Sort
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
 
 /**
  * Flink RelNode which matches along with Sort Rule.
@@ -146,14 +147,14 @@ class DataStreamSort(
     // if the order has secondary sorting fields in addition to the proctime
     if (sortCollation.getFieldCollations.size() > 1) {
     
-      val processFunction = SortUtil.createProcTimeSortFunction(
+      val KeyedProcessFunction = SortUtil.createProcTimeSortFunction(
         sortCollation,
         inputSchema.relDataType,
         inputSchema.typeInfo,
         execCfg)
       
       inputDS.keyBy(new NullByteKeySelector[CRow])
-        .process(processFunction).setParallelism(1).setMaxParallelism(1)
+        .process(KeyedProcessFunction).setParallelism(1).setMaxParallelism(1)
         .returns(returnTypeInfo)
         .asInstanceOf[DataStream[CRow]]
     } else {
@@ -175,14 +176,14 @@ class DataStreamSort(
 
     val returnTypeInfo = CRowTypeInfo(schema.typeInfo)
        
-    val processFunction = SortUtil.createRowTimeSortFunction(
+    val keyedProcessFunction = SortUtil.createRowTimeSortFunction(
       sortCollation,
       inputSchema.relDataType,
       inputSchema.typeInfo,
       execCfg)
-      
+
     inputDS.keyBy(new NullByteKeySelector[CRow])
-      .process(processFunction).setParallelism(1).setMaxParallelism(1)
+      .process(keyedProcessFunction).setParallelism(1).setMaxParallelism(1)
       .returns(returnTypeInfo)
       .asInstanceOf[DataStream[CRow]]
        
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index 7c38254..f87ed3c 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -30,7 +30,7 @@ import org.apache.flink.api.common.functions.{MapFunction, RichGroupReduceFuncti
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.scala.typeutils.Types
-import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
 import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction, WindowFunction}
 import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
 import org.apache.flink.table.api.dataview.DataViewSpec
@@ -72,7 +72,7 @@ object AggregateUtil {
     * @param isPartitioned It is a tag that indicate whether the input is partitioned
     * @param isRowsClause It is a tag that indicates whether the OVER clause is ROWS clause
     */
-  private[flink] def createUnboundedOverProcessFunction(
+  private[flink] def createUnboundedOverProcessFunction[K](
       generator: AggregationCodeGenerator,
       namedAggregates: Seq[CalcitePair[AggregateCall, String]],
       aggregateInputType: RelDataType,
@@ -84,7 +84,7 @@ object AggregateUtil {
       rowTimeIdx: Option[Int],
       isPartitioned: Boolean,
       isRowsClause: Boolean)
-    : ProcessFunction[CRow, CRow] = {
+    : KeyedProcessFunction[K, CRow, CRow] = {
 
     val aggregateMetadata = extractAggregateMetadata(
         namedAggregates.map(_.getKey),
@@ -123,7 +123,7 @@ object AggregateUtil {
     if (rowTimeIdx.isDefined) {
       if (isRowsClause) {
         // ROWS unbounded over process function
-        new RowTimeUnboundedRowsOver(
+        new RowTimeUnboundedRowsOver[K](
           genFunction,
           aggregationStateType,
           CRowTypeInfo(inputTypeInfo),
@@ -131,7 +131,7 @@ object AggregateUtil {
           queryConfig)
       } else {
         // RANGE unbounded over process function
-        new RowTimeUnboundedRangeOver(
+        new RowTimeUnboundedRangeOver[K](
           genFunction,
           aggregationStateType,
           CRowTypeInfo(inputTypeInfo),
@@ -139,7 +139,7 @@ object AggregateUtil {
           queryConfig)
       }
     } else {
-      new ProcTimeUnboundedOver(
+      new ProcTimeUnboundedOver[K](
         genFunction,
         aggregationStateType,
         queryConfig)
@@ -160,7 +160,7 @@ object AggregateUtil {
     * @param consumeRetraction It is a tag that indicates whether consume the retract record.
     * @return [[org.apache.flink.streaming.api.functions.ProcessFunction]]
     */
-  private[flink] def createGroupAggregateFunction(
+  private[flink] def createGroupAggregateFunction[K](
       generator: AggregationCodeGenerator,
       namedAggregates: Seq[CalcitePair[AggregateCall, String]],
       inputRowType: RelDataType,
@@ -169,7 +169,7 @@ object AggregateUtil {
       queryConfig: StreamQueryConfig,
       tableConfig: TableConfig,
       generateRetraction: Boolean,
-      consumeRetraction: Boolean): ProcessFunction[CRow, CRow] = {
+      consumeRetraction: Boolean): KeyedProcessFunction[K, CRow, CRow] = {
 
     val aggregateMetadata = extractAggregateMetadata(
         namedAggregates.map(_.getKey),
@@ -202,7 +202,7 @@ object AggregateUtil {
 
     val aggregationStateType: RowTypeInfo = new RowTypeInfo(aggregateMetadata
       .getAggregatesAccumulatorTypes: _*)
-    new GroupAggProcessFunction(
+    new GroupAggProcessFunction[K](
       genFunction,
       aggregationStateType,
       generateRetraction,
@@ -225,7 +225,7 @@ object AggregateUtil {
     * @param rowTimeIdx      The index of the rowtime field or None in case of processing time.
     * @return [[org.apache.flink.streaming.api.functions.ProcessFunction]]
     */
-  private[flink] def createBoundedOverProcessFunction(
+  private[flink] def createBoundedOverProcessFunction[K](
       generator: AggregationCodeGenerator,
       namedAggregates: Seq[CalcitePair[AggregateCall, String]],
       aggregateInputType: RelDataType,
@@ -237,7 +237,7 @@ object AggregateUtil {
       tableConfig: TableConfig,
       isRowsClause: Boolean,
       rowTimeIdx: Option[Int])
-    : ProcessFunction[CRow, CRow] = {
+    : KeyedProcessFunction[K, CRow, CRow] = {
 
     val needRetract = true
     val aggregateMetadata = extractAggregateMetadata(
@@ -277,7 +277,7 @@ object AggregateUtil {
       .getAggregatesAccumulatorTypes: _*)
     if (rowTimeIdx.isDefined) {
       if (isRowsClause) {
-        new RowTimeBoundedRowsOver(
+        new RowTimeBoundedRowsOver[K](
           genFunction,
           aggregationStateType,
           inputRowType,
@@ -285,7 +285,7 @@ object AggregateUtil {
           rowTimeIdx.get,
           queryConfig)
       } else {
-        new RowTimeBoundedRangeOver(
+        new RowTimeBoundedRangeOver[K](
           genFunction,
           aggregationStateType,
           inputRowType,
@@ -295,14 +295,14 @@ object AggregateUtil {
       }
     } else {
       if (isRowsClause) {
-        new ProcTimeBoundedRowsOver(
+        new ProcTimeBoundedRowsOver[K](
           genFunction,
           precedingOffset,
           aggregationStateType,
           inputRowType,
           queryConfig)
       } else {
-        new ProcTimeBoundedRangeOver(
+        new ProcTimeBoundedRangeOver[K](
           genFunction,
           precedingOffset,
           aggregationStateType,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
index 2d72e6d..5bbec22 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
@@ -22,7 +22,7 @@ import java.lang.{Long => JLong}
 import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
 import org.apache.flink.table.api.{StreamQueryConfig, Types}
 import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
 import org.apache.flink.table.runtime.types.CRow
@@ -36,12 +36,12 @@ import org.apache.flink.util.Collector
   * @param genAggregations      Generated aggregate helper function
   * @param aggregationStateType The row type info of aggregation
   */
-class GroupAggProcessFunction(
+class GroupAggProcessFunction[K](
     private val genAggregations: GeneratedAggregationsFunction,
     private val aggregationStateType: RowTypeInfo,
     private val generateRetraction: Boolean,
     private val queryConfig: StreamQueryConfig)
-  extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
+  extends ProcessFunctionWithCleanupState[K, CRow, CRow](queryConfig)
     with Compiler[GeneratedAggregations]
     with Logging {
 
@@ -81,7 +81,7 @@ class GroupAggProcessFunction(
 
   override def processElement(
       inputC: CRow,
-      ctx: ProcessFunction[CRow, CRow]#Context,
+      ctx: KeyedProcessFunction[K, CRow, CRow]#Context,
       out: Collector[CRow]): Unit = {
 
     val currentTime = ctx.timerService().currentProcessingTime()
@@ -169,7 +169,7 @@ class GroupAggProcessFunction(
 
   override def onTimer(
       timestamp: Long,
-      ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
+      ctx: KeyedProcessFunction[K, CRow, CRow]#OnTimerContext,
       out: Collector[CRow]): Unit = {
 
     if (stateCleaningEnabled) {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
index 6126dc7..b4c50c4 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
@@ -19,7 +19,7 @@ package org.apache.flink.table.runtime.aggregate
 
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
 import org.apache.flink.types.Row
 import org.apache.flink.util.Collector
 import org.apache.flink.api.common.state._
@@ -43,13 +43,13 @@ import org.apache.flink.table.util.Logging
   * @param aggregatesTypeInfo       row type info of aggregation
   * @param inputType                row type info of input row
   */
-class ProcTimeBoundedRangeOver(
+class ProcTimeBoundedRangeOver[K](
     genAggregations: GeneratedAggregationsFunction,
     precedingTimeBoundary: Long,
     aggregatesTypeInfo: RowTypeInfo,
     inputType: TypeInformation[CRow],
     queryConfig: StreamQueryConfig)
-  extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
+  extends ProcessFunctionWithCleanupState[K, CRow, CRow](queryConfig)
     with Compiler[GeneratedAggregations]
     with Logging {
 
@@ -90,7 +90,7 @@ class ProcTimeBoundedRangeOver(
 
   override def processElement(
     input: CRow,
-    ctx: ProcessFunction[CRow, CRow]#Context,
+    ctx: KeyedProcessFunction[K, CRow, CRow]#Context,
     out: Collector[CRow]): Unit = {
 
     val currentTime = ctx.timerService.currentProcessingTime
@@ -114,7 +114,7 @@ class ProcTimeBoundedRangeOver(
 
   override def onTimer(
     timestamp: Long,
-    ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
+    ctx: KeyedProcessFunction[K, CRow, CRow]#OnTimerContext,
     out: Collector[CRow]): Unit = {
 
     if (stateCleaningEnabled) {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala
index fa58ac5..b9b1a95 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.state.{MapState, MapStateDescriptor, ValueSta
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo}
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
 import org.apache.flink.table.api.StreamQueryConfig
 import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
@@ -40,13 +40,13 @@ import org.apache.flink.util.{Collector, Preconditions}
   * @param aggregatesTypeInfo   row type info of aggregation
   * @param inputType            row type info of input row
   */
-class ProcTimeBoundedRowsOver(
+class ProcTimeBoundedRowsOver[K](
     genAggregations: GeneratedAggregationsFunction,
     precedingOffset: Long,
     aggregatesTypeInfo: RowTypeInfo,
     inputType: TypeInformation[CRow],
     queryConfig: StreamQueryConfig)
-  extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
+  extends ProcessFunctionWithCleanupState[K, CRow, CRow](queryConfig)
     with Compiler[GeneratedAggregations]
     with Logging {
 
@@ -102,7 +102,7 @@ class ProcTimeBoundedRowsOver(
 
   override def processElement(
     inputC: CRow,
-    ctx: ProcessFunction[CRow, CRow]#Context,
+    ctx: KeyedProcessFunction[K, CRow, CRow]#Context,
     out: Collector[CRow]): Unit = {
 
     val input = inputC.row
@@ -184,7 +184,7 @@ class ProcTimeBoundedRowsOver(
 
   override def onTimer(
     timestamp: Long,
-    ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
+    ctx: KeyedProcessFunction[K, CRow, CRow]#OnTimerContext,
     out: Collector[CRow]): Unit = {
 
     if (stateCleaningEnabled) {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeSortProcessFunction.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeSortProcessFunction.scala
index 1e12060..1034307 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeSortProcessFunction.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeSortProcessFunction.scala
@@ -19,7 +19,7 @@ package org.apache.flink.table.runtime.aggregate
 
 import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
 import org.apache.flink.types.Row
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.util.{Collector, Preconditions}
@@ -35,10 +35,10 @@ import org.apache.flink.streaming.api.operators.TimestampedCollector
  * @param inputRowType The data type of the input data.
  * @param rowComparator A comparator to sort rows.
  */
-class ProcTimeSortProcessFunction(
+class ProcTimeSortProcessFunction[K](
     private val inputRowType: CRowTypeInfo,
     private val rowComparator: CollectionRowComparator)
-  extends ProcessFunction[CRow, CRow] {
+  extends KeyedProcessFunction[K, CRow, CRow] {
 
   Preconditions.checkNotNull(rowComparator)
 
@@ -58,7 +58,7 @@ class ProcTimeSortProcessFunction(
 
   override def processElement(
     inputC: CRow,
-    ctx: ProcessFunction[CRow, CRow]#Context,
+    ctx: KeyedProcessFunction[K, CRow, CRow]#Context,
     out: Collector[CRow]): Unit = {
 
     val input = inputC.row
@@ -74,7 +74,7 @@ class ProcTimeSortProcessFunction(
   
   override def onTimer(
     timestamp: Long,
-    ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
+    ctx: KeyedProcessFunction[K, CRow, CRow]#OnTimerContext,
     out: Collector[CRow]): Unit = {
 
     // remove timestamp set outside of ProcessFunction.
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedOver.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedOver.scala
index ce1a959..b781678 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedOver.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedOver.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.runtime.aggregate
 import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
 import org.apache.flink.table.api.StreamQueryConfig
 import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
 import org.apache.flink.table.runtime.types.CRow
@@ -34,11 +34,11 @@ import org.apache.flink.util.Collector
   * @param genAggregations Generated aggregate helper function
   * @param aggregationStateType     row type info of aggregation
   */
-class ProcTimeUnboundedOver(
+class ProcTimeUnboundedOver[K](
     genAggregations: GeneratedAggregationsFunction,
     aggregationStateType: RowTypeInfo,
     queryConfig: StreamQueryConfig)
-  extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
+  extends ProcessFunctionWithCleanupState[K, CRow, CRow](queryConfig)
     with Compiler[GeneratedAggregations]
     with Logging {
 
@@ -67,7 +67,7 @@ class ProcTimeUnboundedOver(
 
   override def processElement(
     inputC: CRow,
-    ctx: ProcessFunction[CRow, CRow]#Context,
+    ctx: KeyedProcessFunction[K, CRow, CRow]#Context,
     out: Collector[CRow]): Unit = {
 
     // register state-cleanup timer
@@ -92,7 +92,7 @@ class ProcTimeUnboundedOver(
 
   override def onTimer(
     timestamp: Long,
-    ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
+    ctx: KeyedProcessFunction[K, CRow, CRow]#OnTimerContext,
     out: Collector[CRow]): Unit = {
 
     if (stateCleaningEnabled) {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcessFunctionWithCleanupState.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcessFunctionWithCleanupState.scala
index 7263de7..54f037e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcessFunctionWithCleanupState.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcessFunctionWithCleanupState.scala
@@ -22,11 +22,11 @@ import java.lang.{Long => JLong}
 import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
 import org.apache.flink.api.common.state.State
 import org.apache.flink.streaming.api.TimeDomain
-import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
 import org.apache.flink.table.api.{StreamQueryConfig, Types}
 
-abstract class ProcessFunctionWithCleanupState[IN,OUT](queryConfig: StreamQueryConfig)
-  extends ProcessFunction[IN, OUT]
+abstract class ProcessFunctionWithCleanupState[KEY, IN,OUT](queryConfig: StreamQueryConfig)
+  extends KeyedProcessFunction[KEY, IN, OUT]
   with CleanupState {
 
   protected val minRetentionTime: Long = queryConfig.getMinIdleStateRetentionTime
@@ -45,7 +45,7 @@ abstract class 
ProcessFunctionWithCleanupState[IN,OUT](queryConfig: StreamQueryC
   }
 
   protected def processCleanupTimer(
-    ctx: ProcessFunction[IN, OUT]#Context,
+    ctx: KeyedProcessFunction[KEY, IN, OUT]#Context,
     currentTime: Long): Unit = {
     if (stateCleaningEnabled) {
       registerProcessingCleanupTimer(
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
index 7c509d6..c046871 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.state._
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo}
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
 import org.apache.flink.streaming.api.operators.TimestampedCollector
 import org.apache.flink.table.api.StreamQueryConfig
 import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
@@ -40,14 +40,14 @@ import org.apache.flink.util.{Collector, Preconditions}
   * @param inputRowType             row type info of input row
   * @param precedingOffset          preceding offset
  */
-class RowTimeBoundedRangeOver(
+class RowTimeBoundedRangeOver[K](
     genAggregations: GeneratedAggregationsFunction,
     aggregationStateType: RowTypeInfo,
     inputRowType: CRowTypeInfo,
     precedingOffset: Long,
     rowTimeIdx: Int,
     queryConfig: StreamQueryConfig)
-  extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
+  extends ProcessFunctionWithCleanupState[K, CRow, CRow](queryConfig)
     with Compiler[GeneratedAggregations]
     with Logging {
   Preconditions.checkNotNull(aggregationStateType)
@@ -108,7 +108,7 @@ class RowTimeBoundedRangeOver(
 
   override def processElement(
     inputC: CRow,
-    ctx: ProcessFunction[CRow, CRow]#Context,
+    ctx: KeyedProcessFunction[K, CRow, CRow]#Context,
     out: Collector[CRow]): Unit = {
 
     val input = inputC.row
@@ -139,7 +139,7 @@ class RowTimeBoundedRangeOver(
 
   override def onTimer(
     timestamp: Long,
-    ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
+    ctx: KeyedProcessFunction[K, CRow, CRow]#OnTimerContext,
     out: Collector[CRow]): Unit = {
 
     if (isProcessingTimeTimer(ctx.asInstanceOf[OnTimerContext])) {
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
index d01a499..a331c20 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.state._
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo}
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
 import org.apache.flink.streaming.api.operators.TimestampedCollector
 import org.apache.flink.table.api.StreamQueryConfig
 import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
@@ -41,14 +41,14 @@ import org.apache.flink.util.{Collector, Preconditions}
   * @param inputRowType             row type info of input row
   * @param precedingOffset          preceding offset
  */
-class RowTimeBoundedRowsOver(
+class RowTimeBoundedRowsOver[K](
     genAggregations: GeneratedAggregationsFunction,
     aggregationStateType: RowTypeInfo,
     inputRowType: CRowTypeInfo,
     precedingOffset: Long,
     rowTimeIdx: Int,
     queryConfig: StreamQueryConfig)
-  extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
+  extends ProcessFunctionWithCleanupState[K, CRow, CRow](queryConfig)
     with Compiler[GeneratedAggregations]
     with Logging {
 
@@ -117,7 +117,7 @@ class RowTimeBoundedRowsOver(
 
   override def processElement(
     inputC: CRow,
-    ctx: ProcessFunction[CRow, CRow]#Context,
+    ctx: KeyedProcessFunction[K, CRow, CRow]#Context,
     out: Collector[CRow]): Unit = {
 
     val input = inputC.row
@@ -148,7 +148,7 @@ class RowTimeBoundedRowsOver(
 
   override def onTimer(
     timestamp: Long,
-    ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
+    ctx: KeyedProcessFunction[K, CRow, CRow]#OnTimerContext,
     out: Collector[CRow]): Unit = {
 
     if (isProcessingTimeTimer(ctx.asInstanceOf[OnTimerContext])) {
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeSortProcessFunction.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeSortProcessFunction.scala
index f40feb1..0340c54 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeSortProcessFunction.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeSortProcessFunction.scala
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.state.{MapState, 
MapStateDescriptor, ValueSta
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils.ListTypeInfo
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
 import org.apache.flink.streaming.api.operators.TimestampedCollector
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.types.Row
@@ -36,11 +36,11 @@ import org.apache.flink.util.{Collector, Preconditions}
   * @param rowtimeIdx The index of the rowtime field.
   * @param rowComparator A comparator to sort rows.
  */
-class RowTimeSortProcessFunction(
+class RowTimeSortProcessFunction[K](
     private val inputRowType: CRowTypeInfo,
     private val rowtimeIdx: Int,
     private val rowComparator: Option[CollectionRowComparator])
-  extends ProcessFunction[CRow, CRow] {
+  extends KeyedProcessFunction[K, CRow, CRow] {
 
   Preconditions.checkNotNull(rowComparator)
 
@@ -77,7 +77,7 @@ class RowTimeSortProcessFunction(
   
   override def processElement(
     inputC: CRow,
-    ctx: ProcessFunction[CRow, CRow]#Context,
+    ctx: KeyedProcessFunction[K, CRow, CRow]#Context,
     out: Collector[CRow]): Unit = {
 
     val input = inputC.row
@@ -107,7 +107,7 @@ class RowTimeSortProcessFunction(
 
   override def onTimer(
     timestamp: Long,
-    ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
+    ctx: KeyedProcessFunction[K, CRow, CRow]#OnTimerContext,
     out: Collector[CRow]): Unit = {
 
     // remove timestamp set outside of ProcessFunction.
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
index 690d0d0..2d5d8ab 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.state._
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils.ListTypeInfo
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
 import org.apache.flink.streaming.api.operators.TimestampedCollector
 import org.apache.flink.table.api.StreamQueryConfig
 import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
@@ -41,13 +41,13 @@ import org.apache.flink.util.Collector
   * @param intermediateType         the intermediate row tye which the state saved
   * @param inputType                the input row tye which the state saved
   */
-abstract class RowTimeUnboundedOver(
+abstract class RowTimeUnboundedOver[K](
     genAggregations: GeneratedAggregationsFunction,
     intermediateType: TypeInformation[Row],
     inputType: TypeInformation[CRow],
     rowTimeIdx: Int,
     queryConfig: StreamQueryConfig)
-  extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
+  extends ProcessFunctionWithCleanupState[K, CRow, CRow](queryConfig)
     with Compiler[GeneratedAggregations]
     with Logging {
 
@@ -102,7 +102,7 @@ abstract class RowTimeUnboundedOver(
     */
   override def processElement(
      inputC: CRow,
-     ctx:  ProcessFunction[CRow, CRow]#Context,
+     ctx:  KeyedProcessFunction[K, CRow, CRow]#Context,
      out: Collector[CRow]): Unit = {
 
     val input = inputC.row
@@ -139,7 +139,7 @@ abstract class RowTimeUnboundedOver(
     */
   override def onTimer(
       timestamp: Long,
-      ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
+      ctx: KeyedProcessFunction[K, CRow, CRow]#OnTimerContext,
       out: Collector[CRow]): Unit = {
 
     if (isProcessingTimeTimer(ctx.asInstanceOf[OnTimerContext])) {
@@ -251,13 +251,13 @@ abstract class RowTimeUnboundedOver(
   * A ProcessFunction to support unbounded ROWS window.
   * The ROWS clause defines on a physical level how many rows are included in a window frame.
   */
-class RowTimeUnboundedRowsOver(
+class RowTimeUnboundedRowsOver[K](
     genAggregations: GeneratedAggregationsFunction,
     intermediateType: TypeInformation[Row],
     inputType: TypeInformation[CRow],
     rowTimeIdx: Int,
     queryConfig: StreamQueryConfig)
-  extends RowTimeUnboundedOver(
+  extends RowTimeUnboundedOver[K](
     genAggregations: GeneratedAggregationsFunction,
     intermediateType,
     inputType,
@@ -292,13 +292,13 @@ class RowTimeUnboundedRowsOver(
   * The RANGE option includes all the rows within the window frame
   * that have the same ORDER BY values as the current row.
   */
-class RowTimeUnboundedRangeOver(
+class RowTimeUnboundedRangeOver[K](
     genAggregations: GeneratedAggregationsFunction,
     intermediateType: TypeInformation[Row],
     inputType: TypeInformation[CRow],
     rowTimeIdx: Int,
     queryConfig: StreamQueryConfig)
-  extends RowTimeUnboundedOver(
+  extends RowTimeUnboundedOver[K](
     genAggregations: GeneratedAggregationsFunction,
     intermediateType,
     inputType,
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
index e825b18..0f2e909 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
@@ -17,6 +17,8 @@
  */
 package org.apache.flink.table.runtime.aggregate
 
+import java.lang.{Byte => JByte}
+
 import org.apache.calcite.rel.`type`._
 import org.apache.calcite.rel.RelCollation
 import org.apache.calcite.rel.RelFieldCollation
@@ -25,7 +27,7 @@ import org.apache.flink.types.Row
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
 import org.apache.flink.api.common.typeutils.TypeComparator
 import org.apache.flink.api.java.typeutils.runtime.RowComparator
 import org.apache.flink.api.common.typeutils.TypeSerializer
@@ -59,7 +61,7 @@ object SortUtil {
     collationSort: RelCollation,
     inputType: RelDataType,
     inputTypeInfo: TypeInformation[Row],
-    execCfg: ExecutionConfig): ProcessFunction[CRow, CRow] = {
+    execCfg: ExecutionConfig): KeyedProcessFunction[JByte, CRow, CRow] = {
 
     Preconditions.checkArgument(collationSort.getFieldCollations.size() > 0)
     val rowtimeIdx = collationSort.getFieldCollations.get(0).getFieldIndex
@@ -78,7 +80,7 @@ object SortUtil {
 
     val inputCRowType = CRowTypeInfo(inputTypeInfo)
  
-    new RowTimeSortProcessFunction(
+    new RowTimeSortProcessFunction[JByte](
       inputCRowType,
       rowtimeIdx,
       collectionRowComparator)
@@ -97,7 +99,7 @@ object SortUtil {
     collationSort: RelCollation,
     inputType: RelDataType,
     inputTypeInfo: TypeInformation[Row],
-    execCfg: ExecutionConfig): ProcessFunction[CRow, CRow] = {
+    execCfg: ExecutionConfig): KeyedProcessFunction[JByte, CRow, CRow] = {
 
     val rowComp = createRowComparator(
       inputType,
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/AggFunctionHarnessTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/AggFunctionHarnessTest.scala
index 0550213..b776e67 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/AggFunctionHarnessTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/AggFunctionHarnessTest.scala
@@ -71,7 +71,7 @@ class AggFunctionHarnessTest extends HarnessTestBase {
     val state = getState(
       operator,
       "function",
-      classOf[GroupAggProcessFunction],
+      classOf[GroupAggProcessFunction[Row]],
       "acc0_map_dataview").asInstanceOf[MapView[JInt, JInt]]
     assertTrue(state.isInstanceOf[StateMapView[_, _]])
     
assertTrue(operator.getKeyedStateBackend.isInstanceOf[RocksDBKeyedStateBackend[_]])
@@ -136,12 +136,12 @@ class AggFunctionHarnessTest extends HarnessTestBase {
     val minState = getState(
       operator,
       "function",
-      classOf[GroupAggProcessFunction],
+      classOf[GroupAggProcessFunction[Row]],
       "acc0_map_dataview").asInstanceOf[MapView[JInt, JInt]]
     val maxState = getState(
       operator,
       "function",
-      classOf[GroupAggProcessFunction],
+      classOf[GroupAggProcessFunction[Row]],
       "acc1_map_dataview").asInstanceOf[MapView[JInt, JInt]]
     assertTrue(minState.isInstanceOf[StateMapView[_, _]])
     assertTrue(maxState.isInstanceOf[StateMapView[_, _]])
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/GroupAggregateHarnessTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/GroupAggregateHarnessTest.scala
index fe00e11..86458b3 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/GroupAggregateHarnessTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/GroupAggregateHarnessTest.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.ConcurrentLinkedQueue
 import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
 import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
 import org.apache.flink.table.api.scala._
@@ -45,8 +45,8 @@ class GroupAggregateHarnessTest extends HarnessTestBase {
   @Test
   def testAggregate(): Unit = {
 
-    val processFunction = new LegacyKeyedProcessOperator[String, CRow, CRow](
-      new GroupAggProcessFunction(
+    val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+      new GroupAggProcessFunction[String](
         genSumAggFunction,
         sumAggregationStateType,
         false,
@@ -104,8 +104,8 @@ class GroupAggregateHarnessTest extends HarnessTestBase {
   @Test
   def testAggregateWithRetract(): Unit = {
 
-    val processFunction = new LegacyKeyedProcessOperator[String, CRow, CRow](
-      new GroupAggProcessFunction(
+    val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+      new GroupAggProcessFunction[String](
         genSumAggFunction,
         sumAggregationStateType,
         true,
@@ -211,7 +211,7 @@ class GroupAggregateHarnessTest extends HarnessTestBase {
     val fields = getGeneratedAggregationFields(
       operator,
       "function",
-      classOf[GroupAggProcessFunction])
+      classOf[GroupAggProcessFunction[Row]])
 
     // check only one DistinctAccumulator is used
     assertTrue(fields.count(_.getName.endsWith("distinctValueMap_dataview")) == 1)
@@ -301,7 +301,7 @@ class GroupAggregateHarnessTest extends HarnessTestBase {
     val fields = getGeneratedAggregationFields(
       operator,
       "function",
-      classOf[GroupAggProcessFunction])
+      classOf[GroupAggProcessFunction[Row]])
 
     // check only one DistinctAccumulator is used
     assertTrue(fields.count(_.getName.endsWith("distinctValueMap_dataview")) == 1)
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
index 95b13a0..68dce4c 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.ConcurrentLinkedQueue
 
 import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
 import org.apache.flink.table.api.{StreamQueryConfig, Types}
 import org.apache.flink.table.runtime.aggregate._
@@ -40,8 +40,8 @@ class OverWindowHarnessTest extends HarnessTestBase{
   @Test
   def testProcTimeBoundedRowsOver(): Unit = {
 
-    val processFunction = new LegacyKeyedProcessOperator[String, CRow, CRow](
-      new ProcTimeBoundedRowsOver(
+    val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+      new ProcTimeBoundedRowsOver[String](
         genMinMaxAggFunction,
         2,
         minMaxAggregationStateType,
@@ -141,8 +141,8 @@ class OverWindowHarnessTest extends HarnessTestBase{
   @Test
   def testProcTimeBoundedRangeOver(): Unit = {
 
-    val processFunction = new LegacyKeyedProcessOperator[String, CRow, CRow](
-      new ProcTimeBoundedRangeOver(
+    val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+      new ProcTimeBoundedRangeOver[String](
         genMinMaxAggFunction,
         4000,
         minMaxAggregationStateType,
@@ -269,8 +269,8 @@ class OverWindowHarnessTest extends HarnessTestBase{
   @Test
   def testProcTimeUnboundedOver(): Unit = {
 
-    val processFunction = new LegacyKeyedProcessOperator[String, CRow, CRow](
-      new ProcTimeUnboundedOver(
+    val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+      new ProcTimeUnboundedOver[String](
         genMinMaxAggFunction,
         minMaxAggregationStateType,
         queryConfig))
@@ -361,8 +361,8 @@ class OverWindowHarnessTest extends HarnessTestBase{
   @Test
   def testRowTimeBoundedRangeOver(): Unit = {
 
-    val processFunction = new LegacyKeyedProcessOperator[String, CRow, CRow](
-      new RowTimeBoundedRangeOver(
+    val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+      new RowTimeBoundedRangeOver[String](
         genMinMaxAggFunction,
         minMaxAggregationStateType,
         minMaxCRowType,
@@ -511,8 +511,8 @@ class OverWindowHarnessTest extends HarnessTestBase{
   @Test
   def testRowTimeBoundedRowsOver(): Unit = {
 
-    val processFunction = new LegacyKeyedProcessOperator[String, CRow, CRow](
-      new RowTimeBoundedRowsOver(
+    val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+      new RowTimeBoundedRowsOver[String](
         genMinMaxAggFunction,
         minMaxAggregationStateType,
         minMaxCRowType,
@@ -659,8 +659,8 @@ class OverWindowHarnessTest extends HarnessTestBase{
   @Test
   def testRowTimeUnboundedRangeOver(): Unit = {
 
-    val processFunction = new LegacyKeyedProcessOperator[String, CRow, CRow](
-      new RowTimeUnboundedRangeOver(
+    val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+      new RowTimeUnboundedRangeOver[String](
         genMinMaxAggFunction,
         minMaxAggregationStateType,
         minMaxCRowType,
@@ -795,8 +795,8 @@ class OverWindowHarnessTest extends HarnessTestBase{
   @Test
   def testRowTimeUnboundedRowsOver(): Unit = {
 
-    val processFunction = new LegacyKeyedProcessOperator[String, CRow, CRow](
-      new RowTimeUnboundedRowsOver(
+    val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+      new RowTimeUnboundedRowsOver[String](
         genMinMaxAggFunction,
         minMaxAggregationStateType,
         minMaxCRowType,
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/SortProcessFunctionHarnessTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/SortProcessFunctionHarnessTest.scala
index 457bde2..96d7dd3 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/SortProcessFunctionHarnessTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/SortProcessFunctionHarnessTest.scala
@@ -28,7 +28,7 @@ import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.java.typeutils.runtime.RowComparator
 import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator
 import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
 import org.apache.flink.streaming.util.{KeyedOneInputStreamOperatorTestHarness, TestHarnessUtil}
@@ -71,8 +71,8 @@ class SortProcessFunctionHarnessTest {
     
     val inputCRowType = CRowTypeInfo(rT)
     
-    val processFunction = new LegacyKeyedProcessOperator[Integer,CRow,CRow](
-      new ProcTimeSortProcessFunction(
+    val processFunction = new KeyedProcessOperator[Integer,CRow,CRow](
+      new ProcTimeSortProcessFunction[Integer](
         inputCRowType,
         collectionRowComparator))
   
@@ -170,8 +170,8 @@ class SortProcessFunctionHarnessTest {
 
     val inputCRowType = CRowTypeInfo(rT)
 
-    val processFunction = new LegacyKeyedProcessOperator[Integer,CRow,CRow](
-      new RowTimeSortProcessFunction(
+    val processFunction = new KeyedProcessOperator[Integer,CRow,CRow](
+      new RowTimeSortProcessFunction[Integer](
         inputCRowType,
         4,
         Some(collectionRowComparator)))
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/operators/ProcessFunctionWithCleanupStateTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/operators/ProcessFunctionWithCleanupStateTest.scala
index 6c0ca1a..270a084 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/operators/ProcessFunctionWithCleanupStateTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/operators/ProcessFunctionWithCleanupStateTest.scala
@@ -23,8 +23,8 @@ import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator
 import org.apache.flink.table.api.StreamQueryConfig
 import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
 import org.apache.flink.table.runtime.harness.HarnessTestBase
@@ -39,8 +39,8 @@ class ProcessFunctionWithCleanupStateTest extends 
HarnessTestBase {
   def testStateCleaning(): Unit = {
     val queryConfig = new TestStreamQueryConfig(Time.milliseconds(5), Time.milliseconds(10))
 
-    val func = new MockedProcessFunction(queryConfig)
-    val operator = new LegacyKeyedProcessOperator(func)
+    val func = new MockedProcessFunction[String](queryConfig)
+    val operator = new KeyedProcessOperator(func)
 
     val testHarness = createHarnessTester(operator,
       new FirstFieldSelector,
@@ -93,8 +93,8 @@ class ProcessFunctionWithCleanupStateTest extends 
HarnessTestBase {
   }
 }
 
-private class MockedProcessFunction(queryConfig: StreamQueryConfig)
-    extends ProcessFunctionWithCleanupState[(String, String), String](queryConfig) {
+private class MockedProcessFunction[K](queryConfig: StreamQueryConfig)
+    extends ProcessFunctionWithCleanupState[K, (String, String), String](queryConfig) {
 
   var state: ValueState[String] = _
 
@@ -106,7 +106,7 @@ private class MockedProcessFunction(queryConfig: 
StreamQueryConfig)
 
   override def processElement(
       value: (String, String),
-      ctx: ProcessFunction[(String, String), String]#Context,
+      ctx: KeyedProcessFunction[K, (String, String), String]#Context,
       out: Collector[String]): Unit = {
 
     val curTime = ctx.timerService().currentProcessingTime()
@@ -116,7 +116,7 @@ private class MockedProcessFunction(queryConfig: 
StreamQueryConfig)
 
   override def onTimer(
       timestamp: Long,
-      ctx: ProcessFunction[(String, String), String]#OnTimerContext,
+      ctx: KeyedProcessFunction[K, (String, String), String]#OnTimerContext,
       out: Collector[String]): Unit = {
 
     if (stateCleaningEnabled) {

Reply via email to