http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/twitter/util/TwitterStreamData.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/twitter/util/TwitterStreamData.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/twitter/util/TwitterStreamData.java new file mode 100644 index 0000000..b06d193 --- /dev/null +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/twitter/util/TwitterStreamData.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.examples.twitter.util; + +//example data looking like tweets, but not acquired from Twitter +public class TwitterStreamData { + public static final String[] TEXTS = new String[] { + "{\"created_at\":\"Mon Jan 1 00:00:00 +0000 1901\",\"id\":000000000000000000,\"id_str\":\"000000000000000000\",\"text\":\"Apache Flink\",\"source\":null,\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":0000000000,\"id_str\":\"0000000000\",\"name\":\"Apache Flink\",\"screen_name\":\"Apache Flink\",\"location\":\"Berlin\",\"protected\":false,\"verified\":false,\"followers_count\":999999,\"friends_count\":99999,\"listed_count\":999,\"favourites_count\":9999,\"statuses_count\":999,\"created_at\":\"Mon Jan 1 00:00:00 +0000 1901\",\"utc_offset\":7200,\"time_zone\":\"Amsterdam\",\"geo_enabled\":false,\"lang\":\"en\",\"entities\":{\"hashtags\":[{\"text\":\"example1\",\"indices\":[0,0]},{\"text\":\"tweet1\",\"indices\":[0,0]}]},\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C6E2EE\",\"profile_background_ tile\":false,\"profile_link_color\":\"1F98C7\",\"profile_sidebar_border_color\":\"FFFFFF\",\"profile_sidebar_fill_color\":\"252429\",\"profile_text_color\":\"666666\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null}", + "{\"created_at\":\"Mon Jan 1 00:00:00 +0000 1901\",\"id\":000000000000000001,\"id_str\":\"000000000000000000\",\"text\":\"Apache 
Flink\",\"source\":null,\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":0000000000,\"id_str\":\"0000000000\",\"name\":\"Apache Flink\",\"screen_name\":\"Apache Flink\",\"location\":\"Berlin\",\"protected\":false,\"verified\":false,\"followers_count\":999999,\"friends_count\":99999,\"listed_count\":999,\"favourites_count\":9999,\"statuses_count\":999,\"created_at\":\"Mon Jan 1 00:00:00 +0000 1901\",\"utc_offset\":7200,\"time_zone\":\"Amsterdam\",\"geo_enabled\":false,\"lang\":\"en\",\"entities\":{\"hashtags\":[{\"text\":\"example2\",\"indices\":[0,0]},{\"text\":\"tweet2\",\"indices\":[0,0]}]},\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C6E2EE\",\"profile_background_ tile\":false,\"profile_link_color\":\"1F98C7\",\"profile_sidebar_border_color\":\"FFFFFF\",\"profile_sidebar_fill_color\":\"252429\",\"profile_text_color\":\"666666\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null}", + "{\"created_at\":\"Mon Jan 1 00:00:00 +0000 1901\",\"id\":000000000000000002,\"id_str\":\"000000000000000000\",\"text\":\"Apache Flink\",\"source\":null,\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":0000000000,\"id_str\":\"0000000000\",\"name\":\"Apache Flink\",\"screen_name\":\"Apache Flink\",\"location\":\"Berlin\",\"protected\":false,\"verified\":false,\"followers_count\":999999,\"friends_count\":99999,\"listed_count\":999,\"favourites_count\":9999,\"statuses_count\":999,\"created_at\":\"Mon Jan 1 00:00:00 +0000 
1901\",\"utc_offset\":7200,\"time_zone\":\"Amsterdam\",\"geo_enabled\":false,\"lang\":\"en\",\"entities\":{\"hashtags\":[{\"text\":\"example3\",\"indices\":[0,0]},{\"text\":\"tweet3\",\"indices\":[0,0]}]},\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C6E2EE\",\"profile_background_ tile\":false,\"profile_link_color\":\"1F98C7\",\"profile_sidebar_border_color\":\"FFFFFF\",\"profile_sidebar_fill_color\":\"252429\",\"profile_text_color\":\"666666\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null}", + }; + + public static final String STREAMING_COUNTS_AS_TUPLES = "(apache,1)\n" + "(apache,2)\n" + "(apache,3)\n" + "(flink,1)\n" + "(flink,2)\n" + "(flink,3)\n"; + + private TwitterStreamData() { + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java new file mode 100644 index 0000000..f08069b --- /dev/null +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.examples.windowing; + +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.streaming.api.functions.windowing.WindowFunction; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.util.Collector; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +@SuppressWarnings("serial") +public class GroupedProcessingTimeWindowExample { + + public static void main(String[] args) throws Exception { + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(4); + + DataStream<Tuple2<Long, Long>> stream = env + .addSource(new RichParallelSourceFunction<Tuple2<Long, Long>>() { + + private volatile boolean running = true; + + @Override + public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception { + + final long startTime = System.currentTimeMillis(); + + final long numElements = 20000000; + final long numKeys = 10000; + long val = 1L; + long count = 0L; + + + while (running && count < numElements) { + count++; + ctx.collect(new Tuple2<>(val++, 1L)); + + if (val > numKeys) { + val = 1L; + } + } + + final long endTime = System.currentTimeMillis(); + System.out.println("Took " + (endTime-startTime) + " msecs for " + numElements + " values"); + } + + @Override + public void cancel() { + running = false; + } + }); + + stream + .keyBy(0) + .timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, 
MILLISECONDS)) + .reduce(new SummingReducer()) + + // alternative: use a apply function which does not pre-aggregate +// .keyBy(new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>()) +// .window(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS)) +// .apply(new SummingWindowFunction()) + + .addSink(new SinkFunction<Tuple2<Long, Long>>() { + @Override + public void invoke(Tuple2<Long, Long> value) { + } + }); + + env.execute(); + } + + public static class FirstFieldKeyExtractor<Type extends Tuple, Key> implements KeySelector<Type, Key> { + + @Override + @SuppressWarnings("unchecked") + public Key getKey(Type value) { + return (Key) value.getField(0); + } + } + + public static class SummingWindowFunction implements WindowFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Long, Window> { + + @Override + public void apply(Long key, Window window, Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) { + long sum = 0L; + for (Tuple2<Long, Long> value : values) { + sum += value.f1; + } + + out.collect(new Tuple2<>(key, sum)); + } + } + + public static class SummingReducer implements ReduceFunction<Tuple2<Long, Long>> { + + @Override + public Tuple2<Long, Long> reduce(Tuple2<Long, Long> value1, Tuple2<Long, Long> value2) { + return new Tuple2<>(value1.f0, value1.f1 + value2.f1); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java new file mode 100644 index 0000000..035727a --- /dev/null +++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java @@ -0,0 +1,167 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.streaming.examples.windowing;

import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.EventTimeSourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

import java.util.ArrayList;
import java.util.List;

/**
 * Session-style aggregation in event time.
 *
 * <p>Elements are keyed by field 0 and collected in a global window per key. A custom
 * {@link SessionTrigger} (timeout: 3 event-time units) decides when a session ends. Field 2 is
 * summed per emitted session. Output goes to stdout, or to a file if a result path is given.
 */
public class SessionWindowing {

	// *************************************************************************
	// PROGRAM
	// *************************************************************************

	@SuppressWarnings("serial")
	public static void main(String[] args) throws Exception {
		if (!parseParameters(args)) {
			return;
		}

		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
		env.setParallelism(2);

		// (key, event timestamp, value-to-sum) records
		final List<Tuple3<String, Long, Integer>> events = new ArrayList<>();
		events.add(new Tuple3<>("a", 1L, 1));
		events.add(new Tuple3<>("b", 1L, 1));
		events.add(new Tuple3<>("b", 3L, 1));
		events.add(new Tuple3<>("b", 5L, 1));
		events.add(new Tuple3<>("c", 6L, 1));
		// We expect to detect the session "a" earlier than this point (the old
		// functionality can only detect here when the next starts)
		events.add(new Tuple3<>("a", 10L, 1));
		// We expect to detect session "b" and "c" at this point as well
		events.add(new Tuple3<>("c", 11L, 1));

		DataStream<Tuple3<String, Long, Integer>> source = env.addSource(
			new EventTimeSourceFunction<Tuple3<String, Long, Integer>>() {
				private static final long serialVersionUID = 1L;

				@Override
				public void run(SourceContext<Tuple3<String, Long, Integer>> ctx) throws Exception {
					for (int i = 0; i < events.size(); i++) {
						final Tuple3<String, Long, Integer> event = events.get(i);
						final long eventTime = event.f1;
						ctx.collectWithTimestamp(event, eventTime);
						// the watermark trails the element just emitted by one time unit
						ctx.emitWatermark(new Watermark(eventTime - 1));
						if (!fileOutput) {
							System.out.println("Collected: " + event);
						}
					}
					// final watermark after all elements have been emitted
					ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
				}

				@Override
				public void cancel() {
				}
			});

		// sessions per key with a timeout of 3 time units
		DataStream<Tuple3<String, Long, Integer>> aggregated = source
			.keyBy(0)
			.window(GlobalWindows.create())
			.trigger(new SessionTrigger(3L))
			.sum(2);

		if (!fileOutput) {
			aggregated.print();
		} else {
			aggregated.writeAsText(outputPath);
		}

		env.execute();
	}

	/**
	 * Fires and purges a key's global window in two cases. When an element arrives more than
	 * {@code sessionTimeout} after the previously seen element of that key, it fires immediately.
	 * When an event-time timer finds that at least {@code sessionTimeout} has passed since the
	 * last seen element, it fires on the timer.
	 */
	private static class SessionTrigger implements Trigger<Tuple3<String, Long, Integer>, GlobalWindow> {

		private static final long serialVersionUID = 1L;

		private final Long sessionTimeout;

		public SessionTrigger(Long sessionTimeout) {
			this.sessionTimeout = sessionTimeout;
		}

		@Override
		public TriggerResult onElement(Tuple3<String, Long, Integer> element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {
			// "last-seen" defaults to 1L for a key that has not been seen yet
			final OperatorState<Long> lastSeenState = ctx.getKeyValueState("last-seen", 1L);
			final Long previous = lastSeenState.value();
			final long gap = timestamp - previous;

			// remember this element's time for later elements and timers
			lastSeenState.update(timestamp);

			// NOTE(review): the timer is derived from the *previous* element's time (or the 1L
			// default), not from this element's timestamp. Left unchanged because it determines
			// exactly when sessions are emitted.
			ctx.registerEventTimeTimer(previous + sessionTimeout);

			if (gap <= sessionTimeout) {
				return TriggerResult.CONTINUE;
			}
			return TriggerResult.FIRE_AND_PURGE;
		}

		@Override
		public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
			final Long lastSeen = ctx.getKeyValueState("last-seen", 1L).value();
			return (time - lastSeen >= sessionTimeout) ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
		}

		@Override
		public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
			// sessions are defined purely in event time
			return TriggerResult.CONTINUE;
		}
	}

	// *************************************************************************
	// UTIL METHODS
	// *************************************************************************

	private static boolean fileOutput = false;
	private static String outputPath;

	/**
	 * Reads the optional result path.
	 *
	 * @return {@code false} if the arguments are invalid (usage has been printed), {@code true} otherwise
	 */
	private static boolean parseParameters(String[] args) {
		if (args.length == 0) {
			// no arguments: print results to stdout
			return true;
		}
		if (args.length != 1) {
			System.err.println("Usage: SessionWindowing <result path>");
			return false;
		}
		fileOutput = true;
		outputPath = args[0];
		return true;
	}

}
http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java new file mode 100644 index 0000000..30eda67 --- /dev/null +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.examples.windowing; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.TimestampExtractor; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction; +import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows; +import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger; + +import java.util.Arrays; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +/** + * An example of grouped stream windowing where different eviction and trigger + * policies can be used. A source fetches events from cars every 1 sec + * containing their id, their current speed (kmh), overall elapsed distance (m) + * and a timestamp. The streaming example triggers the top speed of each car + * every x meters elapsed for the last y seconds. 
+ */ +public class TopSpeedWindowing { + + private static final int NUM_CAR_EVENTS = 100; + + // ************************************************************************* + // PROGRAM + // ************************************************************************* + + public static void main(String[] args) throws Exception { + + if (!parseParameters(args)) { + return; + } + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + + @SuppressWarnings({"rawtypes", "serial"}) + DataStream<Tuple4<Integer, Integer, Double, Long>> carData; + + if (fileInput) { + carData = env.readTextFile(inputPath).map(new ParseCarData()); + } else { + int numOfCars = 2; + carData = env.addSource(CarSource.create(numOfCars)); + } + + int evictionSec = 10; + double triggerMeters = 50; + DataStream<Tuple4<Integer, Integer, Double, Long>> topSpeeds = carData + .assignTimestamps(new CarTimestamp()) + .keyBy(0) + .window(GlobalWindows.create()) + .evictor(TimeEvictor.of(Time.of(evictionSec, TimeUnit.SECONDS))) + .trigger(DeltaTrigger.of(triggerMeters, + new DeltaFunction<Tuple4<Integer, Integer, Double, Long>>() { + private static final long serialVersionUID = 1L; + + @Override + public double getDelta( + Tuple4<Integer, Integer, Double, Long> oldDataPoint, + Tuple4<Integer, Integer, Double, Long> newDataPoint) { + return newDataPoint.f2 - oldDataPoint.f2; + } + })) + .maxBy(1); + + if (fileOutput) { + topSpeeds.print(); + topSpeeds.writeAsText(outputPath); + } else { + topSpeeds.print(); + } + + env.execute("CarTopSpeedWindowingExample"); + } + + // ************************************************************************* + // USER FUNCTIONS + // ************************************************************************* + + private static class CarSource implements SourceFunction<Tuple4<Integer, Integer, Double, Long>> { + + private static final long serialVersionUID = 1L; + private 
Integer[] speeds; + private Double[] distances; + + private Random rand = new Random(); + + private volatile boolean isRunning = true; + private int counter; + + private CarSource(int numOfCars) { + speeds = new Integer[numOfCars]; + distances = new Double[numOfCars]; + Arrays.fill(speeds, 50); + Arrays.fill(distances, 0d); + } + + public static CarSource create(int cars) { + return new CarSource(cars); + } + + @Override + public void run(SourceContext<Tuple4<Integer, Integer, Double, Long>> ctx) throws Exception { + + while (isRunning && counter < NUM_CAR_EVENTS) { + Thread.sleep(100); + for (int carId = 0; carId < speeds.length; carId++) { + if (rand.nextBoolean()) { + speeds[carId] = Math.min(100, speeds[carId] + 5); + } else { + speeds[carId] = Math.max(0, speeds[carId] - 5); + } + distances[carId] += speeds[carId] / 3.6d; + Tuple4<Integer, Integer, Double, Long> record = new Tuple4<>(carId, + speeds[carId], distances[carId], System.currentTimeMillis()); + ctx.collect(record); + counter++; + } + } + } + + @Override + public void cancel() { + isRunning = false; + } + } + + private static class ParseCarData extends + RichMapFunction<String, Tuple4<Integer, Integer, Double, Long>> { + private static final long serialVersionUID = 1L; + + @Override + public Tuple4<Integer, Integer, Double, Long> map(String record) { + String rawData = record.substring(1, record.length() - 1); + String[] data = rawData.split(","); + return new Tuple4<>(Integer.valueOf(data[0]), Integer.valueOf(data[1]), Double.valueOf(data[2]), Long.valueOf(data[3])); + } + } + + private static class CarTimestamp implements TimestampExtractor<Tuple4<Integer, Integer, Double, Long>> { + private static final long serialVersionUID = 1L; + + @Override + public long extractTimestamp(Tuple4<Integer, Integer, Double, Long> element, + long currentTimestamp) { + return element.f3; + } + + @Override + public long extractWatermark(Tuple4<Integer, Integer, Double, Long> element, + long currentTimestamp) { + 
return element.f3 - 1; + } + + @Override + public long getCurrentWatermark() { + return Long.MIN_VALUE; + } + } + + // ************************************************************************* + // UTIL METHODS + // ************************************************************************* + + private static boolean fileInput = false; + private static boolean fileOutput = false; + private static String inputPath; + private static String outputPath; + + private static boolean parseParameters(String[] args) { + + if (args.length > 0) { + if (args.length == 2) { + fileInput = true; + fileOutput = true; + inputPath = args[0]; + outputPath = args[1]; + } else { + System.err.println("Usage: TopSpeedWindowingExample <input path> <output path>"); + return false; + } + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java new file mode 100644 index 0000000..f3d57bf --- /dev/null +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.streaming.examples.windowing;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.examples.wordcount.WordCount;

