[FLINK-4640] [streaming api] Ensure that the state descriptors properly initialize the serializers.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4d4eb64b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4d4eb64b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4d4eb64b

Branch: refs/heads/master
Commit: 4d4eb64be7490672771243147824a70d3d47c501
Parents: 82ef021
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Sep 20 20:49:40 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Sep 21 17:53:33 2016 +0200

----------------------------------------------------------------------
 .../api/datastream/AllWindowedStream.java       |   4 +-
 .../api/datastream/WindowedStream.java          |   4 +-
 .../operators/windowing/WindowOperator.java     |  43 ++--
 .../operators/StateDescriptorPassingTest.java   | 214 +++++++++++++++++++
 .../operators/windowing/WindowOperatorTest.java |   2 +
 5 files changed, 241 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4d4eb64b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 4b083c8..6b09f3c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -469,9 +469,7 @@ public class AllWindowedStream<T, W extends Window> {
 
                } else {
                        FoldingStateDescriptor<T, R> stateDesc = new 
FoldingStateDescriptor<>("window-contents",
-                                       initialValue,
-                                       foldFunction,
-                                       resultType);
+                                       initialValue, foldFunction, 
resultType.createSerializer(getExecutionEnvironment().getConfig()));
 
                        opName = "TriggerWindow(" + windowAssigner + ", " + 
stateDesc + ", " + trigger + ", " + udfName + ")";
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d4eb64b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index e81d7af..ae98619 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -489,9 +489,7 @@ public class WindowedStream<T, K, W extends Window> {
 
                } else {
                        FoldingStateDescriptor<T, R> stateDesc = new 
FoldingStateDescriptor<>("window-contents",
-                               initialValue,
-                               foldFunction,
-                               resultType);
+                               initialValue, foldFunction, 
resultType.createSerializer(getExecutionEnvironment().getConfig()));
 
                        opName = "TriggerWindow(" + windowAssigner + ", " + 
stateDesc + ", " + trigger + ", " + udfName + ")";
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d4eb64b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 25ec519..dffa2a1 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -61,7 +61,6 @@ import 
org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
@@ -74,7 +73,8 @@ import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.concurrent.ScheduledFuture;
 
-import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * An operator that implements the logic for windowing based on a {@link 
WindowAssigner} and
@@ -186,26 +186,29 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
        /**
         * Creates a new {@code WindowOperator} based on the given policies and 
user functions.
         */
