[streaming] Add test that runs streaming with fault tolerance

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/244e5d5f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/244e5d5f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/244e5d5f

Branch: refs/heads/master
Commit: 244e5d5f806e3d0e55f28a60301e5ac28ad0bd18
Parents: f372358
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Mar 20 14:14:39 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat Mar 21 22:13:02 2015 +0100

----------------------------------------------------------------------
 .../streaming/api/datastream/DataStream.java    |   3 +-
 .../StreamCheckpointingITCase.java              | 227 +++++++++++++++++++
 2 files changed, 229 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/244e5d5f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 5f6f981..b62a6d8 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -34,6 +34,7 @@ import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.ClosureCleaner;
@@ -864,7 +865,7 @@ public class DataStream<OUT> {
         * @return The transformed DataStream.
         */
        public SingleOutputStreamOperator<Long, ?> count() {
-               TypeInformation<Long> outTypeInfo = 
TypeExtractor.getForObject(Long.valueOf(0));
+               TypeInformation<Long> outTypeInfo = 
BasicTypeInfo.LONG_TYPE_INFO;
 
                return transform("Count", outTypeInfo, new 
CounterInvokable<OUT>());
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/244e5d5f/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
new file mode 100644
index 0000000..3accb11
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.api.function.source.RichParallelSourceFunction;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import static org.junit.Assert.*;
+
+/**
+ * A simple test that runs a streaming topology with checkpointing enabled.
+ */
+@SuppressWarnings("serial")
+public class StreamCheckpointingITCase {
+
+       private static final int NUM_TASK_MANAGERS = 2;
+       private static final int NUM_TASK_SLOTS = 3;
+       private static final int PARALLELISM = NUM_TASK_MANAGERS * 
NUM_TASK_SLOTS;
+
+       private static final long NUM_STRINGS = 4000000;
+
+       private static ForkableFlinkMiniCluster cluster;
+
+       @BeforeClass
+       public static void startCluster() {
+               try {
+                       Configuration conf = new Configuration();
+                       
conf.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 
NUM_TASK_MANAGERS);
+                       
conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
+
+                       cluster = new ForkableFlinkMiniCluster(conf, false);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail("custer startup failed");
+               }
+       }
+
+       @AfterClass
+       public static void shutdownCluster() {
+               try {
+                       cluster.stop();
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail("Cluster shutdown failed.");
+               }
+       }
+
+       /**
+        * Runs the following program:
+        *
+        * <pre>
+        *
+        *     (source)  ->  (filter)  ->  (map)  ->  (groupBy / reduce)  -> 
(sink)
+        *
+        * </pre>
+        */
+       @Test
+       public void runCheckpointedProgram() {
+
+               assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
+
+               try {
+                       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment(
+                                                                               
                                                        "localhost", 
cluster.getJobManagerRPCPort());
+                       env.setDegreeOfParallelism(PARALLELISM);
+//                     env.enableMonitoring(500);
+
+                       DataStream<String> stream = env.addSource(new 
RichParallelSourceFunction<String>() {
+
+                               private Random rnd;
+                               private StringBuilder stringBuilder;
+
+                               private int step;
+
+                               private boolean running = true;
+
+                               @Override
+                               public void open(Configuration parameters) {
+                                       rnd = new Random();
+                                       stringBuilder = new StringBuilder();
+                                       step = 
getRuntimeContext().getNumberOfParallelSubtasks();
+                               }
+
+                               @Override
+                               public void run(Collector<String> collector) 
throws Exception {
+                                       for (long i = 
getRuntimeContext().getIndexOfThisSubtask(); running && i < NUM_STRINGS; i += 
step) {
+                                               char first = (char) ((i % 40) + 
40);
+
+                                               stringBuilder.setLength(0);
+                                               stringBuilder.append(first);
+
+                                               
collector.collect(randomString(stringBuilder, rnd));
+                                       }
+                               }
+
+                               @Override
+                               public void cancel() {
+                                       running = false;
+                               }
+                       });
+
+                       stream
+                                       .filter(new FilterFunction<String>() {
+                                               @Override
+                                               public boolean filter(String 
value) {
+                                                       return value.length() < 
100;
+                                               }
+                                       })
+                                       .map(new MapFunction<String, 
PrefixCount>() {
+
+                                               @Override
+                                               public PrefixCount map(String 
value) {
+                                                       return new 
PrefixCount(value.substring(0, 1), value, 1L);
+                                               }
+                                       })
+                                       .groupBy("prefix")
+                                       .reduce(new 
ReduceFunction<PrefixCount>() {
+                                               @Override
+                                               public PrefixCount 
reduce(PrefixCount value1, PrefixCount value2) {
+                                                       value1.count += 
value2.count;
+                                                       return value1;
+                                               }
+                                       })
+                                       .addSink(new 
RichSinkFunction<PrefixCount>() {
+
+                                               private Map<Character, Long> 
counts = new HashMap<Character, Long>();
+
+                                               @Override
+                                               public void invoke(PrefixCount 
value) {
+                                                       Character first = 
value.prefix.charAt(0);
+                                                       Long previous = 
counts.get(first);
+                                                       if (previous == null) {
+                                                               
counts.put(first, value.count);
+                                                       } else {
+                                                               
counts.put(first, Math.max(previous, value.count));
+                                                       }
+                                               }
+
+                                               @Override
+                                               public void close() {
+                                                       for (Long count : 
counts.values()) {
+                                                               
assertEquals(NUM_STRINGS / 40, count.longValue());
+                                                       }
+                                               }
+
+                                               @Override
+                                               public void cancel() {}
+                                       });
+
+                       env.execute();
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       private static String randomString(StringBuilder bld, Random rnd) {
+               final int len = rnd.nextInt(10) + 5;
+
+               for (int i = 0; i < len; i++) {
+                       char next = (char) (rnd.nextInt(20000) + 33);
+                       bld.append(next);
+               }
+
+               return bld.toString();
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Custom Type Classes
+       // 
--------------------------------------------------------------------------------------------
+
+       public static class PrefixCount {
+
+               public String prefix;
+               public String value;
+               public long count;
+
+               public PrefixCount() {}
+
+               public PrefixCount(String prefix, String value, long count) {
+                       this.prefix = prefix;
+                       this.value = value;
+                       this.count = count;
+               }
+
+               @Override
+               public String toString() {
+                       return prefix + " / " + value;
+               }
+       }
+}

Reply via email to