/**
 * Implements a windowed version of the streaming "WordCount" program.
 *
 * <p>
 * The input is a plain text file with lines separated by newline characters.
 *
 * <p>
 * Usage: <code>WindowWordCount &lt;text path&gt; &lt;result path&gt; [&lt;window size&gt;] [&lt;slide size&gt;]</code><br>
 * If no parameters are provided, the program is run with default data from
 * {@link org.apache.flink.examples.java.wordcount.util.WordCountData}.
 * Per word, counts are summed over count windows. The window size defaults to 250 elements and the
 * slide size to 150 elements. If only a window size is given, the slide size equals the window size.
 *
 * <p>
 * This example shows how to:
 * <ul>
 * <li>write a simple Flink Streaming program,
 * <li>use tuple data types,
 * <li>use basic windowing abstractions.
 * </ul>
 *
 */
public class WindowWordCount {

	// window parameters with default values
	private static int windowSize = 250;
	private static int slideSize = 150;

	// *************************************************************************
	// PROGRAM
	// *************************************************************************

	public static void main(String[] args) throws Exception {

		if (!parseParameters(args)) {
			return;
		}

		// set up the execution environment
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// get input data
		DataStream<String> text = getTextDataStream(env);

		DataStream<Tuple2<String, Integer>> counts =
			// split up the lines in pairs (2-tuples) containing: (word,1)
			text.flatMap(new WordCount.Tokenizer())
				// group by the tuple field "0"
				.keyBy(0)
				// per word, create windows of windowSize records sliding every slideSize records
				.countWindow(windowSize, slideSize)
				// sum up tuple field "1"
				.sum(1);

		// emit result
		if (fileOutput) {
			counts.writeAsText(outputPath);
		} else {
			counts.print();
		}

		// execute program
		env.execute("WindowWordCount");
	}

