Repository: flink
Updated Branches:
  refs/heads/release-1.1 a7f6594b6 -> 52a4440d9


[FLINK-4640] [streaming api] Ensure that the state descriptors properly initialize the serializers.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/52a4440d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/52a4440d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/52a4440d

Branch: refs/heads/release-1.1
Commit: 52a4440d916fb450c4999f6e1f42f392e247b426
Parents: a7f6594
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Sep 20 20:49:40 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Sep 21 15:12:02 2016 +0200

----------------------------------------------------------------------
 .../api/datastream/AllWindowedStream.java       |   4 +-
 .../api/datastream/WindowedStream.java          |   4 +-
 .../operators/StateDescriptorPassingTest.java   | 219 +++++++++++++++++++
 3 files changed, 221 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/52a4440d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index fa3b90d..41b131a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -469,9 +469,7 @@ public class AllWindowedStream<T, W extends Window> {
 
                } else {
                        FoldingStateDescriptor<T, R> stateDesc = new 
FoldingStateDescriptor<>("window-contents",
-                                       initialValue,
-                                       foldFunction,
-                                       resultType);
+                                       initialValue, foldFunction, 
resultType.createSerializer(getExecutionEnvironment().getConfig()));
 
                        opName = "TriggerWindow(" + windowAssigner + ", " + 
stateDesc + ", " + trigger + ", " + udfName + ")";
 

http://git-wip-us.apache.org/repos/asf/flink/blob/52a4440d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index e81d7af..ae98619 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -489,9 +489,7 @@ public class WindowedStream<T, K, W extends Window> {
 
                } else {
                        FoldingStateDescriptor<T, R> stateDesc = new 
FoldingStateDescriptor<>("window-contents",
-                               initialValue,
-                               foldFunction,
-                               resultType);
+                               initialValue, foldFunction, 
resultType.createSerializer(getExecutionEnvironment().getConfig()));
 
                        opName = "TriggerWindow(" + windowAssigner + ", " + 
stateDesc + ", " + trigger + ", " + udfName + ")";
 

