[streaming] Integration test added for windowed operations

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f32990ff
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f32990ff
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f32990ff

Branch: refs/heads/master
Commit: f32990ffa08fbe0b004c3a7443b295335a2aca3d
Parents: 412779f
Author: Gyula Fora <gyf...@apache.org>
Authored: Sun Feb 8 13:10:00 2015 +0100
Committer: mbalassi <mbala...@apache.org>
Committed: Mon Feb 16 13:06:08 2015 +0100

----------------------------------------------------------------------
 .../windowing/WindowIntegrationTest.java        | 200 +++++++++++++++++++
 1 file changed, 200 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f32990ff/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
new file mode 100755
index 0000000..c1687e6
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.windowing;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.api.windowing.helper.Count;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+public class WindowIntegrationTest implements Serializable {
+
+       private static final long serialVersionUID = 1L;
+       private static final Integer MEMORYSIZE = 32;
+
+       @SuppressWarnings("serial")
+       private static class ModKey implements KeySelector<Integer, Integer> {
+               private int m;
+
+               public ModKey(int m) {
+                       this.m = m;
+               }
+
+               @Override
+               public Integer getKey(Integer value) throws Exception {
+                       return value % m;
+               }
+       }
+
+       @SuppressWarnings("serial")
+       private static class IdentityWindowMap implements
+                       GroupReduceFunction<Integer, StreamWindow<Integer>> {
+
+               @Override
+               public void reduce(Iterable<Integer> values, 
Collector<StreamWindow<Integer>> out)
+                               throws Exception {
+
+                       StreamWindow<Integer> window = new 
StreamWindow<Integer>();
+
+                       for (Integer value : values) {
+                               window.add(value);
+                       }
+                       out.collect(window);
+               }
+
+       }
+
+       @Test
+       public void test() throws Exception {
+
+               List<Integer> inputs = new ArrayList<Integer>();
+               inputs.add(1);
+               inputs.add(2);
+               inputs.add(2);
+               inputs.add(3);
+               inputs.add(4);
+               inputs.add(5);
+               inputs.add(10);
+               inputs.add(11);
+               inputs.add(11);
+
+               StreamExecutionEnvironment env = new TestStreamEnvironment(2, 
MEMORYSIZE);
+
+               DataStream<Integer> source = env.fromCollection(inputs);
+
+               
source.window(Count.of(2)).every(Count.of(3)).sum(0).getDiscretizedStream()
+                               .addSink(new CentralSink1());
+
+               source.window(Count.of(4)).groupBy(new ModKey(2)).mapWindow(new 
IdentityWindowMap())
+                               .flatten().addSink(new CentralSink2());
+
+               source.groupBy(new 
ModKey(3)).window(Count.of(2)).sum(0).getDiscretizedStream()
+                               .addSink(new DistributedSink1());
+
+               source.groupBy(new ModKey(3)).window(Count.of(2)).groupBy(new 
ModKey(2))
+                               .mapWindow(new 
IdentityWindowMap()).flatten().addSink(new DistributedSink2());
+
+               env.execute();
+
+               // sum ( Count of 2 slide 3 )
+               List<StreamWindow<Integer>> expected1 = new 
ArrayList<StreamWindow<Integer>>();
+               expected1.add(StreamWindow.fromElements(4));
+               expected1.add(StreamWindow.fromElements(9));
+               expected1.add(StreamWindow.fromElements(22));
+
+               validateOutput(expected1, CentralSink1.windows);
+
+               // Tumbling Count of 4 grouped by mod 2
+               List<StreamWindow<Integer>> expected2 = new 
ArrayList<StreamWindow<Integer>>();
+               expected2.add(StreamWindow.fromElements(2, 2));
+               expected2.add(StreamWindow.fromElements(1, 3));
+               expected2.add(StreamWindow.fromElements(4, 10));
+               expected2.add(StreamWindow.fromElements(5, 11));
+               expected2.add(StreamWindow.fromElements(11));
+
+               validateOutput(expected2, CentralSink2.windows);
+
+               // groupby mod 3 sum ( Tumbling Count of 2)
+               List<StreamWindow<Integer>> expected3 = new 
ArrayList<StreamWindow<Integer>>();
+               expected3.add(StreamWindow.fromElements(4));
+               expected3.add(StreamWindow.fromElements(5));
+               expected3.add(StreamWindow.fromElements(16));
+               expected3.add(StreamWindow.fromElements(10));
+               expected3.add(StreamWindow.fromElements(11));
+               expected3.add(StreamWindow.fromElements(3));
+
+               validateOutput(expected3, DistributedSink1.windows);
+
+               // groupby mod3 Tumbling Count of 2 grouped by mod 2
+               List<StreamWindow<Integer>> expected4 = new 
ArrayList<StreamWindow<Integer>>();
+               expected4.add(StreamWindow.fromElements(2, 2));
+               expected4.add(StreamWindow.fromElements(1));
+               expected4.add(StreamWindow.fromElements(4));
+               expected4.add(StreamWindow.fromElements(5, 11));
+               expected4.add(StreamWindow.fromElements(10));
+               expected4.add(StreamWindow.fromElements(11));
+               expected4.add(StreamWindow.fromElements(3));
+
+               validateOutput(expected4, DistributedSink2.windows);
+
+       }
+
+       private <R> void validateOutput(List<R> expected, List<R> actual) {
+               assertEquals(new HashSet<R>(expected), new HashSet<R>(actual));
+       }
+
+       @SuppressWarnings("serial")
+       private static class CentralSink1 implements 
SinkFunction<StreamWindow<Integer>> {
+
+               public static List<StreamWindow<Integer>> windows = new 
ArrayList<StreamWindow<Integer>>();
+
+               @Override
+               public void invoke(StreamWindow<Integer> value) throws 
Exception {
+                       windows.add(value);
+               }
+
+       }
+
+       @SuppressWarnings("serial")
+       private static class CentralSink2 implements 
SinkFunction<StreamWindow<Integer>> {
+
+               public static List<StreamWindow<Integer>> windows = new 
ArrayList<StreamWindow<Integer>>();
+
+               @Override
+               public void invoke(StreamWindow<Integer> value) throws 
Exception {
+                       windows.add(value);
+               }
+
+       }
+
+       @SuppressWarnings("serial")
+       private static class DistributedSink1 implements 
SinkFunction<StreamWindow<Integer>> {
+
+               public static List<StreamWindow<Integer>> windows = new 
ArrayList<StreamWindow<Integer>>();
+
+               @Override
+               public void invoke(StreamWindow<Integer> value) throws 
Exception {
+                       windows.add(value);
+               }
+
+       }
+
+       @SuppressWarnings("serial")
+       private static class DistributedSink2 implements 
SinkFunction<StreamWindow<Integer>> {
+
+               public static List<StreamWindow<Integer>> windows = new 
ArrayList<StreamWindow<Integer>>();
+
+               @Override
+               public void invoke(StreamWindow<Integer> value) throws 
Exception {
+                       windows.add(value);
+               }
+
+       }
+}

Reply via email to