	// *************************************************************************
	// UTIL METHODS
	// *************************************************************************

	private static boolean fileOutput = false;
	private static String textPath;
	private static String outputPath;

	/**
	 * Reads {@code <text path> <result path> [<window size>] [<slide size>]}.
	 *
	 * @return {@code false} if the number of arguments is invalid (usage has been printed),
	 *         {@code true} otherwise
	 * @throws NumberFormatException if a window or slide size is not an integer
	 */
	private static boolean parseParameters(String[] args) {

		if (args.length > 0) {
			// parse input arguments
			fileOutput = true;
			if (args.length >= 2 && args.length <= 4) {
				textPath = args[0];
				outputPath = args[1];
				if (args.length >= 3) {
					windowSize = Integer.parseInt(args[2]);

					// the slide size is the optional fourth argument; if it is
					// not specified, slide by the window size
					slideSize = args.length == 3 ? windowSize : Integer.parseInt(args[3]);
				}
			} else {
				System.err.println("Usage: WindowWordCount <text path> <result path> [<window size>] [<slide size>]");
				return false;
			}
		} else {
			System.out.println("Executing WindowWordCount example with built-in default data.");
			System.out.println("  Provide parameters to read input data from a file.");
			System.out.println("  Usage: WindowWordCount <text path> <result path> [<window size>] [<slide size>]");
		}
		return true;
	}

