Repository: flink Updated Branches: refs/heads/master 17dd915e8 -> e14135518
[FLINK-6138] [table] Create the ListStateDescriptor with the aggregationStateType instead of a serializer. this closes #3581 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e1413551 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e1413551 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e1413551 Branch: refs/heads/master Commit: e14135518d51e6b491f2cd512234b71f1cf1d716 Parents: 17dd915 Author: 金竹 <[email protected]> Authored: Tue Mar 21 12:50:02 2017 +0800 Committer: Jark Wu <[email protected]> Committed: Tue Mar 21 20:58:16 2017 +0800 ---------------------------------------------------------------------- .../UnboundedNonPartitionedProcessingOverProcessFunction.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e1413551/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedNonPartitionedProcessingOverProcessFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedNonPartitionedProcessingOverProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedNonPartitionedProcessingOverProcessFunction.scala index 51c8315..7750511 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedNonPartitionedProcessingOverProcessFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedNonPartitionedProcessingOverProcessFunction.scala @@ -98,9 +98,7 @@ class UnboundedNonPartitionedProcessingOverProcessFunction( } override def initializeState(context: FunctionInitializationContext): Unit = { - val stateSerializer = - 
aggregationStateType.createSerializer(getRuntimeContext.getExecutionConfig) - val accumulatorsDescriptor = new ListStateDescriptor[Row]("overState", stateSerializer) + val accumulatorsDescriptor = new ListStateDescriptor[Row]("overState", aggregationStateType) state = context.getOperatorStateStore.getOperatorState(accumulatorsDescriptor) } }
