This is an automated email from the ASF dual-hosted git repository.
jincheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new d1d0bd1 [FLINK-11091][table] Clear the use of deprecated `process(ProcessFunction)` of KeyedStream in table operators.
d1d0bd1 is described below
commit d1d0bd1b85fc501c67498acaf8dc528b15aa1686
Author: sunjincheng121 <[email protected]>
AuthorDate: Tue Feb 26 17:32:20 2019 +0800
[FLINK-11091][table] Clear the use of deprecated `process(ProcessFunction)` of KeyedStream in table operators.
This closes #7837
---
.../flink/table/api/BatchTableEnvironment.scala | 1 -
.../datastream/DataStreamGroupAggregate.scala | 30 ++++++----
.../nodes/datastream/DataStreamOverAggregate.scala | 66 ++++++++++++----------
.../plan/nodes/datastream/DataStreamSort.scala | 13 +++--
.../table/runtime/aggregate/AggregateUtil.scala | 30 +++++-----
.../aggregate/GroupAggProcessFunction.scala | 10 ++--
.../aggregate/ProcTimeBoundedRangeOver.scala | 10 ++--
.../aggregate/ProcTimeBoundedRowsOver.scala | 10 ++--
.../aggregate/ProcTimeSortProcessFunction.scala | 10 ++--
.../runtime/aggregate/ProcTimeUnboundedOver.scala | 10 ++--
.../ProcessFunctionWithCleanupState.scala | 8 +--
.../aggregate/RowTimeBoundedRangeOver.scala | 10 ++--
.../runtime/aggregate/RowTimeBoundedRowsOver.scala | 10 ++--
.../aggregate/RowTimeSortProcessFunction.scala | 10 ++--
.../runtime/aggregate/RowTimeUnboundedOver.scala | 18 +++---
.../flink/table/runtime/aggregate/SortUtil.scala | 10 ++--
.../runtime/harness/AggFunctionHarnessTest.scala | 6 +-
.../harness/GroupAggregateHarnessTest.scala | 14 ++---
.../runtime/harness/OverWindowHarnessTest.scala | 30 +++++-----
.../harness/SortProcessFunctionHarnessTest.scala | 10 ++--
.../ProcessFunctionWithCleanupStateTest.scala | 16 +++---
21 files changed, 174 insertions(+), 158 deletions(-)
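For context, the KeyedStream API change behind every hunk below: `process(ProcessFunction)` is deprecated on KeyedStream in favor of the overload taking a `KeyedProcessFunction`, which carries the key type. The shapes, paraphrased rather than quoted verbatim from the Java API:

    // deprecated overload on KeyedStream[T, KEY], what the planner called before:
    //   def process[R](processFunction: ProcessFunction[T, R]): SingleOutputStreamOperator[R]
    // replacement used throughout this commit:
    //   def process[R](keyedProcessFunction: KeyedProcessFunction[KEY, T, R]): SingleOutputStreamOperator[R]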
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index 99e9d7e..1aa203b 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -21,7 +21,6 @@ package org.apache.flink.table.api
import _root_.java.util.concurrent.atomic.AtomicInteger
import org.apache.calcite.plan.RelOptUtil
-import org.apache.calcite.plan.hep.HepMatchOrder
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.sql2rel.RelDecorrelator
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
index 5f4b186c..d48b682 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
@@ -17,11 +17,14 @@
*/
package org.apache.flink.table.plan.nodes.datastream
+import java.lang.{Byte => JByte}
+
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.core.AggregateCall
import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
import org.apache.flink.api.java.functions.NullByteKeySelector
import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
import org.apache.flink.table.codegen.AggregationCodeGenerator
import org.apache.flink.table.plan.nodes.CommonAggregate
@@ -32,6 +35,7 @@ import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair
import org.apache.flink.table.runtime.aggregate._
import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
/**
*
@@ -131,23 +135,25 @@ class DataStreamGroupAggregate(
s"select: ($aggString)"
val nonKeyedAggOpName = s"select: ($aggString)"
- val processFunction = AggregateUtil.createGroupAggregateFunction(
- generator,
- namedAggregates,
- inputSchema.relDataType,
- inputSchema.fieldTypeInfos,
- groupings,
- queryConfig,
- tableEnv.getConfig,
- DataStreamRetractionRules.isAccRetract(this),
- DataStreamRetractionRules.isAccRetract(getInput))
+ def createKeyedProcessFunction[K]: KeyedProcessFunction[K, CRow, CRow] = {
+ AggregateUtil.createGroupAggregateFunction[K](
+ generator,
+ namedAggregates,
+ inputSchema.relDataType,
+ inputSchema.fieldTypeInfos,
+ groupings,
+ queryConfig,
+ tableEnv.getConfig,
+ DataStreamRetractionRules.isAccRetract(this),
+ DataStreamRetractionRules.isAccRetract(getInput))
+ }
val result: DataStream[CRow] =
// grouped / keyed aggregation
if (groupings.nonEmpty) {
inputDS
.keyBy(new CRowKeySelector(groupings, inputSchema.projectedTypeInfo(groupings)))
- .process(processFunction)
+ .process(createKeyedProcessFunction[Row])
.returns(outRowType)
.name(keyedAggOpName)
.asInstanceOf[DataStream[CRow]]
@@ -156,7 +162,7 @@ class DataStreamGroupAggregate(
else {
inputDS
.keyBy(new NullByteKeySelector[CRow])
- .process(processFunction)
+ .process(createKeyedProcessFunction[JByte])
.setParallelism(1)
.setMaxParallelism(1)
.returns(outRowType)
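The val-to-def change above is the heart of the migration: `KeyedStream.process(KeyedProcessFunction[K, I, O])` needs a concrete key type, and the same aggregation is applied both to the grouped stream (keyed by `Row` via `CRowKeySelector`) and to the global stream (keyed by a constant `java.lang.Byte` via `NullByteKeySelector`). A minimal self-contained sketch of the pattern; `KeyedFactorySketch` and `createEcho` are hypothetical stand-ins for the planner factory, not code from this commit:

    import java.lang.{Byte => JByte}

    import org.apache.flink.api.common.typeinfo.Types
    import org.apache.flink.api.java.functions.{KeySelector, NullByteKeySelector}
    import org.apache.flink.streaming.api.datastream.DataStream
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction
    import org.apache.flink.util.Collector

    object KeyedFactorySketch {

      // one generic factory; each call site fixes K to match its key selector
      def createEcho[K]: KeyedProcessFunction[K, String, String] =
        new KeyedProcessFunction[K, String, String] {
          override def processElement(
              in: String,
              ctx: KeyedProcessFunction[K, String, String]#Context,
              out: Collector[String]): Unit = out.collect(in)
        }

      def plan(input: DataStream[String], grouped: Boolean): DataStream[String] =
        if (grouped) {
          // keyed variant: K is whatever type the key selector produces
          input
            .keyBy(new KeySelector[String, String] {
              override def getKey(s: String): String = s
            })
            .process(createEcho[String])
            .returns(Types.STRING)
        } else {
          // global variant: constant JByte key, pinned to parallelism 1
          input
            .keyBy(new NullByteKeySelector[String])
            .process(createEcho[JByte])
            .setParallelism(1)
            .setMaxParallelism(1)
            .returns(Types.STRING)
        }
    }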
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
index c1693d9..a22b4cc 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
@@ -17,6 +17,7 @@
*/
package org.apache.flink.table.plan.nodes.datastream
+import java.lang.{Byte => JByte}
import java.util.{List => JList}
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
@@ -28,6 +29,7 @@ import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
import org.apache.calcite.rex.RexLiteral
import org.apache.flink.api.java.functions.NullByteKeySelector
import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableConfig, TableException}
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.codegen.AggregationCodeGenerator
@@ -39,6 +41,7 @@ import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair
import org.apache.flink.table.runtime.aggregate._
import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
import scala.collection.JavaConverters._
@@ -214,25 +217,27 @@ class DataStreamOverAggregate(
// get the output types
val returnTypeInfo = CRowTypeInfo(schema.typeInfo)
- val processFunction = AggregateUtil.createUnboundedOverProcessFunction(
- generator,
- namedAggregates,
- aggregateInputType,
- inputSchema.relDataType,
- inputSchema.typeInfo,
- inputSchema.fieldTypeInfos,
- queryConfig,
- tableConfig,
- rowTimeIdx,
- partitionKeys.nonEmpty,
- isRowsClause)
+ def createKeyedProcessFunction[K]: KeyedProcessFunction[K, CRow, CRow] = {
+ AggregateUtil.createUnboundedOverProcessFunction[K](
+ generator,
+ namedAggregates,
+ aggregateInputType,
+ inputSchema.relDataType,
+ inputSchema.typeInfo,
+ inputSchema.fieldTypeInfos,
+ queryConfig,
+ tableConfig,
+ rowTimeIdx,
+ partitionKeys.nonEmpty,
+ isRowsClause)
+ }
val result: DataStream[CRow] =
// partitioned aggregation
if (partitionKeys.nonEmpty) {
inputDS
.keyBy(new CRowKeySelector(partitionKeys, inputSchema.projectedTypeInfo(partitionKeys)))
- .process(processFunction)
+ .process(createKeyedProcessFunction[Row])
.returns(returnTypeInfo)
.name(aggOpName)
.asInstanceOf[DataStream[CRow]]
@@ -240,7 +245,7 @@ class DataStreamOverAggregate(
// non-partitioned aggregation
else {
inputDS.keyBy(new NullByteKeySelector[CRow])
- .process(processFunction).setParallelism(1).setMaxParallelism(1)
+ .process(createKeyedProcessFunction[JByte]).setParallelism(1).setMaxParallelism(1)
.returns(returnTypeInfo)
.name(aggOpName)
}
@@ -268,25 +273,28 @@ class DataStreamOverAggregate(
// get the output types
val returnTypeInfo = CRowTypeInfo(schema.typeInfo)
- val processFunction = AggregateUtil.createBoundedOverProcessFunction(
- generator,
- namedAggregates,
- aggregateInputType,
- inputSchema.relDataType,
- inputSchema.typeInfo,
- inputSchema.fieldTypeInfos,
- precedingOffset,
- queryConfig,
- tableConfig,
- isRowsClause,
- rowTimeIdx
- )
+ def createKeyedProcessFunction[K]: KeyedProcessFunction[K, CRow, CRow] = {
+ AggregateUtil.createBoundedOverProcessFunction[K](
+ generator,
+ namedAggregates,
+ aggregateInputType,
+ inputSchema.relDataType,
+ inputSchema.typeInfo,
+ inputSchema.fieldTypeInfos,
+ precedingOffset,
+ queryConfig,
+ tableConfig,
+ isRowsClause,
+ rowTimeIdx
+ )
+ }
+
val result: DataStream[CRow] =
// partitioned aggregation
if (partitionKeys.nonEmpty) {
inputDS
.keyBy(new CRowKeySelector(partitionKeys, inputSchema.projectedTypeInfo(partitionKeys)))
- .process(processFunction)
+ .process(createKeyedProcessFunction[Row])
.returns(returnTypeInfo)
.name(aggOpName)
}
@@ -294,7 +302,7 @@ class DataStreamOverAggregate(
else {
inputDS
.keyBy(new NullByteKeySelector[CRow])
- .process(processFunction).setParallelism(1).setMaxParallelism(1)
+ .process(createKeyedProcessFunction[JByte]).setParallelism(1).setMaxParallelism(1)
.returns(returnTypeInfo)
.name(aggOpName)
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala
index 8f9942f..936bbb2 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSort.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.plan.nodes.datastream
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.{ RelNode, RelWriter }
+import org.apache.calcite.rel.{RelNode, RelWriter}
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.runtime.aggregate._
@@ -34,6 +34,7 @@ import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, Ta
import org.apache.flink.table.plan.schema.RowSchema
import org.apache.flink.table.plan.nodes.CommonSort
import org.apache.calcite.rel.core.Sort
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
/**
* Flink RelNode which matches along with Sort Rule.
@@ -146,14 +147,14 @@ class DataStreamSort(
// if the order has secondary sorting fields in addition to the proctime
if (sortCollation.getFieldCollations.size() > 1) {
- val processFunction = SortUtil.createProcTimeSortFunction(
+ val keyedProcessFunction = SortUtil.createProcTimeSortFunction(
sortCollation,
inputSchema.relDataType,
inputSchema.typeInfo,
execCfg)
inputDS.keyBy(new NullByteKeySelector[CRow])
- .process(processFunction).setParallelism(1).setMaxParallelism(1)
+ .process(keyedProcessFunction).setParallelism(1).setMaxParallelism(1)
.returns(returnTypeInfo)
.asInstanceOf[DataStream[CRow]]
} else {
@@ -175,14 +176,14 @@ class DataStreamSort(
val returnTypeInfo = CRowTypeInfo(schema.typeInfo)
- val processFunction = SortUtil.createRowTimeSortFunction(
+ val keyedProcessFunction = SortUtil.createRowTimeSortFunction(
sortCollation,
inputSchema.relDataType,
inputSchema.typeInfo,
execCfg)
-
+
inputDS.keyBy(new NullByteKeySelector[CRow])
- .process(processFunction).setParallelism(1).setMaxParallelism(1)
+ .process(keyedProcessFunction).setParallelism(1).setMaxParallelism(1)
.returns(returnTypeInfo)
.asInstanceOf[DataStream[CRow]]
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index 7c38254..f87ed3c 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -30,7 +30,7 @@ import org.apache.flink.api.common.functions.{MapFunction, RichGroupReduceFuncti
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala.typeutils.Types
-import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction, WindowFunction}
import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
import org.apache.flink.table.api.dataview.DataViewSpec
@@ -72,7 +72,7 @@ object AggregateUtil {
* @param isPartitioned It is a tag that indicate whether the input is partitioned
* @param isRowsClause It is a tag that indicates whether the OVER clause is ROWS clause
*/
- private[flink] def createUnboundedOverProcessFunction(
+ private[flink] def createUnboundedOverProcessFunction[K](
generator: AggregationCodeGenerator,
namedAggregates: Seq[CalcitePair[AggregateCall, String]],
aggregateInputType: RelDataType,
@@ -84,7 +84,7 @@ object AggregateUtil {
rowTimeIdx: Option[Int],
isPartitioned: Boolean,
isRowsClause: Boolean)
- : ProcessFunction[CRow, CRow] = {
+ : KeyedProcessFunction[K, CRow, CRow] = {
val aggregateMetadata = extractAggregateMetadata(
namedAggregates.map(_.getKey),
@@ -123,7 +123,7 @@ object AggregateUtil {
if (rowTimeIdx.isDefined) {
if (isRowsClause) {
// ROWS unbounded over process function
- new RowTimeUnboundedRowsOver(
+ new RowTimeUnboundedRowsOver[K](
genFunction,
aggregationStateType,
CRowTypeInfo(inputTypeInfo),
@@ -131,7 +131,7 @@ object AggregateUtil {
queryConfig)
} else {
// RANGE unbounded over process function
- new RowTimeUnboundedRangeOver(
+ new RowTimeUnboundedRangeOver[K](
genFunction,
aggregationStateType,
CRowTypeInfo(inputTypeInfo),
@@ -139,7 +139,7 @@ object AggregateUtil {
queryConfig)
}
} else {
- new ProcTimeUnboundedOver(
+ new ProcTimeUnboundedOver[K](
genFunction,
aggregationStateType,
queryConfig)
@@ -160,7 +160,7 @@ object AggregateUtil {
* @param consumeRetraction It is a tag that indicates whether consume the retract record.
* @return [[org.apache.flink.streaming.api.functions.ProcessFunction]]
*/
- private[flink] def createGroupAggregateFunction(
+ private[flink] def createGroupAggregateFunction[K](
generator: AggregationCodeGenerator,
namedAggregates: Seq[CalcitePair[AggregateCall, String]],
inputRowType: RelDataType,
@@ -169,7 +169,7 @@ object AggregateUtil {
queryConfig: StreamQueryConfig,
tableConfig: TableConfig,
generateRetraction: Boolean,
- consumeRetraction: Boolean): ProcessFunction[CRow, CRow] = {
+ consumeRetraction: Boolean): KeyedProcessFunction[K, CRow, CRow] = {
val aggregateMetadata = extractAggregateMetadata(
namedAggregates.map(_.getKey),
@@ -202,7 +202,7 @@ object AggregateUtil {
val aggregationStateType: RowTypeInfo = new RowTypeInfo(aggregateMetadata
.getAggregatesAccumulatorTypes: _*)
- new GroupAggProcessFunction(
+ new GroupAggProcessFunction[K](
genFunction,
aggregationStateType,
generateRetraction,
@@ -225,7 +225,7 @@ object AggregateUtil {
* @param rowTimeIdx The index of the rowtime field or None in case of processing time.
* @return [[org.apache.flink.streaming.api.functions.ProcessFunction]]
*/
- private[flink] def createBoundedOverProcessFunction(
+ private[flink] def createBoundedOverProcessFunction[K](
generator: AggregationCodeGenerator,
namedAggregates: Seq[CalcitePair[AggregateCall, String]],
aggregateInputType: RelDataType,
@@ -237,7 +237,7 @@ object AggregateUtil {
tableConfig: TableConfig,
isRowsClause: Boolean,
rowTimeIdx: Option[Int])
- : ProcessFunction[CRow, CRow] = {
+ : KeyedProcessFunction[K, CRow, CRow] = {
val needRetract = true
val aggregateMetadata = extractAggregateMetadata(
@@ -277,7 +277,7 @@ object AggregateUtil {
.getAggregatesAccumulatorTypes: _*)
if (rowTimeIdx.isDefined) {
if (isRowsClause) {
- new RowTimeBoundedRowsOver(
+ new RowTimeBoundedRowsOver[K](
genFunction,
aggregationStateType,
inputRowType,
@@ -285,7 +285,7 @@ object AggregateUtil {
rowTimeIdx.get,
queryConfig)
} else {
- new RowTimeBoundedRangeOver(
+ new RowTimeBoundedRangeOver[K](
genFunction,
aggregationStateType,
inputRowType,
@@ -295,14 +295,14 @@ object AggregateUtil {
}
} else {
if (isRowsClause) {
- new ProcTimeBoundedRowsOver(
+ new ProcTimeBoundedRowsOver[K](
genFunction,
precedingOffset,
aggregationStateType,
inputRowType,
queryConfig)
} else {
- new ProcTimeBoundedRangeOver(
+ new ProcTimeBoundedRangeOver[K](
genFunction,
precedingOffset,
aggregationStateType,
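The factories above stay key-agnostic: each `create*Function[K]` now returns `KeyedProcessFunction[K, CRow, CRow]` and simply forwards `[K]` to whichever concrete function it instantiates. A stripped-down sketch of that shape; `MiniFactory` and the trivial function bodies are hypothetical:

    import org.apache.flink.streaming.api.functions.KeyedProcessFunction
    import org.apache.flink.util.Collector

    // trivial stand-ins for the ROWS / RANGE implementations above
    class RowsSketch[K] extends KeyedProcessFunction[K, String, String] {
      override def processElement(
          in: String,
          ctx: KeyedProcessFunction[K, String, String]#Context,
          out: Collector[String]): Unit = out.collect(s"rows: $in")
    }

    class RangeSketch[K] extends KeyedProcessFunction[K, String, String] {
      override def processElement(
          in: String,
          ctx: KeyedProcessFunction[K, String, String]#Context,
          out: Collector[String]): Unit = out.collect(s"range: $in")
    }

    object MiniFactory {
      // the factory never inspects K; it only forwards the caller's choice
      def createOverFunction[K](isRowsClause: Boolean): KeyedProcessFunction[K, String, String] =
        if (isRowsClause) new RowsSketch[K] else new RangeSketch[K]
    }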
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
index 2d72e6d..5bbec22 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
@@ -22,7 +22,7 @@ import java.lang.{Long => JLong}
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.table.api.{StreamQueryConfig, Types}
import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
import org.apache.flink.table.runtime.types.CRow
@@ -36,12 +36,12 @@ import org.apache.flink.util.Collector
* @param genAggregations Generated aggregate helper function
* @param aggregationStateType The row type info of aggregation
*/
-class GroupAggProcessFunction(
+class GroupAggProcessFunction[K](
private val genAggregations: GeneratedAggregationsFunction,
private val aggregationStateType: RowTypeInfo,
private val generateRetraction: Boolean,
private val queryConfig: StreamQueryConfig)
- extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
+ extends ProcessFunctionWithCleanupState[K, CRow, CRow](queryConfig)
with Compiler[GeneratedAggregations]
with Logging {
@@ -81,7 +81,7 @@ class GroupAggProcessFunction(
override def processElement(
inputC: CRow,
- ctx: ProcessFunction[CRow, CRow]#Context,
+ ctx: KeyedProcessFunction[K, CRow, CRow]#Context,
out: Collector[CRow]): Unit = {
val currentTime = ctx.timerService().currentProcessingTime()
@@ -169,7 +169,7 @@ class GroupAggProcessFunction(
override def onTimer(
timestamp: Long,
- ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
+ ctx: KeyedProcessFunction[K, CRow, CRow]#OnTimerContext,
out: Collector[CRow]): Unit = {
if (stateCleaningEnabled) {
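The mechanics in this file repeat across all the remaining runtime classes: add a `[K]` type parameter, extend the keyed base class, and retype `ctx` from `ProcessFunction[CRow, CRow]#Context` to `KeyedProcessFunction[K, CRow, CRow]#Context`. A hypothetical function illustrating the migrated signatures; the `getCurrentKey` access is an added illustration of what the keyed context offers, not something these hunks use:

    import org.apache.flink.streaming.api.functions.KeyedProcessFunction
    import org.apache.flink.util.Collector

    class EchoWithTimer[K] extends KeyedProcessFunction[K, String, String] {

      override def processElement(
          in: String,
          ctx: KeyedProcessFunction[K, String, String]#Context, // was ProcessFunction[...]#Context
          out: Collector[String]): Unit = {
        val now = ctx.timerService().currentProcessingTime()
        ctx.timerService().registerProcessingTimeTimer(now + 1000L)
        // the keyed context also exposes the current key directly
        out.collect(s"${ctx.getCurrentKey}: $in")
      }

      override def onTimer(
          timestamp: Long,
          ctx: KeyedProcessFunction[K, String, String]#OnTimerContext, // was ProcessFunction[...]#OnTimerContext
          out: Collector[String]): Unit = {
        out.collect(s"timer for key ${ctx.getCurrentKey} at $timestamp")
      }
    }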
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
index 6126dc7..b4c50c4 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
@@ -19,7 +19,7 @@ package org.apache.flink.table.runtime.aggregate
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.types.Row
import org.apache.flink.util.Collector
import org.apache.flink.api.common.state._
@@ -43,13 +43,13 @@ import org.apache.flink.table.util.Logging
* @param aggregatesTypeInfo row type info of aggregation
* @param inputType row type info of input row
*/
-class ProcTimeBoundedRangeOver(
+class ProcTimeBoundedRangeOver[K](
genAggregations: GeneratedAggregationsFunction,
precedingTimeBoundary: Long,
aggregatesTypeInfo: RowTypeInfo,
inputType: TypeInformation[CRow],
queryConfig: StreamQueryConfig)
- extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
+ extends ProcessFunctionWithCleanupState[K, CRow, CRow](queryConfig)
with Compiler[GeneratedAggregations]
with Logging {
@@ -90,7 +90,7 @@ class ProcTimeBoundedRangeOver(
override def processElement(
input: CRow,
- ctx: ProcessFunction[CRow, CRow]#Context,
+ ctx: KeyedProcessFunction[K, CRow, CRow]#Context,
out: Collector[CRow]): Unit = {
val currentTime = ctx.timerService.currentProcessingTime
@@ -114,7 +114,7 @@ class ProcTimeBoundedRangeOver(
override def onTimer(
timestamp: Long,
- ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
+ ctx: KeyedProcessFunction[K, CRow, CRow]#OnTimerContext,
out: Collector[CRow]): Unit = {
if (stateCleaningEnabled) {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala
index fa58ac5..b9b1a95 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.state.{MapState, MapStateDescriptor, ValueSta
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo}
import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.table.api.StreamQueryConfig
import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
@@ -40,13 +40,13 @@ import org.apache.flink.util.{Collector, Preconditions}
* @param aggregatesTypeInfo row type info of aggregation
* @param inputType row type info of input row
*/
-class ProcTimeBoundedRowsOver(
+class ProcTimeBoundedRowsOver[K](
genAggregations: GeneratedAggregationsFunction,
precedingOffset: Long,
aggregatesTypeInfo: RowTypeInfo,
inputType: TypeInformation[CRow],
queryConfig: StreamQueryConfig)
- extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
+ extends ProcessFunctionWithCleanupState[K, CRow, CRow](queryConfig)
with Compiler[GeneratedAggregations]
with Logging {
@@ -102,7 +102,7 @@ class ProcTimeBoundedRowsOver(
override def processElement(
inputC: CRow,
- ctx: ProcessFunction[CRow, CRow]#Context,
+ ctx: KeyedProcessFunction[K, CRow, CRow]#Context,
out: Collector[CRow]): Unit = {
val input = inputC.row
@@ -184,7 +184,7 @@ class ProcTimeBoundedRowsOver(
override def onTimer(
timestamp: Long,
- ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
+ ctx: KeyedProcessFunction[K, CRow, CRow]#OnTimerContext,
out: Collector[CRow]): Unit = {
if (stateCleaningEnabled) {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeSortProcessFunction.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeSortProcessFunction.scala
index 1e12060..1034307 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeSortProcessFunction.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeSortProcessFunction.scala
@@ -19,7 +19,7 @@ package org.apache.flink.table.runtime.aggregate
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.types.Row
import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
import org.apache.flink.util.{Collector, Preconditions}
@@ -35,10 +35,10 @@ import org.apache.flink.streaming.api.operators.TimestampedCollector
* @param inputRowType The data type of the input data.
* @param rowComparator A comparator to sort rows.
*/
-class ProcTimeSortProcessFunction(
+class ProcTimeSortProcessFunction[K](
private val inputRowType: CRowTypeInfo,
private val rowComparator: CollectionRowComparator)
- extends ProcessFunction[CRow, CRow] {
+ extends KeyedProcessFunction[K, CRow, CRow] {
Preconditions.checkNotNull(rowComparator)
@@ -58,7 +58,7 @@ class ProcTimeSortProcessFunction(
override def processElement(
inputC: CRow,
- ctx: ProcessFunction[CRow, CRow]#Context,
+ ctx: KeyedProcessFunction[K, CRow, CRow]#Context,
out: Collector[CRow]): Unit = {
val input = inputC.row
@@ -74,7 +74,7 @@ class ProcTimeSortProcessFunction(
override def onTimer(
timestamp: Long,
- ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
+ ctx: KeyedProcessFunction[K, CRow, CRow]#OnTimerContext,
out: Collector[CRow]): Unit = {
// remove timestamp set outside of ProcessFunction.
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedOver.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedOver.scala
index ce1a959..b781678 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedOver.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedOver.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.runtime.aggregate
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.table.api.StreamQueryConfig
import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
import org.apache.flink.table.runtime.types.CRow
@@ -34,11 +34,11 @@ import org.apache.flink.util.Collector
* @param genAggregations Generated aggregate helper function
* @param aggregationStateType row type info of aggregation
*/
-class ProcTimeUnboundedOver(
+class ProcTimeUnboundedOver[K](
genAggregations: GeneratedAggregationsFunction,
aggregationStateType: RowTypeInfo,
queryConfig: StreamQueryConfig)
- extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
+ extends ProcessFunctionWithCleanupState[K, CRow, CRow](queryConfig)
with Compiler[GeneratedAggregations]
with Logging {
@@ -67,7 +67,7 @@ class ProcTimeUnboundedOver(
override def processElement(
inputC: CRow,
- ctx: ProcessFunction[CRow, CRow]#Context,
+ ctx: KeyedProcessFunction[K, CRow, CRow]#Context,
out: Collector[CRow]): Unit = {
// register state-cleanup timer
@@ -92,7 +92,7 @@ class ProcTimeUnboundedOver(
override def onTimer(
timestamp: Long,
- ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
+ ctx: KeyedProcessFunction[K, CRow, CRow]#OnTimerContext,
out: Collector[CRow]): Unit = {
if (stateCleaningEnabled) {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcessFunctionWithCleanupState.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcessFunctionWithCleanupState.scala
index 7263de7..54f037e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcessFunctionWithCleanupState.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcessFunctionWithCleanupState.scala
@@ -22,11 +22,11 @@ import java.lang.{Long => JLong}
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.state.State
import org.apache.flink.streaming.api.TimeDomain
-import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.table.api.{StreamQueryConfig, Types}
-abstract class ProcessFunctionWithCleanupState[IN,OUT](queryConfig: StreamQueryConfig)
- extends ProcessFunction[IN, OUT]
+abstract class ProcessFunctionWithCleanupState[KEY, IN, OUT](queryConfig: StreamQueryConfig)
+ extends KeyedProcessFunction[KEY, IN, OUT]
with CleanupState {
protected val minRetentionTime: Long =
queryConfig.getMinIdleStateRetentionTime
@@ -45,7 +45,7 @@ abstract class ProcessFunctionWithCleanupState[IN,OUT](queryConfig: StreamQueryC
}
protected def processCleanupTimer(
- ctx: ProcessFunction[IN, OUT]#Context,
+ ctx: KeyedProcessFunction[KEY, IN, OUT]#Context,
currentTime: Long): Unit = {
if (stateCleaningEnabled) {
registerProcessingCleanupTimer(
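Because every state-retention function extends this base class, the extra `KEY` parameter here is what forces the `[K]` parameter onto all the subclasses above and below. A reduced sketch of the keyed cleanup base, assuming a plain retention interval in place of `StreamQueryConfig`:

    import org.apache.flink.streaming.api.functions.KeyedProcessFunction

    // KEY is threaded through so subclasses can name the keyed Context
    // in their own overrides
    abstract class CleanupSketch[KEY, IN, OUT](retentionMs: Long)
      extends KeyedProcessFunction[KEY, IN, OUT] {

      protected def processCleanupTimer(
          ctx: KeyedProcessFunction[KEY, IN, OUT]#Context,
          currentTime: Long): Unit = {
        // timers on a keyed context are scoped to the current key,
        // so this cleans up exactly one key's state when it fires
        ctx.timerService().registerProcessingTimeTimer(currentTime + retentionMs)
      }
    }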
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
index 7c509d6..c046871 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.state._
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo}
import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.operators.TimestampedCollector
import org.apache.flink.table.api.StreamQueryConfig
import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
@@ -40,14 +40,14 @@ import org.apache.flink.util.{Collector, Preconditions}
* @param inputRowType row type info of input row
* @param precedingOffset preceding offset
*/
-class RowTimeBoundedRangeOver(
+class RowTimeBoundedRangeOver[K](
genAggregations: GeneratedAggregationsFunction,
aggregationStateType: RowTypeInfo,
inputRowType: CRowTypeInfo,
precedingOffset: Long,
rowTimeIdx: Int,
queryConfig: StreamQueryConfig)
- extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
+ extends ProcessFunctionWithCleanupState[K, CRow, CRow](queryConfig)
with Compiler[GeneratedAggregations]
with Logging {
Preconditions.checkNotNull(aggregationStateType)
@@ -108,7 +108,7 @@ class RowTimeBoundedRangeOver(
override def processElement(
inputC: CRow,
- ctx: ProcessFunction[CRow, CRow]#Context,
+ ctx: KeyedProcessFunction[K, CRow, CRow]#Context,
out: Collector[CRow]): Unit = {
val input = inputC.row
@@ -139,7 +139,7 @@ class RowTimeBoundedRangeOver(
override def onTimer(
timestamp: Long,
- ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
+ ctx: KeyedProcessFunction[K, CRow, CRow]#OnTimerContext,
out: Collector[CRow]): Unit = {
if (isProcessingTimeTimer(ctx.asInstanceOf[OnTimerContext])) {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
index d01a499..a331c20 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.state._
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo}
import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.operators.TimestampedCollector
import org.apache.flink.table.api.StreamQueryConfig
import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
@@ -41,14 +41,14 @@ import org.apache.flink.util.{Collector, Preconditions}
* @param inputRowType row type info of input row
* @param precedingOffset preceding offset
*/
-class RowTimeBoundedRowsOver(
+class RowTimeBoundedRowsOver[K](
genAggregations: GeneratedAggregationsFunction,
aggregationStateType: RowTypeInfo,
inputRowType: CRowTypeInfo,
precedingOffset: Long,
rowTimeIdx: Int,
queryConfig: StreamQueryConfig)
- extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
+ extends ProcessFunctionWithCleanupState[K, CRow, CRow](queryConfig)
with Compiler[GeneratedAggregations]
with Logging {
@@ -117,7 +117,7 @@ class RowTimeBoundedRowsOver(
override def processElement(
inputC: CRow,
- ctx: ProcessFunction[CRow, CRow]#Context,
+ ctx: KeyedProcessFunction[K, CRow, CRow]#Context,
out: Collector[CRow]): Unit = {
val input = inputC.row
@@ -148,7 +148,7 @@ class RowTimeBoundedRowsOver(
override def onTimer(
timestamp: Long,
- ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
+ ctx: KeyedProcessFunction[K, CRow, CRow]#OnTimerContext,
out: Collector[CRow]): Unit = {
if (isProcessingTimeTimer(ctx.asInstanceOf[OnTimerContext])) {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeSortProcessFunction.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeSortProcessFunction.scala
index f40feb1..0340c54 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeSortProcessFunction.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeSortProcessFunction.scala
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.state.{MapState, MapStateDescriptor, ValueSta
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.ListTypeInfo
import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.operators.TimestampedCollector
import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
import org.apache.flink.types.Row
@@ -36,11 +36,11 @@ import org.apache.flink.util.{Collector, Preconditions}
* @param rowtimeIdx The index of the rowtime field.
* @param rowComparator A comparator to sort rows.
*/
-class RowTimeSortProcessFunction(
+class RowTimeSortProcessFunction[K](
private val inputRowType: CRowTypeInfo,
private val rowtimeIdx: Int,
private val rowComparator: Option[CollectionRowComparator])
- extends ProcessFunction[CRow, CRow] {
+ extends KeyedProcessFunction[K, CRow, CRow] {
Preconditions.checkNotNull(rowComparator)
@@ -77,7 +77,7 @@ class RowTimeSortProcessFunction(
override def processElement(
inputC: CRow,
- ctx: ProcessFunction[CRow, CRow]#Context,
+ ctx: KeyedProcessFunction[K, CRow, CRow]#Context,
out: Collector[CRow]): Unit = {
val input = inputC.row
@@ -107,7 +107,7 @@ class RowTimeSortProcessFunction(
override def onTimer(
timestamp: Long,
- ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
+ ctx: KeyedProcessFunction[K, CRow, CRow]#OnTimerContext,
out: Collector[CRow]): Unit = {
// remove timestamp set outside of ProcessFunction.
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
index 690d0d0..2d5d8ab 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.state._
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.ListTypeInfo
import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.operators.TimestampedCollector
import org.apache.flink.table.api.StreamQueryConfig
import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
@@ -41,13 +41,13 @@ import org.apache.flink.util.Collector
* @param intermediateType the intermediate row tye which the state saved
* @param inputType the input row tye which the state saved
*/
-abstract class RowTimeUnboundedOver(
+abstract class RowTimeUnboundedOver[K](
genAggregations: GeneratedAggregationsFunction,
intermediateType: TypeInformation[Row],
inputType: TypeInformation[CRow],
rowTimeIdx: Int,
queryConfig: StreamQueryConfig)
- extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig)
+ extends ProcessFunctionWithCleanupState[K, CRow, CRow](queryConfig)
with Compiler[GeneratedAggregations]
with Logging {
@@ -102,7 +102,7 @@ abstract class RowTimeUnboundedOver(
*/
override def processElement(
inputC: CRow,
- ctx: ProcessFunction[CRow, CRow]#Context,
+ ctx: KeyedProcessFunction[K, CRow, CRow]#Context,
out: Collector[CRow]): Unit = {
val input = inputC.row
@@ -139,7 +139,7 @@ abstract class RowTimeUnboundedOver(
*/
override def onTimer(
timestamp: Long,
- ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
+ ctx: KeyedProcessFunction[K, CRow, CRow]#OnTimerContext,
out: Collector[CRow]): Unit = {
if (isProcessingTimeTimer(ctx.asInstanceOf[OnTimerContext])) {
@@ -251,13 +251,13 @@ abstract class RowTimeUnboundedOver(
* A ProcessFunction to support unbounded ROWS window.
* The ROWS clause defines on a physical level how many rows are included in a window frame.
*/
-class RowTimeUnboundedRowsOver(
+class RowTimeUnboundedRowsOver[K](
genAggregations: GeneratedAggregationsFunction,
intermediateType: TypeInformation[Row],
inputType: TypeInformation[CRow],
rowTimeIdx: Int,
queryConfig: StreamQueryConfig)
- extends RowTimeUnboundedOver(
+ extends RowTimeUnboundedOver[K](
genAggregations: GeneratedAggregationsFunction,
intermediateType,
inputType,
@@ -292,13 +292,13 @@ class RowTimeUnboundedRowsOver(
* The RANGE option includes all the rows within the window frame
* that have the same ORDER BY values as the current row.
*/
-class RowTimeUnboundedRangeOver(
+class RowTimeUnboundedRangeOver[K](
genAggregations: GeneratedAggregationsFunction,
intermediateType: TypeInformation[Row],
inputType: TypeInformation[CRow],
rowTimeIdx: Int,
queryConfig: StreamQueryConfig)
- extends RowTimeUnboundedOver(
+ extends RowTimeUnboundedOver[K](
genAggregations: GeneratedAggregationsFunction,
intermediateType,
inputType,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
index e825b18..0f2e909 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/aggregate/SortUtil.scala
@@ -17,6 +17,8 @@
*/
package org.apache.flink.table.runtime.aggregate
+import java.lang.{Byte => JByte}
+
import org.apache.calcite.rel.`type`._
import org.apache.calcite.rel.RelCollation
import org.apache.calcite.rel.RelFieldCollation
@@ -25,7 +27,7 @@ import org.apache.flink.types.Row
import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.api.common.typeutils.TypeComparator
import org.apache.flink.api.java.typeutils.runtime.RowComparator
import org.apache.flink.api.common.typeutils.TypeSerializer
@@ -59,7 +61,7 @@ object SortUtil {
collationSort: RelCollation,
inputType: RelDataType,
inputTypeInfo: TypeInformation[Row],
- execCfg: ExecutionConfig): ProcessFunction[CRow, CRow] = {
+ execCfg: ExecutionConfig): KeyedProcessFunction[JByte, CRow, CRow] = {
Preconditions.checkArgument(collationSort.getFieldCollations.size() > 0)
val rowtimeIdx = collationSort.getFieldCollations.get(0).getFieldIndex
@@ -78,7 +80,7 @@ object SortUtil {
val inputCRowType = CRowTypeInfo(inputTypeInfo)
- new RowTimeSortProcessFunction(
+ new RowTimeSortProcessFunction[JByte](
inputCRowType,
rowtimeIdx,
collectionRowComparator)
@@ -97,7 +99,7 @@ object SortUtil {
collationSort: RelCollation,
inputType: RelDataType,
inputTypeInfo: TypeInformation[Row],
- execCfg: ExecutionConfig): ProcessFunction[CRow, CRow] = {
+ execCfg: ExecutionConfig): KeyedProcessFunction[JByte, CRow, CRow] = {
val rowComp = createRowComparator(
inputType,
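Unlike `AggregateUtil`, `SortUtil` can pin the key type: the sort operators only ever run on the `NullByteKeySelector`-keyed global stream, so its factories return `KeyedProcessFunction[JByte, CRow, CRow]` outright. A sketch with a hypothetical pass-through function:

    import java.lang.{Byte => JByte}

    import org.apache.flink.streaming.api.functions.KeyedProcessFunction
    import org.apache.flink.util.Collector

    object MiniSortUtil {
      // the concrete key type is part of the contract: callers must key
      // the stream with NullByteKeySelector before calling process()
      def createPassThrough(): KeyedProcessFunction[JByte, String, String] =
        new KeyedProcessFunction[JByte, String, String] {
          override def processElement(
              in: String,
              ctx: KeyedProcessFunction[JByte, String, String]#Context,
              out: Collector[String]): Unit = out.collect(in)
        }
    }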
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/AggFunctionHarnessTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/AggFunctionHarnessTest.scala
index 0550213..b776e67 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/AggFunctionHarnessTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/AggFunctionHarnessTest.scala
@@ -71,7 +71,7 @@ class AggFunctionHarnessTest extends HarnessTestBase {
val state = getState(
operator,
"function",
- classOf[GroupAggProcessFunction],
+ classOf[GroupAggProcessFunction[Row]],
"acc0_map_dataview").asInstanceOf[MapView[JInt, JInt]]
assertTrue(state.isInstanceOf[StateMapView[_, _]])
assertTrue(operator.getKeyedStateBackend.isInstanceOf[RocksDBKeyedStateBackend[_]])
@@ -136,12 +136,12 @@ class AggFunctionHarnessTest extends HarnessTestBase {
val minState = getState(
operator,
"function",
- classOf[GroupAggProcessFunction],
+ classOf[GroupAggProcessFunction[Row]],
"acc0_map_dataview").asInstanceOf[MapView[JInt, JInt]]
val maxState = getState(
operator,
"function",
- classOf[GroupAggProcessFunction],
+ classOf[GroupAggProcessFunction[Row]],
"acc1_map_dataview").asInstanceOf[MapView[JInt, JInt]]
assertTrue(minState.isInstanceOf[StateMapView[_, _]])
assertTrue(maxState.isInstanceOf[StateMapView[_, _]])
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/GroupAggregateHarnessTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/GroupAggregateHarnessTest.scala
index fe00e11..86458b3 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/GroupAggregateHarnessTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/GroupAggregateHarnessTest.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.ConcurrentLinkedQueue
import org.apache.flink.api.common.time.Time
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
import org.apache.flink.table.api.scala._
@@ -45,8 +45,8 @@ class GroupAggregateHarnessTest extends HarnessTestBase {
@Test
def testAggregate(): Unit = {
- val processFunction = new LegacyKeyedProcessOperator[String, CRow, CRow](
- new GroupAggProcessFunction(
+ val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+ new GroupAggProcessFunction[String](
genSumAggFunction,
sumAggregationStateType,
false,
@@ -104,8 +104,8 @@ class GroupAggregateHarnessTest extends HarnessTestBase {
@Test
def testAggregateWithRetract(): Unit = {
- val processFunction = new LegacyKeyedProcessOperator[String, CRow, CRow](
- new GroupAggProcessFunction(
+ val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+ new GroupAggProcessFunction[String](
genSumAggFunction,
sumAggregationStateType,
true,
@@ -211,7 +211,7 @@ class GroupAggregateHarnessTest extends HarnessTestBase {
val fields = getGeneratedAggregationFields(
operator,
"function",
- classOf[GroupAggProcessFunction])
+ classOf[GroupAggProcessFunction[Row]])
// check only one DistinctAccumulator is used
assertTrue(fields.count(_.getName.endsWith("distinctValueMap_dataview")) == 1)
@@ -301,7 +301,7 @@ class GroupAggregateHarnessTest extends HarnessTestBase {
val fields = getGeneratedAggregationFields(
operator,
"function",
- classOf[GroupAggProcessFunction])
+ classOf[GroupAggProcessFunction[Row]])
// check only one DistinctAccumulator is used
assertTrue(fields.count(_.getName.endsWith("distinctValueMap_dataview")) == 1)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
index 95b13a0..68dce4c 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.ConcurrentLinkedQueue
import org.apache.flink.api.common.time.Time
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
import org.apache.flink.table.api.{StreamQueryConfig, Types}
import org.apache.flink.table.runtime.aggregate._
@@ -40,8 +40,8 @@ class OverWindowHarnessTest extends HarnessTestBase{
@Test
def testProcTimeBoundedRowsOver(): Unit = {
- val processFunction = new LegacyKeyedProcessOperator[String, CRow, CRow](
- new ProcTimeBoundedRowsOver(
+ val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+ new ProcTimeBoundedRowsOver[String](
genMinMaxAggFunction,
2,
minMaxAggregationStateType,
@@ -141,8 +141,8 @@ class OverWindowHarnessTest extends HarnessTestBase{
@Test
def testProcTimeBoundedRangeOver(): Unit = {
- val processFunction = new LegacyKeyedProcessOperator[String, CRow, CRow](
- new ProcTimeBoundedRangeOver(
+ val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+ new ProcTimeBoundedRangeOver[String](
genMinMaxAggFunction,
4000,
minMaxAggregationStateType,
@@ -269,8 +269,8 @@ class OverWindowHarnessTest extends HarnessTestBase{
@Test
def testProcTimeUnboundedOver(): Unit = {
- val processFunction = new LegacyKeyedProcessOperator[String, CRow, CRow](
- new ProcTimeUnboundedOver(
+ val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+ new ProcTimeUnboundedOver[String](
genMinMaxAggFunction,
minMaxAggregationStateType,
queryConfig))
@@ -361,8 +361,8 @@ class OverWindowHarnessTest extends HarnessTestBase{
@Test
def testRowTimeBoundedRangeOver(): Unit = {
- val processFunction = new LegacyKeyedProcessOperator[String, CRow, CRow](
- new RowTimeBoundedRangeOver(
+ val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+ new RowTimeBoundedRangeOver[String](
genMinMaxAggFunction,
minMaxAggregationStateType,
minMaxCRowType,
@@ -511,8 +511,8 @@ class OverWindowHarnessTest extends HarnessTestBase{
@Test
def testRowTimeBoundedRowsOver(): Unit = {
- val processFunction = new LegacyKeyedProcessOperator[String, CRow, CRow](
- new RowTimeBoundedRowsOver(
+ val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+ new RowTimeBoundedRowsOver[String](
genMinMaxAggFunction,
minMaxAggregationStateType,
minMaxCRowType,
@@ -659,8 +659,8 @@ class OverWindowHarnessTest extends HarnessTestBase{
@Test
def testRowTimeUnboundedRangeOver(): Unit = {
- val processFunction = new LegacyKeyedProcessOperator[String, CRow, CRow](
- new RowTimeUnboundedRangeOver(
+ val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+ new RowTimeUnboundedRangeOver[String](
genMinMaxAggFunction,
minMaxAggregationStateType,
minMaxCRowType,
@@ -795,8 +795,8 @@ class OverWindowHarnessTest extends HarnessTestBase{
@Test
def testRowTimeUnboundedRowsOver(): Unit = {
- val processFunction = new LegacyKeyedProcessOperator[String, CRow, CRow](
- new RowTimeUnboundedRowsOver(
+ val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+ new RowTimeUnboundedRowsOver[String](
genMinMaxAggFunction,
minMaxAggregationStateType,
minMaxCRowType,
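On the test side the swap is `LegacyKeyedProcessOperator` -> `KeyedProcessOperator`, with the key type now spelled out on the operator, the function, and the harness alike. A self-contained sketch of that wiring, assuming a hypothetical identity function over `String` keys:

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    import org.apache.flink.api.java.functions.KeySelector
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction
    import org.apache.flink.streaming.api.operators.KeyedProcessOperator
    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
    import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness
    import org.apache.flink.util.Collector

    object HarnessSketch {
      def main(args: Array[String]): Unit = {
        // KeyedProcessOperator wraps a KeyedProcessFunction, so the key
        // type must line up across operator, function, and key selector
        val operator = new KeyedProcessOperator[String, String, String](
          new KeyedProcessFunction[String, String, String] {
            override def processElement(
                in: String,
                ctx: KeyedProcessFunction[String, String, String]#Context,
                out: Collector[String]): Unit = out.collect(in)
          })

        val harness = new KeyedOneInputStreamOperatorTestHarness[String, String, String](
          operator,
          new KeySelector[String, String] {
            override def getKey(s: String): String = s
          },
          BasicTypeInfo.STRING_TYPE_INFO)

        harness.open()
        harness.processElement(new StreamRecord("a", 1L))
        println(harness.getOutput) // contains the echoed record
        harness.close()
      }
    }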
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/SortProcessFunctionHarnessTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/SortProcessFunctionHarnessTest.scala
index 457bde2..96d7dd3 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/SortProcessFunctionHarnessTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/SortProcessFunctionHarnessTest.scala
@@ -28,7 +28,7 @@ import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.java.typeutils.runtime.RowComparator
import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
import org.apache.flink.streaming.util.{KeyedOneInputStreamOperatorTestHarness, TestHarnessUtil}
@@ -71,8 +71,8 @@ class SortProcessFunctionHarnessTest {
val inputCRowType = CRowTypeInfo(rT)
- val processFunction = new LegacyKeyedProcessOperator[Integer,CRow,CRow](
- new ProcTimeSortProcessFunction(
+ val processFunction = new KeyedProcessOperator[Integer,CRow,CRow](
+ new ProcTimeSortProcessFunction[Integer](
inputCRowType,
collectionRowComparator))
@@ -170,8 +170,8 @@ class SortProcessFunctionHarnessTest {
val inputCRowType = CRowTypeInfo(rT)
- val processFunction = new LegacyKeyedProcessOperator[Integer,CRow,CRow](
- new RowTimeSortProcessFunction(
+ val processFunction = new KeyedProcessOperator[Integer,CRow,CRow](
+ new RowTimeSortProcessFunction[Integer](
inputCRowType,
4,
Some(collectionRowComparator)))
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/operators/ProcessFunctionWithCleanupStateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/operators/ProcessFunctionWithCleanupStateTest.scala
index 6c0ca1a..270a084 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/operators/ProcessFunctionWithCleanupStateTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/operators/ProcessFunctionWithCleanupStateTest.scala
@@ -23,8 +23,8 @@ import org.apache.flink.api.common.time.Time
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator
import org.apache.flink.table.api.StreamQueryConfig
import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
import org.apache.flink.table.runtime.harness.HarnessTestBase
@@ -39,8 +39,8 @@ class ProcessFunctionWithCleanupStateTest extends HarnessTestBase {
def testStateCleaning(): Unit = {
val queryConfig = new TestStreamQueryConfig(Time.milliseconds(5), Time.milliseconds(10))
- val func = new MockedProcessFunction(queryConfig)
- val operator = new LegacyKeyedProcessOperator(func)
+ val func = new MockedProcessFunction[String](queryConfig)
+ val operator = new KeyedProcessOperator(func)
val testHarness = createHarnessTester(operator,
new FirstFieldSelector,
@@ -93,8 +93,8 @@ class ProcessFunctionWithCleanupStateTest extends HarnessTestBase {
}
}
-private class MockedProcessFunction(queryConfig: StreamQueryConfig)
- extends ProcessFunctionWithCleanupState[(String, String), String](queryConfig) {
+private class MockedProcessFunction[K](queryConfig: StreamQueryConfig)
+ extends ProcessFunctionWithCleanupState[K, (String, String), String](queryConfig) {
var state: ValueState[String] = _
@@ -106,7 +106,7 @@ private class MockedProcessFunction(queryConfig: StreamQueryConfig)
override def processElement(
value: (String, String),
- ctx: ProcessFunction[(String, String), String]#Context,
+ ctx: KeyedProcessFunction[K, (String, String), String]#Context,
out: Collector[String]): Unit = {
val curTime = ctx.timerService().currentProcessingTime()
@@ -116,7 +116,7 @@ private class MockedProcessFunction(queryConfig: StreamQueryConfig)
override def onTimer(
timestamp: Long,
- ctx: ProcessFunction[(String, String), String]#OnTimerContext,
+ ctx: KeyedProcessFunction[K, (String, String), String]#OnTimerContext,
out: Collector[String]): Unit = {
if (stateCleaningEnabled) {