-       public WindowOperator(WindowAssigner<? super IN, W> windowAssigner,
-               TypeSerializer<W> windowSerializer,
-               KeySelector<IN, K> keySelector,
-               TypeSerializer<K> keySerializer,
-               StateDescriptor<? extends AppendingState<IN, ACC>, ?> 
windowStateDescriptor,
-               InternalWindowFunction<ACC, OUT, K, W> windowFunction,
-               Trigger<? super IN, ? super W> trigger,
-               long allowedLateness) {
+       public WindowOperator(
+                       WindowAssigner<? super IN, W> windowAssigner,
+                       TypeSerializer<W> windowSerializer,
+                       KeySelector<IN, K> keySelector,
+                       TypeSerializer<K> keySerializer,
+                       StateDescriptor<? extends AppendingState<IN, ACC>, ?> 
windowStateDescriptor,
+                       InternalWindowFunction<ACC, OUT, K, W> windowFunction,
+                       Trigger<? super IN, ? super W> trigger,
+                       long allowedLateness) {
 
                super(windowFunction);
 
-               this.windowAssigner = requireNonNull(windowAssigner);
-               this.windowSerializer = windowSerializer;
-               this.keySelector = requireNonNull(keySelector);
-               this.keySerializer = requireNonNull(keySerializer);
+               checkArgument(allowedLateness >= 0);
 
-               this.windowStateDescriptor = windowStateDescriptor;
-               this.trigger = requireNonNull(trigger);
+               checkArgument(windowStateDescriptor == null || 
windowStateDescriptor.isSerializerInitialized(),
+                               "window state serializer is not properly 
initialized");
 
-               Preconditions.checkArgument(allowedLateness >= 0);
+               this.windowAssigner = checkNotNull(windowAssigner);
+               this.windowSerializer = checkNotNull(windowSerializer);
+               this.keySelector = checkNotNull(keySelector);
+               this.keySerializer = checkNotNull(keySerializer);
+               this.windowStateDescriptor = windowStateDescriptor;
+               this.trigger = checkNotNull(trigger);
                this.allowedLateness = allowedLateness;
 
                setChainingStrategy(ChainingStrategy.ALWAYS);
@@ -666,7 +669,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                public <S extends Serializable> ValueState<S> 
getKeyValueState(String name,
                        Class<S> stateType,
                        S defaultState) {
-                       requireNonNull(stateType, "The state type class must 
not be null");
+                       checkNotNull(stateType, "The state type class must not 
be null");
 
                        TypeInformation<S> typeInfo;
                        try {
@@ -686,8 +689,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                        TypeInformation<S> stateType,
                        S defaultState) {
 
-                       requireNonNull(name, "The name of the state must not be 
null");
-                       requireNonNull(stateType, "The state type information 
must not be null");
+                       checkNotNull(name, "The name of the state must not be 
null");
+                       checkNotNull(stateType, "The state type information 
must not be null");
 
                        ValueStateDescriptor<S> stateDesc = new 
ValueStateDescriptor<>(name, stateType.createSerializer(getExecutionConfig()), 
defaultState);
                        return getPartitionedState(stateDesc);

http://git-wip-us.apache.org/repos/asf/flink/blob/4d4eb64b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
new file mode 100644
index 0000000..c0ca6a0
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
+import org.apache.flink.util.Collector;
+
+import org.junit.Test;
+
+import java.io.File;
+
+import static org.junit.Assert.*;
+
+/**
+ * Various tests around the proper passing of state descriptors to the operators
+ * and their serialization, i.e. that their serializers are created from the job's ExecutionConfig.
+ *
+ * <p>The tests use a generic type ({@link File}, serialized via Kryo) to validate the behavior.
+ */
+@SuppressWarnings("serial")
+public class StateDescriptorPassingTest {
+
+       @Test
+       public void testFoldWindowState() throws Exception {
+               final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+               env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);
+
+               DataStream<String> src = env.fromElements("abc");
+
+               SingleOutputStreamOperator<?> result = src
+                               .keyBy(new KeySelector<String, String>() {
+                                       @Override
+                                       public String getKey(String value) {
+                                               return null;
+                                       }
+                               })
+                               .timeWindow(Time.milliseconds(1000))
+                               .fold(new File("/"), new FoldFunction<String, File>() {
+
+                                       @Override
+                                       public File fold(File a, String e) {
+                                               return null;
+                                       }
+                               });
+
+               validateStateDescriptorConfigured(result);
+       }
+
+       @Test
+       public void testReduceWindowState() throws Exception {
+               final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+               env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);
+
+               DataStream<File> src = env.fromElements(new File("/"));
+
+               SingleOutputStreamOperator<?> result = src
+                               .keyBy(new KeySelector<File, String>() {
+                                       @Override
+                                       public String getKey(File value) {
+                                               return null;
+                                       }
+                               })
+                               .timeWindow(Time.milliseconds(1000))
+                               .reduce(new ReduceFunction<File>() {
+
+                                       @Override
+                                       public File reduce(File value1, File value2) {
+                                               return null;
+                                       }
+                               });
+
+               validateStateDescriptorConfigured(result);
+       }
+
+       @Test
+       public void testApplyWindowState() throws Exception {
+               final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+               env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);
+
+               DataStream<File> src = env.fromElements(new File("/"));
+
+               SingleOutputStreamOperator<?> result = src
+                               .keyBy(new KeySelector<File, String>() {
+                                       @Override
+                                       public String getKey(File value) {
+                                               return null;
+                                       }
+                               })
+                               .timeWindow(Time.milliseconds(1000))
+                               .apply(new WindowFunction<File, String, String, TimeWindow>() {
+                                       @Override
+                                       public void apply(String s, TimeWindow window,
+                                                       Iterable<File> input, Collector<String> out) {}
+                               });
+
+               validateStateDescriptorConfigured(result);
+       }
+
+       @Test
+       public void testFoldWindowAllState() throws Exception {
+               final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+               env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);
+
+               DataStream<String> src = env.fromElements("abc");
+
+               SingleOutputStreamOperator<?> result = src
+                               .timeWindowAll(Time.milliseconds(1000))
+                               .fold(new File("/"), new FoldFunction<String, File>() {
+
+                                       @Override
+                                       public File fold(File a, String e) {
+                                               return null;
+                                       }
+                               });
+
+               validateStateDescriptorConfigured(result);
+       }
+
+       @Test
+       public void testReduceWindowAllState() throws Exception {
+               final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+               env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);
+
+               DataStream<File> src = env.fromElements(new File("/"));
+
+               SingleOutputStreamOperator<?> result = src
+                               .timeWindowAll(Time.milliseconds(1000))
+                               .reduce(new ReduceFunction<File>() {
+
+                                       @Override
+                                       public File reduce(File value1, File value2) {
+                                               return null;
+                                       }
+                               });
+
+               validateStateDescriptorConfigured(result);
+       }
+
+       @Test
+       public void testApplyWindowAllState() throws Exception {
+               final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+               env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);
+
+               DataStream<File> src = env.fromElements(new File("/"));
+
+               SingleOutputStreamOperator<?> result = src
+                               .timeWindowAll(Time.milliseconds(1000))
+                               .apply(new AllWindowFunction<File, String, TimeWindow>() {
+                                       @Override
+                                       public void apply(TimeWindow window, Iterable<File> input, Collector<String> out) {}
+                               });
+
+               validateStateDescriptorConfigured(result);
+       }
+
+       // ------------------------------------------------------------------------
+       //  generic validation
+       // ------------------------------------------------------------------------
+
+       private void validateStateDescriptorConfigured(SingleOutputStreamOperator<?> result) {
+               OneInputTransformation<?, ?> transform = (OneInputTransformation<?, ?>) result.getTransformation();
+               WindowOperator<?, ?, ?, ?, ?> op = (WindowOperator<?, ?, ?, ?, ?>) transform.getOperator();
+               StateDescriptor<?, ?> descr = op.getStateDescriptor();
+
+               // this would be the first statement to fail if state descriptors were not properly initialized
+               TypeSerializer<?> serializer = descr.getSerializer();
+               assertTrue("state serializer is not a KryoSerializer: " + serializer, serializer instanceof KryoSerializer);
+
+               Kryo kryo = ((KryoSerializer<?>) serializer).getKryo();
+
+               assertTrue("serializer registration was not properly passed on",
+                               kryo.getSerializer(File.class) instanceof JavaSerializer);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4d4eb64b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index 67a6f55..fd73bcc 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -2102,6 +2102,7 @@ public class WindowOperatorTest extends TestLogger {
                                        }
                                },
                                inputType);
+               windowStateDesc.initializeSerializerUnlessSet(new 
ExecutionConfig());
 
                WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, Tuple2<String, Integer>, TimeWindow> operator =
                        new WindowOperator<>(
@@ -2246,6 +2247,7 @@ public class WindowOperatorTest extends TestLogger {
                                        }
                                },
                                inputType);
+               windowStateDesc.initializeSerializerUnlessSet(new 
ExecutionConfig());
 
                WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, 
Integer>, Tuple2<String, Integer>, TimeWindow> operator =
                        new WindowOperator<>(

Reply via email to