	/** Reads the input file when paths were given, otherwise uses the built-in word count data. */
	private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env) {
		if (fileOutput) {
			// read the text file from given input path
			return env.readTextFile(textPath);
		} else {
			// get default test text data
			return env.fromElements(WordCountData.WORDS);
		}
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/util/SessionWindowingData.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/util/SessionWindowingData.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/util/SessionWindowingData.java new file mode 100644 index 0000000..c1a99a8 --- /dev/null +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/util/SessionWindowingData.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.streaming.examples.windowing.util;

/**
 * Expected result lines, presumably for the SessionWindowing example's built-in input.
 * Confirm against the test that uses this constant.
 */
public class SessionWindowingData {

	/** Newline-separated tuples; the last line has no trailing newline. */
	public static final String EXPECTED = "(a,1,1)\n(c,6,1)\n(c,11,1)\n(b,1,3)\n(a,10,1)";

	/** Constants only; not meant to be instantiated. */
	private SessionWindowingData() {
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/util/TopSpeedWindowingExampleData.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/util/TopSpeedWindowingExampleData.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/util/TopSpeedWindowingExampleData.java new file mode 100644 index 0000000..bf63695 --- /dev/null +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/util/TopSpeedWindowingExampleData.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.examples.windowing.util; + +public class TopSpeedWindowingExampleData { + + public static final String CAR_DATA = + "(0,55,15.277777777777777,1424951918630)\n" + "(1,45,12.5,1424951918632)\n" + + "(0,50,29.166666666666664,1424951919632)\n" + "(1,50,26.38888888888889,1424951919632)\n" + + "(0,55,44.44444444444444,1424951920633)\n" + "(1,45,38.888888888888886,1424951920633)\n" + + "(0,50,58.33333333333333,1424951921634)\n" + "(1,40,50.0,1424951921634)\n" + + "(0,55,73.6111111111111,1424951922634)\n" + "(1,35,59.72222222222222,1424951922634)\n" + + "(0,60,90.27777777777777,1424951923634)\n" + "(1,40,70.83333333333333,1424951923634)\n" + + "(0,65,108.33333333333333,1424951924635)\n" + "(1,35,80.55555555555554,1424951924635)\n" + + "(0,60,125.0,1424951925635)\n" + "(1,40,91.66666666666666,1424951925635)\n" + + "(0,55,140.27777777777777,1424951926635)\n" + "(1,45,104.16666666666666,1424951926636)\n" + + "(0,60,156.94444444444443,1424951927636)\n" + "(1,50,118.05555555555554,1424951927636)\n" + + "(0,55,172.2222222222222,1424951928636)\n" + "(1,45,130.55555555555554,1424951928636)\n" + + "(0,50,186.1111111111111,1424951929636)\n" + "(1,50,144.44444444444443,1424951929637)\n" + + "(0,55,201.38888888888886,1424951930637)\n" + "(1,55,159.7222222222222,1424951930637)\n" + + "(0,60,218.05555555555551,1424951931637)\n" + "(1,60,176.38888888888886,1424951931637)\n" + + "(0,55,233.3333333333333,1424951932637)\n" + "(1,65,194.4444444444444,1424951932638)\n" + + "(0,50,247.22222222222217,1424951933638)\n" + 
"(1,70,213.88888888888886,1424951933638)\n" + + "(0,45,259.7222222222222,1424951934638)\n" + "(1,65,231.9444444444444,1424951934638)\n" + + "(0,50,273.6111111111111,1424951935638)\n" + "(1,70,251.38888888888886,1424951935639)\n" + + "(0,55,288.88888888888886,1424951936639)\n" + "(1,75,272.2222222222222,1424951936639)\n" + + "(0,50,302.77777777777777,1424951937639)\n" + "(1,70,291.66666666666663,1424951937639)\n" + + "(0,45,315.27777777777777,1424951938640)\n" + "(1,65,309.7222222222222,1424951938640)\n" + + "(0,50,329.1666666666667,1424951939640)\n" + "(1,70,329.16666666666663,1424951939640)\n" + + "(0,55,344.44444444444446,1424951940640)\n" + "(1,65,347.2222222222222,1424951940640)\n" + + "(0,50,358.33333333333337,1424951941641)\n" + "(1,70,366.66666666666663,1424951941641)\n" + + "(0,55,373.61111111111114,1424951942641)\n" + "(1,65,384.7222222222222,1424951942641)\n" + + "(0,50,387.50000000000006,1424951943641)\n" + "(1,70,404.16666666666663,1424951943641)\n" + + "(0,45,400.00000000000006,1424951944642)\n" + "(1,65,422.2222222222222,1424951944642)\n" + + "(0,50,413.88888888888897,1424951945642)\n" + "(1,60,438.88888888888886,1424951945642)\n" + + "(0,45,426.38888888888897,1424951946642)\n" + "(1,65,456.9444444444444,1424951946642)\n" + + "(0,40,437.50000000000006,1424951947643)\n" + "(1,70,476.38888888888886,1424951947643)\n" + + "(0,45,450.00000000000006,1424951948643)\n" + "(1,75,497.2222222222222,1424951948643)\n" + + "(0,40,461.11111111111114,1424951949643)\n" + "(1,80,519.4444444444443,1424951949644)\n" + + "(0,45,473.61111111111114,1424951950644)\n" + "(1,75,540.2777777777777,1424951950644)\n" + + "(0,50,487.50000000000006,1424951951644)\n" + "(1,80,562.4999999999999,1424951951644)\n" + + "(0,45,500.00000000000006,1424951952644)\n" + "(1,85,586.111111111111,1424951952645)\n" + + "(0,40,511.11111111111114,1424951953645)\n" + "(1,80,608.3333333333331,1424951953645)\n" + + "(0,35,520.8333333333334,1424951954645)\n" + "(1,75,629.1666666666665,1424951954645)\n" 
+ + "(0,40,531.9444444444445,1424951955645)\n" + "(1,70,648.611111111111,1424951955646)\n" + + "(0,45,544.4444444444445,1424951956646)\n" + "(1,75,669.4444444444443,1424951956646)\n" + + "(0,50,558.3333333333334,1424951957646)\n" + "(1,80,691.6666666666665,1424951957646)\n" + + "(0,55,573.6111111111112,1424951958646)\n" + "(1,85,715.2777777777776,1424951958647)\n" + + "(0,60,590.2777777777778,1424951959647)\n" + "(1,80,737.4999999999998,1424951959647)\n" + + "(0,65,608.3333333333334,1424951960647)\n" + "(1,85,761.1111111111109,1424951960647)\n" + + "(0,70,627.7777777777778,1424951961647)\n" + "(1,80,783.333333333333,1424951961648)\n" + + "(0,75,648.6111111111112,1424951962648)\n" + "(1,85,806.9444444444441,1424951962648)\n" + + "(0,80,670.8333333333334,1424951963648)\n" + "(1,90,831.9444444444441,1424951963648)\n" + + "(0,75,691.6666666666667,1424951964649)\n" + "(1,95,858.333333333333,1424951964649)\n" + + "(0,70,711.1111111111112,1424951965649)\n" + "(1,90,883.333333333333,1424951965649)\n" + + "(0,75,731.9444444444446,1424951966649)\n" + "(1,95,909.722222222222,1424951966649)\n" + + "(0,70,751.388888888889,1424951967649)\n" + "(1,100,937.4999999999998,1424951967650)\n" + + "(0,75,772.2222222222224,1424951968650)\n" + "(1,100,965.2777777777776,1424951968650)\n" + + "(0,80,794.4444444444446,1424951969650)\n" + "(1,100,993.0555555555554,1424951969650)\n" + + "(0,75,815.2777777777779,1424951970651)\n" + "(1,100,1020.8333333333333,1424951970651)\n" + + "(0,80,837.5000000000001,1424951971651)\n" + "(1,100,1048.611111111111,1424951971651)\n" + + "(0,85,861.1111111111112,1424951972651)\n" + "(1,100,1076.388888888889,1424951972651)\n" + + "(0,80,883.3333333333334,1424951973652)\n" + "(1,95,1102.7777777777778,1424951973652)\n" + + "(0,75,904.1666666666667,1424951974652)\n" + "(1,100,1130.5555555555557,1424951974652)\n" + + "(0,70,923.6111111111112,1424951975652)\n" + "(1,100,1158.3333333333335,1424951975652)\n" + + "(0,75,944.4444444444446,1424951976653)\n" + 
"(1,100,1186.1111111111113,1424951976653)\n" + + "(0,80,966.6666666666667,1424951977653)\n" + "(1,95,1212.5000000000002,1424951977653)\n" + + "(0,75,987.5000000000001,1424951978653)\n" + "(1,100,1240.277777777778,1424951978653)\n" + + "(0,80,1009.7222222222223,1424951979654)\n" + "(1,100,1268.0555555555559,1424951979654)\n" + + "(0,85,1033.3333333333335,1424951980654)\n" + "(1,100,1295.8333333333337,1424951980654)\n" + + "(0,90,1058.3333333333335,1424951981654)\n" + "(1,100,1323.6111111111115,1424951981654)\n" + + "(0,85,1081.9444444444446,1424951982655)\n" + "(1,100,1351.3888888888894,1424951982655)\n" + + "(0,90,1106.9444444444446,1424951983655)\n" + "(1,100,1379.1666666666672,1424951983655)\n" + + "(0,95,1133.3333333333335,1424951984655)\n" + "(1,100,1406.944444444445,1424951984656)\n" + + "(0,90,1158.3333333333335,1424951985656)\n" + "(1,95,1433.333333333334,1424951985656)\n" + + "(0,95,1184.7222222222224,1424951986656)\n" + "(1,90,1458.333333333334,1424951986656)\n" + + "(0,90,1209.7222222222224,1424951987656)\n" + "(1,95,1484.7222222222229,1424951987657)\n" + + "(0,85,1233.3333333333335,1424951988657)\n" + "(1,90,1509.7222222222229,1424951988657)\n" + + "(0,80,1255.5555555555557,1424951989657)\n" + "(1,95,1536.1111111111118,1424951989657)\n" + + "(0,85,1279.1666666666667,1424951990657)\n" + "(1,100,1563.8888888888896,1424951990658)\n" + + "(0,90,1304.1666666666667,1424951991658)\n" + "(1,95,1590.2777777777785,1424951991658)\n" + + "(0,95,1330.5555555555557,1424951992658)\n" + "(1,90,1615.2777777777785,1424951992658)\n" + + "(0,100,1358.3333333333335,1424951993659)\n" + "(1,95,1641.6666666666674,1424951993659)\n" + + "(0,100,1386.1111111111113,1424951994659)\n" + "(1,100,1669.4444444444453,1424951994659)\n" + + "(0,95,1412.5000000000002,1424951995659)\n" + "(1,95,1695.8333333333342,1424951995660)\n" + + "(0,100,1440.277777777778,1424951996660)\n" + "(1,90,1720.8333333333342,1424951996660)\n" + + "(0,100,1468.0555555555559,1424951997660)\n" + 
"(1,85,1744.4444444444453,1424951997660)\n" + + "(0,95,1494.4444444444448,1424951998660)\n" + "(1,80,1766.6666666666674,1424951998661)\n" + + "(0,100,1522.2222222222226,1424951999661)\n" + "(1,75,1787.5000000000007,1424951999661)\n" + + "(0,95,1548.6111111111115,1424952000661)\n" + "(1,80,1809.7222222222229,1424952000661)\n" + + "(0,90,1573.6111111111115,1424952001662)\n" + "(1,75,1830.555555555556,1424952001662)\n" + + "(0,95,1600.0000000000005,1424952002662)\n" + "(1,80,1852.7777777777783,1424952002662)\n" + + "(0,100,1627.7777777777783,1424952003662)\n" + "(1,85,1876.3888888888894,1424952003662)\n" + + "(0,100,1655.555555555556,1424952004663)\n" + "(1,80,1898.6111111111115,1424952004663)\n" + + "(0,95,1681.944444444445,1424952005663)\n" + "(1,85,1922.2222222222226,1424952005663)\n" + + "(0,100,1709.7222222222229,1424952006663)\n" + "(1,90,1947.2222222222226,1424952006664)\n" + + "(0,100,1737.5000000000007,1424952007664)\n" + "(1,95,1973.6111111111115,1424952007664)\n" + + "(0,95,1763.8888888888896,1424952008664)\n" + "(1,90,1998.6111111111115,1424952008664)\n" + + "(0,100,1791.6666666666674,1424952009664)\n" + "(1,85,2022.2222222222226,1424952009665)\n" + + "(0,95,1818.0555555555563,1424952010665)\n" + "(1,80,2044.4444444444448,1424952010665)\n" + + "(0,90,1843.0555555555563,1424952011665)\n" + "(1,75,2065.2777777777783,1424952011665)\n" + + "(0,95,1869.4444444444453,1424952012666)\n" + "(1,80,2087.5000000000005,1424952012666)\n" + + "(0,100,1897.222222222223,1424952013666)\n" + "(1,85,2111.1111111111118,1424952013666)\n" + + "(0,95,1923.611111111112,1424952014666)\n" + "(1,90,2136.1111111111118,1424952014666)\n" + + "(0,100,1951.3888888888898,1424952015667)\n" + "(1,85,2159.722222222223,1424952015667)\n" + + "(0,95,1977.7777777777787,1424952016667)\n" + "(1,90,2184.722222222223,1424952016667)\n" + + "(0,100,2005.5555555555566,1424952017667)\n" + "(1,95,2211.1111111111118,1424952017668)"; + + public static final String TOP_SPEEDS = + 
"(0,55,15.277777777777777,1424951918630)\n" + + "(1,50,26.38888888888889,1424951919632)\n" + + "(0,65,108.33333333333333,1424951924635)\n" + + "(1,50,26.38888888888889,1424951919632)\n" + + "(0,65,108.33333333333333,1424951924635)\n" + + "(1,65,194.4444444444444,1424951932638)\n" + + "(0,65,108.33333333333333,1424951924635)\n" + + "(1,70,213.88888888888886,1424951933638)\n" + + "(0,60,218.05555555555551,1424951931637)\n" + + "(1,75,272.2222222222222,1424951936639)\n" + + "(0,55,233.3333333333333,1424951932637)\n" + + "(1,75,272.2222222222222,1424951936639)\n" + + "(1,75,272.2222222222222,1424951936639)\n" + + "(0,55,288.88888888888886,1424951936639)\n" + + "(1,70,329.16666666666663,1424951939640)\n" + + "(0,55,373.61111111111114,1424951942641)\n" + + "(1,80,519.4444444444443,1424951949644)\n" + + "(1,85,586.111111111111,1424951952645)\n" + + "(0,50,487.50000000000006,1424951951644)\n" + + "(1,85,586.111111111111,1424951952645)\n" + + "(0,60,590.2777777777778,1424951959647)\n" + + "(1,85,586.111111111111,1424951952645)\n" + + "(0,75,648.6111111111112,1424951962648)\n" + + "(1,85,715.2777777777776,1424951958647)\n" + + "(1,95,858.333333333333,1424951964649)\n" + + "(0,80,670.8333333333334,1424951963648)\n" + + "(1,95,858.333333333333,1424951964649)\n" + + "(0,80,670.8333333333334,1424951963648)\n" + + "(1,100,937.4999999999998,1424951967650)\n" + + "(1,100,937.4999999999998,1424951967650)\n" + + "(0,80,670.8333333333334,1424951963648)\n" + + "(1,100,937.4999999999998,1424951967650)\n" + + "(0,85,861.1111111111112,1424951972651)\n" + + "(1,100,937.4999999999998,1424951967650)\n" + + "(1,100,937.4999999999998,1424951967650)\n" + + "(0,85,861.1111111111112,1424951972651)\n" + + "(1,100,993.0555555555554,1424951969650)\n" + + "(0,85,861.1111111111112,1424951972651)\n" + + "(1,100,1048.611111111111,1424951971651)\n" + + "(1,100,1130.5555555555557,1424951974652)\n" + + "(0,90,1058.3333333333335,1424951981654)\n" + + "(1,100,1158.3333333333335,1424951975652)\n" + + 
"(0,95,1133.3333333333335,1424951984655)\n" + + "(1,100,1240.277777777778,1424951978653)\n" + + "(0,95,1133.3333333333335,1424951984655)\n" + + "(1,100,1268.0555555555559,1424951979654)\n" + + "(0,95,1133.3333333333335,1424951984655)\n" + + "(1,100,1323.6111111111115,1424951981654)\n" + + "(0,95,1133.3333333333335,1424951984655)\n" + + "(1,100,1379.1666666666672,1424951983655)\n" + + "(0,100,1358.3333333333335,1424951993659)\n" + + "(1,100,1563.8888888888896,1424951990658)\n" + + "(0,100,1358.3333333333335,1424951993659)\n" + + "(1,100,1563.8888888888896,1424951990658)\n" + + "(0,100,1358.3333333333335,1424951993659)\n" + + "(1,100,1563.8888888888896,1424951990658)\n" + + "(0,100,1358.3333333333335,1424951993659)\n" + + "(0,100,1358.3333333333335,1424951993659)\n" + + "(1,100,1669.4444444444453,1424951994659)\n" + + "(0,100,1440.277777777778,1424951996660)\n" + + "(1,90,1720.8333333333342,1424951996660)\n" + + "(0,100,1468.0555555555559,1424951997660)\n" + + "(1,95,1973.6111111111115,1424952007664)\n" + + "(0,100,1522.2222222222226,1424951999661)\n" + + "(0,100,1627.7777777777783,1424952003662)\n" + + "(1,95,1973.6111111111115,1424952007664)\n" + + "(0,100,1627.7777777777783,1424952003662)\n" + + "(1,95,1973.6111111111115,1424952007664)\n" + + "(0,100,1709.7222222222229,1424952006663)\n" + + "(0,100,1737.5000000000007,1424952007664)\n" + + "(1,95,1973.6111111111115,1424952007664)\n" + + "(0,100,1791.6666666666674,1424952009664)\n" + + "(1,95,2211.1111111111118,1424952017668)\n"; + + public static final String TOP_CASE_CLASS_SPEEDS = + "CarEvent(0,55,15.277777777777777,1424951918630)\n" + + "CarEvent(1,50,26.38888888888889,1424951919632)\n" + + "CarEvent(0,65,108.33333333333333,1424951924635)\n" + + "CarEvent(1,50,26.38888888888889,1424951919632)\n" + + "CarEvent(0,65,108.33333333333333,1424951924635)\n" + + "CarEvent(1,65,194.4444444444444,1424951932638)\n" + + "CarEvent(0,65,108.33333333333333,1424951924635)\n" + + 
"CarEvent(1,70,213.88888888888886,1424951933638)\n" + + "CarEvent(0,60,218.05555555555551,1424951931637)\n" + + "CarEvent(1,75,272.2222222222222,1424951936639)\n" + + "CarEvent(0,55,233.3333333333333,1424951932637)\n" + + "CarEvent(1,75,272.2222222222222,1424951936639)\n" + + "CarEvent(1,75,272.2222222222222,1424951936639)\n" + + "CarEvent(0,55,288.88888888888886,1424951936639)\n" + + "CarEvent(1,70,329.16666666666663,1424951939640)\n" + + "CarEvent(0,55,373.61111111111114,1424951942641)\n" + + "CarEvent(1,80,519.4444444444443,1424951949644)\n" + + "CarEvent(1,85,586.111111111111,1424951952645)\n" + + "CarEvent(0,50,487.50000000000006,1424951951644)\n" + + "CarEvent(1,85,586.111111111111,1424951952645)\n" + + "CarEvent(0,60,590.2777777777778,1424951959647)\n" + + "CarEvent(1,85,586.111111111111,1424951952645)\n" + + "CarEvent(0,75,648.6111111111112,1424951962648)\n" + + "CarEvent(1,85,715.2777777777776,1424951958647)\n" + + "CarEvent(1,95,858.333333333333,1424951964649)\n" + + "CarEvent(0,80,670.8333333333334,1424951963648)\n" + + "CarEvent(1,95,858.333333333333,1424951964649)\n" + + "CarEvent(0,80,670.8333333333334,1424951963648)\n" + + "CarEvent(1,100,937.4999999999998,1424951967650)\n" + + "CarEvent(1,100,937.4999999999998,1424951967650)\n" + + "CarEvent(0,80,670.8333333333334,1424951963648)\n" + + "CarEvent(1,100,937.4999999999998,1424951967650)\n" + + "CarEvent(0,85,861.1111111111112,1424951972651)\n" + + "CarEvent(1,100,937.4999999999998,1424951967650)\n" + + "CarEvent(1,100,937.4999999999998,1424951967650)\n" + + "CarEvent(0,85,861.1111111111112,1424951972651)\n" + + "CarEvent(1,100,993.0555555555554,1424951969650)\n" + + "CarEvent(0,85,861.1111111111112,1424951972651)\n" + + "CarEvent(1,100,1048.611111111111,1424951971651)\n" + + "CarEvent(1,100,1130.5555555555557,1424951974652)\n" + + "CarEvent(0,90,1058.3333333333335,1424951981654)\n" + + "CarEvent(1,100,1158.3333333333335,1424951975652)\n" + + "CarEvent(0,95,1133.3333333333335,1424951984655)\n" + + 
"CarEvent(1,100,1240.277777777778,1424951978653)\n" + + "CarEvent(0,95,1133.3333333333335,1424951984655)\n" + + "CarEvent(1,100,1268.0555555555559,1424951979654)\n" + + "CarEvent(0,95,1133.3333333333335,1424951984655)\n" + + "CarEvent(1,100,1323.6111111111115,1424951981654)\n" + + "CarEvent(0,95,1133.3333333333335,1424951984655)\n" + + "CarEvent(1,100,1379.1666666666672,1424951983655)\n" + + "CarEvent(0,100,1358.3333333333335,1424951993659)\n" + + "CarEvent(1,100,1563.8888888888896,1424951990658)\n" + + "CarEvent(0,100,1358.3333333333335,1424951993659)\n" + + "CarEvent(1,100,1563.8888888888896,1424951990658)\n" + + "CarEvent(0,100,1358.3333333333335,1424951993659)\n" + + "CarEvent(1,100,1563.8888888888896,1424951990658)\n" + + "CarEvent(0,100,1358.3333333333335,1424951993659)\n" + + "CarEvent(0,100,1358.3333333333335,1424951993659)\n" + + "CarEvent(1,100,1669.4444444444453,1424951994659)\n" + + "CarEvent(0,100,1440.277777777778,1424951996660)\n" + + "CarEvent(1,90,1720.8333333333342,1424951996660)\n" + + "CarEvent(0,100,1468.0555555555559,1424951997660)\n" + + "CarEvent(1,95,1973.6111111111115,1424952007664)\n" + + "CarEvent(0,100,1522.2222222222226,1424951999661)\n" + + "CarEvent(0,100,1627.7777777777783,1424952003662)\n" + + "CarEvent(1,95,1973.6111111111115,1424952007664)\n" + + "CarEvent(0,100,1627.7777777777783,1424952003662)\n" + + "CarEvent(1,95,1973.6111111111115,1424952007664)\n" + + "CarEvent(0,100,1709.7222222222229,1424952006663)\n" + + "CarEvent(0,100,1737.5000000000007,1424952007664)\n" + + "CarEvent(1,95,1973.6111111111115,1424952007664)\n" + + "CarEvent(0,100,1791.6666666666674,1424952009664)\n" + + "CarEvent(1,95,2211.1111111111118,1424952017668)\n"; + + private TopSpeedWindowingExampleData() { + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java ---------------------------------------------------------------------- diff 
--git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java new file mode 100644 index 0000000..9b0b63c --- /dev/null +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.examples.wordcount; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.examples.java.wordcount.util.WordCountData; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.Collector; + +/** + * This example shows an implementation of WordCount without using the Tuple2 + * type, but a custom class. + * + * <p> + * Usage: <code>WordCount <text path> <result path></code><br> + * If no parameters are provided, the program is run with default data from + * {@link WordCountData}. 
+ * + * <p> + * This example shows how to: + * <ul> + * <li>use POJO data types, + * <li>write a simple Flink program, + * <li>write and use user-defined functions. + * </ul> + */ +public class PojoExample { + + // ************************************************************************* + // PROGRAM + // ************************************************************************* + + public static void main(String[] args) throws Exception { + + if (!parseParameters(args)) { + return; + } + + // set up the execution environment + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + // get input data + DataStream<String> text = getTextDataStream(env); + + DataStream<Word> counts = + // split up the lines into Word objects + text.flatMap(new Tokenizer()) + // group by the field word and sum up the frequency + .keyBy("word").sum("frequency"); + + if (fileOutput) { + counts.writeAsText(outputPath); + } else { + counts.print(); + } + + // execute program + env.execute("WordCount Pojo Example"); + } + + // ************************************************************************* + // DATA TYPES + // ************************************************************************* + + /** + * This is the POJO (Plain Old Java Object) that is being used for all the + * operations. 
As long as all fields are public or have a getter/setter, the + * system can handle them + */ + public static class Word { + + private String word; + private Integer frequency; + + public Word() { + } + + public Word(String word, int i) { + this.word = word; + this.frequency = i; + } + + public String getWord() { + return word; + } + + public void setWord(String word) { + this.word = word; + } + + public Integer getFrequency() { + return frequency; + } + + public void setFrequency(Integer frequency) { + this.frequency = frequency; + } + + @Override + public String toString() { + return "(" + word + "," + frequency + ")"; + } + } + + // ************************************************************************* + // USER FUNCTIONS + // ************************************************************************* + + /** + * Implements the string tokenizer that splits sentences into words as a + * user-defined FlatMapFunction. The function takes a line (String) and + * splits it into multiple pairs in the form of "(word,1)" ({@code Tuple2<String, + * Integer>}). 
+ */ + public static final class Tokenizer implements FlatMapFunction<String, Word> { + private static final long serialVersionUID = 1L; + + @Override + public void flatMap(String value, Collector<Word> out) { + // normalize and split the line + String[] tokens = value.toLowerCase().split("\\W+"); + + // emit the pairs + for (String token : tokens) { + if (token.length() > 0) { + out.collect(new Word(token, 1)); + } + } + } + } + + // ************************************************************************* + // UTIL METHODS + // ************************************************************************* + + private static boolean fileOutput = false; + private static String textPath; + private static String outputPath; + + private static boolean parseParameters(String[] args) { + + if (args.length > 0) { + // parse input arguments + fileOutput = true; + if (args.length == 2) { + textPath = args[0]; + outputPath = args[1]; + } else { + System.err.println("Usage: PojoExample <text path> <result path>"); + return false; + } + } else { + System.out.println("Executing PojoExample example with built-in default data."); + System.out.println(" Provide parameters to read input data from a file."); + System.out.println(" Usage: PojoExample <text path> <result path>"); + } + return true; + } + + private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env) { + if (fileOutput) { + // read the text file from given input path + return env.readTextFile(textPath); + } else { + // get default test text data + return env.fromElements(WordCountData.WORDS); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java new file mode 100644 index 0000000..a76671e --- /dev/null +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.examples.wordcount; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.examples.java.wordcount.util.WordCountData; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.Collector; + +/** + * Implements the "WordCount" program that computes a simple word occurrence + * histogram over text files in a streaming fashion. + * + * <p> + * The input is a plain text file with lines separated by newline characters. + * + * <p> + * Usage: <code>WordCount <text path> <result path></code><br> + * If no parameters are provided, the program is run with default data from + * {@link WordCountData}. 
+ * + * <p> + * This example shows how to: + * <ul> + * <li>write a simple Flink Streaming program, + * <li>use tuple data types, + * <li>write and use user-defined functions. + * </ul> + * + */ +public class WordCount { + + // ************************************************************************* + // PROGRAM + // ************************************************************************* + + public static void main(String[] args) throws Exception { + + if (!parseParameters(args)) { + return; + } + + // set up the execution environment + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + // get input data + DataStream<String> text = getTextDataStream(env); + + DataStream<Tuple2<String, Integer>> counts = + // split up the lines in pairs (2-tuples) containing: (word,1) + text.flatMap(new Tokenizer()) + // group by the tuple field "0" and sum up tuple field "1" + .keyBy(0).sum(1); + + // emit result + if (fileOutput) { + counts.writeAsText(outputPath); + } else { + counts.print(); + } + + // execute program + env.execute("Streaming WordCount"); + } + + // ************************************************************************* + // USER FUNCTIONS + // ************************************************************************* + + /** + * Implements the string tokenizer that splits sentences into words as a + * user-defined FlatMapFunction. The function takes a line (String) and + * splits it into multiple pairs in the form of "(word,1)" ({@code Tuple2<String, + * Integer>}). 
+ */ + public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> { + private static final long serialVersionUID = 1L; + + @Override + public void flatMap(String value, Collector<Tuple2<String, Integer>> out) + throws Exception { + // normalize and split the line + String[] tokens = value.toLowerCase().split("\\W+"); + + // emit the pairs + for (String token : tokens) { + if (token.length() > 0) { + out.collect(new Tuple2<String, Integer>(token, 1)); + } + } + } + } + + // ************************************************************************* + // UTIL METHODS + // ************************************************************************* + + private static boolean fileOutput = false; + private static String textPath; + private static String outputPath; + + private static boolean parseParameters(String[] args) { + + if (args.length > 0) { + // parse input arguments + fileOutput = true; + if (args.length == 2) { + textPath = args[0]; + outputPath = args[1]; + } else { + System.err.println("Usage: WordCount <text path> <result path>"); + return false; + } + } else { + System.out.println("Executing WordCount example with built-in default data."); + System.out.println(" Provide parameters to read input data from a file."); + System.out.println(" Usage: WordCount <text path> <result path>"); + } + return true; + } + + private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env) { + if (fileOutput) { + // read the text file from given input path + return env.readTextFile(textPath); + } else { + // get default test text data + return env.fromElements(WordCountData.WORDS); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala ---------------------------------------------------------------------- diff --git 
a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala new file mode 100644 index 0000000..42484e8 --- /dev/null +++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.streaming.scala.examples.join
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows
+import org.apache.flink.streaming.api.windowing.time.Time
+
+import scala.Stream._
+import scala.language.postfixOps
+import scala.util.Random
+
+/**
+ * Joins a stream of grades with a stream of salaries by name over a sliding time window
+ * (2 seconds long, sliding every second) and emits one [[WindowJoin.Person]] per matching
+ * (grade, salary) pair.
+ *
+ * Usage:
+ * {{{
+ *   WindowJoin                                          // generated input, printed to stdout
+ *   WindowJoin <result path>                            // generated input, written to file
+ *   WindowJoin <grades path> <salaries path> <result path>
+ * }}}
+ *
+ * File input holds one record per line in the form `(time,name,value)`.
+ */
+object WindowJoin {
+
+  // *************************************************************************
+  // PROGRAM
+  // *************************************************************************
+
+  case class Grade(time: Long, name: String, grade: Int)
+  case class Salary(time: Long, name: String, salary: Int)
+  case class Person(name: String, grade: Int, salary: Int)
+
+  def main(args: Array[String]) {
+
+    if (!parseParameters(args)) {
+      return
+    }
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    // NOTE(review): event time is enabled, but neither setGradesInput nor setSalariesInput
+    // assigns timestamps or watermarks; confirm the windows fire as intended.
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+    // Create streams for grades and salaries by mapping the inputs to the corresponding objects
+    val grades = setGradesInput(env)
+    val salaries = setSalariesInput(env)
+
+    // Join the two input streams by name on the last 2 seconds every second and create new
+    // Person objects containing both grade and salary
+    val joined = grades.join(salaries)
+      .where(_.name)
+      .equalTo(_.name)
+      .window(SlidingTimeWindows.of(Time.of(2, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)))
+      .apply { (g, s) => Person(g.name, g.grade, s.salary) }
+
+    if (fileOutput) {
+      joined.writeAsText(outputPath)
+    } else {
+      joined.print()
+    }
+
+    env.execute("WindowJoin")
+  }
+
+  // *************************************************************************
+  // USER FUNCTIONS
+  // *************************************************************************
+
+  val names = Array("tom", "jerry", "alice", "bob", "john", "grace")
+  val gradeCount = 5
+  val salaryMax = 10000
+  val sleepInterval = 100
+
+  /** Generated `(timestamp, name, grade)` records with grades in `[0, gradeCount)`. */
+  def gradeStream: Stream[(Long, String, Int)] = randomRecords(gradeCount)
+
+  /** Generated `(timestamp, name, salary)` records with salaries in `[0, salaryMax)`. */
+  def salaryStream: Stream[(Long, String, Int)] = randomRecords(salaryMax)
+
+  /**
+   * Shared generator behind [[gradeStream]] and [[salaryStream]]. For every x in
+   * `range(1, 100)` it emits the current wall-clock time, a random entry of `names`
+   * and a random value in `[0, bound)`.
+   *
+   * NOTE(review): `range(1, 100)` stops at 99, so `x % sleepInterval == 0` never holds
+   * and the throttling sleep is never taken.
+   */
+  private def randomRecords(bound: Int): Stream[(Long, String, Int)] =
+    range(1, 100).map { x =>
+      if (x % sleepInterval == 0) Thread.sleep(sleepInterval)
+      (System.currentTimeMillis(), names(Random.nextInt(names.length)), Random.nextInt(bound))
+    }
+
+  /**
+   * Parses a line of the form `(time,name,value)` into a tuple. Whitespace around the
+   * whole line and around each field is ignored, so `(1, tom, 3)` parses like `(1,tom,3)`.
+   */
+  def parseMap(line : String): (Long, String, Int) = {
+    val trimmed = line.trim
+    // drop the enclosing parentheses, then split the fields on commas
+    val record = trimmed.substring(1, trimmed.length - 1).split(",").map(_.trim)
+    (record(0).toLong, record(1), record(2).toInt)
+  }
+
+  // *************************************************************************
+  // UTIL METHODS
+  // *************************************************************************
+
+  private var fileInput: Boolean = false
+  private var fileOutput: Boolean = false
+
+  private var gradesPath: String = null
+  private var salariesPath: String = null
+  private var outputPath: String = null
+
+  /**
+   * Reads the command line into the fields above.
+   *
+   * @return false (after printing usage to stderr) for an unsupported argument count
+   */
+  private def parseParameters(args: Array[String]): Boolean = {
+    if (args.length > 0) {
+      if (args.length == 1) {
+        fileOutput = true
+        outputPath = args(0)
+      } else if (args.length == 3) {
+        fileInput = true
+        fileOutput = true
+        gradesPath = args(0)
+        salariesPath = args(1)
+        outputPath = args(2)
+      } else {
+        System.err.println("Usage: WindowJoin <result path> or WindowJoin <input path 1> " +
+          "<input path 2> <result path>")
+        return false
+      }
+    } else {
+      System.out.println("Executing WindowJoin with generated data.")
+      System.out.println(" Provide parameter to write to file.")
+      System.out.println(" Usage: WindowJoin <result path>")
+    }
+    true
+  }
+
+  /** Grades from `gradesPath` when file input is enabled, otherwise from [[gradeStream]]. */
+  private def setGradesInput(env: StreamExecutionEnvironment) : DataStream[Grade] = {
+    if (fileInput) {
+      env.readTextFile(gradesPath).map(parseMap _).map(x => Grade(x._1, x._2, x._3))
+    } else {
+      env.fromCollection(gradeStream).map(x => Grade(x._1, x._2, x._3))
+    }
+  }
+
+  /** Salaries from `salariesPath` when file input is enabled, otherwise from [[salaryStream]]. */
+  private def setSalariesInput(env: StreamExecutionEnvironment) : DataStream[Salary] = {
+    if (fileInput) {
+      env.readTextFile(salariesPath).map(parseMap _).map(x => Salary(x._1, x._2, x._3))
+    } else {
+      env.fromCollection(salaryStream).map(x => Salary(x._1, x._2, x._3))
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala
new file mode 100644
index 0000000..9ec17d4
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.socket
+
+import org.apache.flink.streaming.api.scala._
+
+import scala.language.postfixOps
+
+/**
+ * This example shows an implementation of WordCount with data from a text socket.
+ * To run the example make sure that the service providing the text data is already up and running.
+ *
+ * To start an example socket text stream on your local machine run netcat from a command line,
+ * where the parameter specifies the port number:
+ *
+ * {{{
+ *   nc -lk 9999
+ * }}}
+ *
+ * Usage:
+ * {{{
+ *   SocketTextStreamWordCount <hostname> <port> [<output path>]
+ * }}}
+ *
+ * Without an output path the running counts are printed to stdout.
+ *
+ * This example shows how to:
+ *
+ *  - use StreamExecutionEnvironment.socketTextStream
+ *  - write a simple Flink Streaming program in scala.
+ *  - write and use user-defined functions.
+ */
+object SocketTextStreamWordCount {
+
+  def main(args: Array[String]) {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    // Read lines from the socket and split them into lower-case words (runs of non-word
+    // characters separate words, empty tokens are dropped). Keep a running count per word.
+    val text = env.socketTextStream(hostName, port)
+    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
+      .map { (_, 1) }
+      .keyBy(0)
+      .sum(1)
+
+    if (fileOutput) {
+      // NOTE(review): the second argument is presumably the write interval in milliseconds;
+      // confirm against DataStream.writeAsText.
+      counts.writeAsText(outputPath, 1)
+    } else {
+      counts.print()
+    }
+
+    env.execute("Scala SocketTextStreamWordCount Example")
+  }
+
+  /**
+   * Reads `<hostname> <port> [<output path>]` into the fields below.
+   *
+   * @return false (after printing usage to stderr) if the argument count is wrong or the
+   *         port is not an integer
+   */
+  private def parseParameters(args: Array[String]): Boolean = {
+    if (args.length != 2 && args.length != 3) {
+      printUsage()
+      return false
+    }
+    try {
+      port = args(1).toInt
+    } catch {
+      case _: NumberFormatException =>
+        System.err.println(s"Invalid port: '${args(1)}'")
+        printUsage()
+        return false
+    }
+    hostName = args(0)
+    if (args.length == 3) {
+      fileOutput = true
+      outputPath = args(2)
+    }
+    true
+  }
+
+  /** Prints the command-line synopsis to stderr. */
+  private def printUsage(): Unit =
+    System.err.println("Usage: SocketTextStreamWordCount <hostname> <port> [<output path>]")
+
+  private var fileOutput: Boolean = false
+  private var hostName: String = null
+  private var port: Int = 0
+  private var outputPath: String = null
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
new file mode 100644
index 0000000..f26f32c
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.windowing
+
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
+import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger
+
+import scala.Stream._
+import scala.math._
+import scala.language.postfixOps
+import scala.util.Random
+
+/**
+ * An example of grouped stream windowing where different eviction and
+ * trigger policies can be used. A source fetches events from cars
+ * every 1 sec containing their id, their current speed (kmh),
+ * overall elapsed distance (m) and a timestamp. For each car, the
+ * streaming example emits the top speed seen during the last
+ * `evictionSec` seconds whenever the car has covered more than
+ * another `triggerMeters` meters.
+ *
+ * Usage:
+ * {{{
+ *   TopSpeedWindowing                                 // generated input, printed to stdout
+ *   TopSpeedWindowing <input path> <output path>
+ * }}}
+ */
+object TopSpeedWindowing {
+
+  // *************************************************************************
+  // PROGRAM
+  // *************************************************************************
+
+  case class CarEvent(carId: Int, speed: Int, distance: Double, time: Long)
+
+  val numOfCars = 2
+  val evictionSec = 10
+  val triggerMeters = 50d
+
+  def main(args: Array[String]) {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setParallelism(1)
+
+    val cars = setCarsInput(env)
+
+    val topSpeeds = cars
+      .assignAscendingTimestamps( _.time )
+      .keyBy("carId")
+      // one global window per car ...
+      .window(GlobalWindows.create)
+      // ... from which events older than `evictionSec` seconds are evicted ...
+      .evictor(TimeEvictor.of(Time.of(evictionSec * 1000, TimeUnit.MILLISECONDS)))
+      // ... and which fires once the distance delta exceeds `triggerMeters`
+      .trigger(DeltaTrigger.of(triggerMeters, new DeltaFunction[CarEvent] {
+        def getDelta(oldSp: CarEvent, newSp: CarEvent): Double = newSp.distance - oldSp.distance
+      }))
+      .maxBy("speed")
+
+    if (fileOutput) {
+      topSpeeds.writeAsText(outputPath)
+    } else {
+      topSpeeds.print()
+    }
+
+    env.execute("TopSpeedWindowing")
+
+  }
+
+  // *************************************************************************
+  // USER FUNCTIONS
+  // *************************************************************************
+
+  /**
+   * Generates an unbounded stream of events for `numOfCars` cars, all starting at 50 km/h.
+   * Each round sleeps one second. Then every car randomly speeds up or slows down by 5 km/h
+   * (clamped to [0, 100]) and adds the distance covered in that second (speed / 3.6, km/h to m/s).
+   */
+  def genCarStream(): Stream[CarEvent] = {
+
+    def nextSpeed(carEvent: CarEvent): CarEvent = {
+      val next =
+        if (Random.nextBoolean) min(100, carEvent.speed + 5) else max(0, carEvent.speed - 5)
+      CarEvent(carEvent.carId, next, carEvent.distance + next / 3.6d, System.currentTimeMillis)
+    }
+
+    // Stream.append takes its argument by name, so the recursion is evaluated lazily
+    def carStream(speeds: Stream[CarEvent]): Stream[CarEvent] = {
+      Thread.sleep(1000)
+      speeds.append(carStream(speeds.map(nextSpeed)))
+    }
+
+    carStream(range(0, numOfCars).map(CarEvent(_, 50, 0, System.currentTimeMillis())))
+  }
+
+  /**
+   * Parses a line of the form `(carId,speed,distance,time)`. Whitespace around the whole
+   * line and around each field is ignored.
+   */
+  def parseMap(line : String): (Int, Int, Double, Long) = {
+    val trimmed = line.trim
+    // drop the enclosing parentheses, then split the fields on commas
+    val record = trimmed.substring(1, trimmed.length - 1).split(",").map(_.trim)
+    (record(0).toInt, record(1).toInt, record(2).toDouble, record(3).toLong)
+  }
+
+  // *************************************************************************
+  // UTIL METHODS
+  // *************************************************************************
+
+  var fileInput = false
+  var fileOutput = false
+  var inputPath : String = null
+  var outputPath : String = null
+
+  /**
+   * With no arguments the generated car stream is used and results are printed. With
+   * `<input path> <output path>`, both file input and file output are enabled.
+   *
+   * @return false (after printing usage to stderr) for any other argument count
+   */
+  def parseParameters(args: Array[String]): Boolean = {
+    if (args.length > 0) {
+      if (args.length == 2) {
+        fileInput = true
+        fileOutput = true
+        inputPath = args(0)
+        outputPath = args(1)
+        true
+      } else {
+        System.err.println("Usage: TopSpeedWindowing <input path> <output path>")
+        false
+      }
+    } else {
+      true
+    }
+  }
+
+  /** Car events from `inputPath` when file input is enabled, otherwise from [[genCarStream]]. */
+  private def setCarsInput(env: StreamExecutionEnvironment) : DataStream[CarEvent] = {
+    if (fileInput) {
+      env.readTextFile(inputPath).map(parseMap(_)).map(x => CarEvent(x._1, x._2, x._3, x._4))
+    } else {
+      env.fromCollection(genCarStream())
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/iteration/IterateExampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/iteration/IterateExampleITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/iteration/IterateExampleITCase.java
new file mode 100644
index 0000000..07d6766
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/iteration/IterateExampleITCase.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF)
under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.test.exampleJavaPrograms.iteration; + +import org.apache.flink.streaming.examples.iteration.IterateExample; +import org.apache.flink.streaming.examples.iteration.util.IterateExampleData; +import org.apache.flink.streaming.util.StreamingProgramTestBase; + +public class IterateExampleITCase extends StreamingProgramTestBase { + + + protected String inputPath; + protected String resultPath; + + @Override + protected void preSubmit() throws Exception { + inputPath = createTempFile("fibonacciInput.txt", IterateExampleData.INPUT_PAIRS); + resultPath = getTempDirPath("result"); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(IterateExampleData.RESULTS, resultPath); + } + + @Override + protected void testProgram() throws Exception { + IterateExample.main(new String[]{inputPath, resultPath}); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/join/WindowJoinITCase.java ---------------------------------------------------------------------- diff --git 
a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/join/WindowJoinITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/join/WindowJoinITCase.java new file mode 100644 index 0000000..e657b67 --- /dev/null +++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/join/WindowJoinITCase.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.test.exampleJavaPrograms.join; + +import org.apache.flink.streaming.examples.join.WindowJoin; +import org.apache.flink.streaming.examples.join.util.WindowJoinData; +import org.apache.flink.streaming.util.StreamingProgramTestBase; + +public class WindowJoinITCase extends StreamingProgramTestBase { + + protected String gradesPath; + protected String salariesPath; + protected String resultPath; + + @Override + protected void preSubmit() throws Exception { + gradesPath = createTempFile("gradesText.txt", WindowJoinData.GRADES_INPUT); + salariesPath = createTempFile("salariesText.txt", WindowJoinData.SALARIES_INPUT); + resultPath = getTempDirPath("result"); + } + + @Override + protected void postSubmit() throws Exception { + // since the two sides of the join might have different speed + // the exact output can not be checked just whether it is well-formed + // checks that the result lines look like e.g. (bob, 2, 2015) + checkLinesAgainstRegexp(resultPath, "^\\([a-z]+,(\\d),(\\d)+\\)"); + } + + @Override + protected void testProgram() throws Exception { + WindowJoin.main(new String[]{gradesPath, salariesPath, resultPath}); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/ml/IncrementalLearningSkeletonITCase.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/ml/IncrementalLearningSkeletonITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/ml/IncrementalLearningSkeletonITCase.java new file mode 100644 index 0000000..83569dc --- /dev/null +++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/ml/IncrementalLearningSkeletonITCase.java @@ -0,0 +1,42 
@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.test.exampleJavaPrograms.ml; + +import org.apache.flink.streaming.examples.ml.IncrementalLearningSkeleton; +import org.apache.flink.streaming.examples.ml.util.IncrementalLearningSkeletonData; +import org.apache.flink.streaming.util.StreamingProgramTestBase; + +public class IncrementalLearningSkeletonITCase extends StreamingProgramTestBase { + + protected String resultPath; + + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(IncrementalLearningSkeletonData.RESULTS, resultPath); + } + + @Override + protected void testProgram() throws Exception { + IncrementalLearningSkeleton.main(new String[]{resultPath}); + } +}