Repository: flink
Updated Branches:
  refs/heads/master 17dd915e8 -> e14135518


[FLINK-6138] [table] Create the ListStateDescriptor with the 
aggregationStateType instead of a serializer.

this closes #3581


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e1413551
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e1413551
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e1413551

Branch: refs/heads/master
Commit: e14135518d51e6b491f2cd512234b71f1cf1d716
Parents: 17dd915
Author: 金竹 <[email protected]>
Authored: Tue Mar 21 12:50:02 2017 +0800
Committer: Jark Wu <[email protected]>
Committed: Tue Mar 21 20:58:16 2017 +0800

----------------------------------------------------------------------
 .../UnboundedNonPartitionedProcessingOverProcessFunction.scala   | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e1413551/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedNonPartitionedProcessingOverProcessFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedNonPartitionedProcessingOverProcessFunction.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedNonPartitionedProcessingOverProcessFunction.scala
index 51c8315..7750511 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedNonPartitionedProcessingOverProcessFunction.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedNonPartitionedProcessingOverProcessFunction.scala
@@ -98,9 +98,7 @@ class UnboundedNonPartitionedProcessingOverProcessFunction(
   }
 
   override def initializeState(context: FunctionInitializationContext): Unit = 
{
-    val stateSerializer =
-      
aggregationStateType.createSerializer(getRuntimeContext.getExecutionConfig)
-    val accumulatorsDescriptor = new ListStateDescriptor[Row]("overState", 
stateSerializer)
+    val accumulatorsDescriptor = new ListStateDescriptor[Row]("overState", 
aggregationStateType)
     state = 
context.getOperatorStateStore.getOperatorState(accumulatorsDescriptor)
   }
 }

Reply via email to