http://git-wip-us.apache.org/repos/asf/flink/blob/52a4440d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
new file mode 100644
index 0000000..b1e3fbb
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
+import org.apache.flink.util.Collector;
+
+import org.junit.Test;
+
+import java.io.File;
+
+import static org.junit.Assert.*;
+
+/**
+ * Various tests around the proper passing of state descriptors to the operators
+ * and their serialization.
+ * 
+ * The tests use an arbitrary generic type to validate the behavior.
+ */
+@SuppressWarnings("serial")
+public class StateDescriptorPassingTest {
+
+       @SuppressWarnings("unchecked")
+       private final Class<? extends Serializer<?>> javaSerializer =
+                       (Class<? extends Serializer<?>>) (Class<?>) 
JavaSerializer.class;
+       
+       @Test
+       public void testFoldWindowState() throws Exception {
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+               env.registerTypeWithKryoSerializer(File.class, javaSerializer);
+
+               DataStream<String> src = env.fromElements("abc");
+
+               SingleOutputStreamOperator<?> result = src
+                               .keyBy(new KeySelector<String, String>() {
+                                       @Override
+                                       public String getKey(String value) {
+                                               return null;
+                                       }
+                               })
+                               .timeWindow(Time.milliseconds(1000))
+                               .fold(new File("/"), new FoldFunction<String, 
File>() {
+
+                                       @Override
+                                       public File fold(File a, String e) {
+                                               return null;
+                                       }
+                               });
+
+               validateStateDescriptorConfigured(result);
+       }
+
+       @Test
+       public void testReduceWindowState() throws Exception {
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+               env.registerTypeWithKryoSerializer(File.class, javaSerializer);
+
+               DataStream<File> src = env.fromElements(new File("/"));
+
+               SingleOutputStreamOperator<?> result = src
+                               .keyBy(new KeySelector<File, String>() {
+                                       @Override
+                                       public String getKey(File value) {
+                                               return null;
+                                       }
+                               })
+                               .timeWindow(Time.milliseconds(1000))
+                               .reduce(new ReduceFunction<File>() {
+                                       
+                                       @Override
+                                       public File reduce(File value1, File 
value2) {
+                                               return null;
+                                       }
+                               });
+
+               validateStateDescriptorConfigured(result);
+       }
+
+       @Test
+       public void testApplyWindowState() throws Exception {
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+               env.registerTypeWithKryoSerializer(File.class, javaSerializer);
+               
+               DataStream<File> src = env.fromElements(new File("/"));
+
+               SingleOutputStreamOperator<?> result = src
+                               .keyBy(new KeySelector<File, String>() {
+                                       @Override
+                                       public String getKey(File value) {
+                                               return null;
+                                       }
+                               })
+                               .timeWindow(Time.milliseconds(1000))
+                               .apply(new WindowFunction<File, String, String, 
TimeWindow>() {
+                                       @Override
+                                       public void apply(String s, TimeWindow 
window, 
+                                                                               
Iterable<File> input, Collector<String> out) {}
+                               });
+
+               validateStateDescriptorConfigured(result);
+       }
+
+       @Test
+       public void testFoldWindowAllState() throws Exception {
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+               env.registerTypeWithKryoSerializer(File.class, javaSerializer);
+
+               DataStream<String> src = env.fromElements("abc");
+
+               SingleOutputStreamOperator<?> result = src
+                               .timeWindowAll(Time.milliseconds(1000))
+                               .fold(new File("/"), new FoldFunction<String, 
File>() {
+
+                                       @Override
+                                       public File fold(File a, String e) {
+                                               return null;
+                                       }
+                               });
+
+               validateStateDescriptorConfigured(result);
+       }
+
+       @Test
+       public void testReduceWindowAllState() throws Exception {
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+               env.registerTypeWithKryoSerializer(File.class, javaSerializer);
+
+               DataStream<File> src = env.fromElements(new File("/"));
+
+               SingleOutputStreamOperator<?> result = src
+                               .timeWindowAll(Time.milliseconds(1000))
+                               .reduce(new ReduceFunction<File>() {
+
+                                       @Override
+                                       public File reduce(File value1, File 
value2) {
+                                               return null;
+                                       }
+                               });
+
+               validateStateDescriptorConfigured(result);
+       }
+
+       @Test
+       public void testApplyWindowAllState() throws Exception {
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+               env.registerTypeWithKryoSerializer(File.class, javaSerializer);
+
+               DataStream<File> src = env.fromElements(new File("/"));
+
+               SingleOutputStreamOperator<?> result = src
+                               .timeWindowAll(Time.milliseconds(1000))
+                               .apply(new AllWindowFunction<File, String, 
TimeWindow>() {
+                                       @Override
+                                       public void apply(TimeWindow window, 
Iterable<File> input, Collector<String> out) {}
+                               });
+
+               validateStateDescriptorConfigured(result);
+       }
+
+       // ------------------------------------------------------------------------
+       //  generic validation
+       // ------------------------------------------------------------------------
+
+       private void 
validateStateDescriptorConfigured(SingleOutputStreamOperator<?> result) {
+               OneInputTransformation<?, ?> transform = 
(OneInputTransformation<?, ?>) result.getTransformation();
+               WindowOperator<?, ?, ?, ?, ?> op = (WindowOperator<?, ?, ?, ?, 
?>) transform.getOperator();
+               StateDescriptor<?, ?> descr = op.getStateDescriptor();
+
+               // this would be the first statement to fail if state descriptors were not properly initialized
+               TypeSerializer<?> serializer = descr.getSerializer();
+               assertTrue(serializer instanceof KryoSerializer);
+
+               Kryo kryo = ((KryoSerializer<?>) serializer).getKryo();
+
+               assertTrue("serializer registration was not properly passed 
on", 
+                               kryo.getSerializer(File.class) instanceof 
JavaSerializer);
+       }
+}

Reply via email to