http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/twitter/util/TwitterStreamData.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/twitter/util/TwitterStreamData.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/twitter/util/TwitterStreamData.java
new file mode 100644
index 0000000..b06d193
--- /dev/null
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/twitter/util/TwitterStreamData.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.twitter.util;
+
+/**
+ * Holder for hard-coded example data that looks like tweets but was not
+ * acquired from Twitter.
+ *
+ * <p>The class only exposes constants and cannot be instantiated.
+ */
+public class TwitterStreamData {
+       /**
+        * Three JSON-formatted strings shaped like tweets. The entries differ in
+        * their numeric "id" field and in the hashtag texts
+        * (example1/tweet1, example2/tweet2, example3/tweet3); every entry has
+        * the text "Apache Flink" and "lang" set to "en".
+        *
+        * <p>NOTE(review): "id_str" is identical in all three entries although
+        * "id" differs - confirm whether consumers rely on "id_str".
+        */
+       public static final String[] TEXTS = new String[] {
+                       "{\"created_at\":\"Mon Jan 1 00:00:00 +0000 
1901\",\"id\":000000000000000000,\"id_str\":\"000000000000000000\",\"text\":\"Apache
 
Flink\",\"source\":null,\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":0000000000,\"id_str\":\"0000000000\",\"name\":\"Apache
 Flink\",\"screen_name\":\"Apache 
Flink\",\"location\":\"Berlin\",\"protected\":false,\"verified\":false,\"followers_count\":999999,\"friends_count\":99999,\"listed_count\":999,\"favourites_count\":9999,\"statuses_count\":999,\"created_at\":\"Mon
 Jan 1 00:00:00 +0000 
1901\",\"utc_offset\":7200,\"time_zone\":\"Amsterdam\",\"geo_enabled\":false,\"lang\":\"en\",\"entities\":{\"hashtags\":[{\"text\":\"example1\",\"indices\":[0,0]},{\"text\":\"tweet1\",\"indices\":[0,0]}]},\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C6E2EE\",\"profile_background_
 
tile\":false,\"profile_link_color\":\"1F98C7\",\"profile_sidebar_border_color\":\"FFFFFF\",\"profile_sidebar_fill_color\":\"252429\",\"profile_text_color\":\"666666\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null}",
+                       "{\"created_at\":\"Mon Jan 1 00:00:00 +0000 
1901\",\"id\":000000000000000001,\"id_str\":\"000000000000000000\",\"text\":\"Apache
 
Flink\",\"source\":null,\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":0000000000,\"id_str\":\"0000000000\",\"name\":\"Apache
 Flink\",\"screen_name\":\"Apache 
Flink\",\"location\":\"Berlin\",\"protected\":false,\"verified\":false,\"followers_count\":999999,\"friends_count\":99999,\"listed_count\":999,\"favourites_count\":9999,\"statuses_count\":999,\"created_at\":\"Mon
 Jan 1 00:00:00 +0000 
1901\",\"utc_offset\":7200,\"time_zone\":\"Amsterdam\",\"geo_enabled\":false,\"lang\":\"en\",\"entities\":{\"hashtags\":[{\"text\":\"example2\",\"indices\":[0,0]},{\"text\":\"tweet2\",\"indices\":[0,0]}]},\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C6E2EE\",\"profile_background_
 
tile\":false,\"profile_link_color\":\"1F98C7\",\"profile_sidebar_border_color\":\"FFFFFF\",\"profile_sidebar_fill_color\":\"252429\",\"profile_text_color\":\"666666\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null}",
+                       "{\"created_at\":\"Mon Jan 1 00:00:00 +0000 
1901\",\"id\":000000000000000002,\"id_str\":\"000000000000000000\",\"text\":\"Apache
 
Flink\",\"source\":null,\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":0000000000,\"id_str\":\"0000000000\",\"name\":\"Apache
 Flink\",\"screen_name\":\"Apache 
Flink\",\"location\":\"Berlin\",\"protected\":false,\"verified\":false,\"followers_count\":999999,\"friends_count\":99999,\"listed_count\":999,\"favourites_count\":9999,\"statuses_count\":999,\"created_at\":\"Mon
 Jan 1 00:00:00 +0000 
1901\",\"utc_offset\":7200,\"time_zone\":\"Amsterdam\",\"geo_enabled\":false,\"lang\":\"en\",\"entities\":{\"hashtags\":[{\"text\":\"example3\",\"indices\":[0,0]},{\"text\":\"tweet3\",\"indices\":[0,0]}]},\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C6E2EE\",\"profile_background_
 
tile\":false,\"profile_link_color\":\"1F98C7\",\"profile_sidebar_border_color\":\"FFFFFF\",\"profile_sidebar_fill_color\":\"252429\",\"profile_text_color\":\"666666\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null}",
+       };
+
+       /**
+        * Newline-terminated "(word,count)" tuples for the words "apache" and
+        * "flink", with counts running from 1 to 3 for each word.
+        * NOTE(review): presumably the expected output of the example that
+        * consumes {@link #TEXTS} - confirm against the caller.
+        */
+       public static final String STREAMING_COUNTS_AS_TUPLES = "(apache,1)\n" 
+ "(apache,2)\n" + "(apache,3)\n" + "(flink,1)\n" + "(flink,2)\n" + 
"(flink,3)\n";
+
+       /** Private constructor: this class only holds constants. */
+       private TwitterStreamData() {
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
new file mode 100644
index 0000000..f08069b
--- /dev/null
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.windowing;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+@SuppressWarnings("serial")
+public class GroupedProcessingTimeWindowExample {
+       
+       /**
+        * Builds and runs the example job: a parallel source emits (key, 1) pairs
+        * with keys cycling from 1 to 10000, the stream is keyed by its first
+        * field, summed per key in 2500 ms time windows that slide by 500 ms, and
+        * the results are handed to a sink that discards them.
+        *
+        * @param args command line arguments; not read by this example
+        * @throws Exception propagated from building or executing the job
+        */
+       public static void main(String[] args) throws Exception {
+
+               final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(4);
+
+               DataStream<Tuple2<Long, Long>> stream = env
+                               .addSource(new RichParallelSourceFunction<Tuple2<Long, Long>>() {
+
+                                       // written by cancel(), read by the emitting loop in run()
+                                       private volatile boolean running = true;
+
+                                       @Override
+                                       public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
+
+                                               final long startTime = System.currentTimeMillis();
+
+                                               final long numElements = 20000000;
+                                               final long numKeys = 10000;
+                                               long val = 1L;
+                                               long count = 0L;
+
+                                               while (running && count < numElements) {
+                                                       count++;
+                                                       ctx.collect(new Tuple2<>(val++, 1L));
+
+                                                       // wrap around so that keys stay within [1, numKeys]
+                                                       if (val > numKeys) {
+                                                               val = 1L;
+                                                       }
+                                               }
+
+                                               final long endTime = System.currentTimeMillis();
+                                               // report the number of values actually emitted, which is
+                                               // smaller than numElements when the source was cancelled
+                                               System.out.println("Took " + (endTime - startTime) + " msecs for " + count + " values");
+                                       }
+
+                                       @Override
+                                       public void cancel() {
+                                               running = false;
+                                       }
+                               });
+
+               stream
+                       .keyBy(0)
+                       .timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
+                       .reduce(new SummingReducer())
+
+                       // alternative: use an apply function which does not pre-aggregate
+//                     .keyBy(new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>())
+//                     .window(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
+//                     .apply(new SummingWindowFunction())
+
+                       .addSink(new SinkFunction<Tuple2<Long, Long>>() {
+                               @Override
+                               public void invoke(Tuple2<Long, Long> value) {
+                                       // intentionally empty: results are discarded
+                               }
+                       });
+
+               env.execute();
+       }
+       
+       /**
+        * Key selector that returns field 0 of a tuple, cast to the requested key type.
+        *
+        * @param <Type> the tuple type the key is read from
+        * @param <Key> the type the first field is cast to
+        */
+       public static class FirstFieldKeyExtractor<Type extends Tuple, Key> implements KeySelector<Type, Key> {
+
+               @Override
+               @SuppressWarnings("unchecked")
+               public Key getKey(Type value) {
+                       // unchecked cast: nothing here verifies that field 0 really is a Key
+                       final Object firstField = value.getField(0);
+                       return (Key) firstField;
+               }
+       }
+
+       /**
+        * Window function that adds up the second field of every tuple in the
+        * window and emits a single (key, total) tuple.
+        */
+       public static class SummingWindowFunction implements WindowFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Long, Window> {
+
+               @Override
+               public void apply(Long key, Window window, Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) {
+                       long total = 0L;
+                       for (Tuple2<Long, Long> entry : values) {
+                               total += entry.f1;
+                       }
+
+                       final Tuple2<Long, Long> result = new Tuple2<>(key, total);
+                       out.collect(result);
+               }
+       }
+
+       /**
+        * Reduce function that keeps the first field of the first value and adds
+        * the second fields of both values.
+        */
+       public static class SummingReducer implements ReduceFunction<Tuple2<Long, Long>> {
+
+               @Override
+               public Tuple2<Long, Long> reduce(Tuple2<Long, Long> value1, Tuple2<Long, Long> value2) {
+                       final long combined = value1.f1 + value2.f1;
+                       return new Tuple2<>(value1.f0, combined);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
new file mode 100644
index 0000000..035727a
--- /dev/null
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.windowing;
+
+import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.EventTimeSourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class SessionWindowing {
+
+       /**
+        * Runs the session windowing example on a fixed list of
+        * (id, timestamp, count) events. The events are emitted with their own
+        * timestamps, keyed by id, collected in global windows driven by a
+        * {@code SessionTrigger} with a timeout of 3, and the count field is summed.
+        * The result is written to the given path if one was passed, otherwise printed.
+        *
+        * @param args optionally a single element holding the result path
+        * @throws Exception propagated from building or executing the job
+        */
+       @SuppressWarnings("serial")
+       public static void main(String[] args) throws Exception {
+
+               if (!parseParameters(args)) {
+                       return;
+               }
+
+               final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+               env.setParallelism(2);
+
+               // fixed input, each entry is (id, event timestamp, count)
+               final List<Tuple3<String, Long, Integer>> input = new ArrayList<>();
+
+               input.add(new Tuple3<>("a", 1L, 1));
+               input.add(new Tuple3<>("b", 1L, 1));
+               input.add(new Tuple3<>("b", 3L, 1));
+               input.add(new Tuple3<>("b", 5L, 1));
+               input.add(new Tuple3<>("c", 6L, 1));
+               // Session "a" is expected to be detected before this point (the old
+               // functionality could only detect it here, when the next session starts)
+               input.add(new Tuple3<>("a", 10L, 1));
+               // Sessions "b" and "c" are expected to be detected at this point as well
+               input.add(new Tuple3<>("c", 11L, 1));
+
+               DataStream<Tuple3<String, Long, Integer>> events = env
+                               .addSource(new EventTimeSourceFunction<Tuple3<String,Long,Integer>>() {
+                                       private static final long serialVersionUID = 1L;
+
+                                       @Override
+                                       public void run(SourceContext<Tuple3<String, Long, Integer>> ctx) throws Exception {
+                                               for (Tuple3<String, Long, Integer> event : input) {
+                                                       final Long eventTime = event.f1;
+                                                       ctx.collectWithTimestamp(event, eventTime);
+                                                       // the watermark trails the element that was just emitted by one unit
+                                                       ctx.emitWatermark(new Watermark(eventTime - 1));
+                                                       if (!fileOutput) {
+                                                               System.out.println("Collected: " + event);
+                                                       }
+                                               }
+                                               // final watermark once all input has been emitted
+                                               ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
+                                       }
+
+                                       @Override
+                                       public void cancel() {
+                                       }
+                               });
+
+               // One session per id, with a maximum timeout of 3 time units
+               DataStream<Tuple3<String, Long, Integer>> sessionSums = events
+                               .keyBy(0)
+                               .window(GlobalWindows.create())
+                               .trigger(new SessionTrigger(3L))
+                               .sum(2);
+
+               if (!fileOutput) {
+                       sessionSums.print();
+               } else {
+                       sessionSums.writeAsText(outputPath);
+               }
+
+               env.execute();
+       }
+
+       /**
+        * Trigger for {@link GlobalWindow}s that fires and purges a key's window
+        * when the gap between elements exceeds {@code sessionTimeout}.
+        *
+        * <p>The timestamp of the most recent element is kept per key in the
+        * "last-seen" key/value state. A gap is detected either when a new
+        * element arrives more than {@code sessionTimeout} after the previous one,
+        * or when an event-time timer fires at least {@code sessionTimeout} after
+        * the last seen element.
+        */
+       private static class SessionTrigger implements Trigger<Tuple3<String, Long, Integer>, GlobalWindow> {
+
+               private static final long serialVersionUID = 1L;
+
+               /** Maximum allowed gap, in event-time units, between two elements of a session. */
+               private final long sessionTimeout;
+
+               /**
+                * @param sessionTimeout maximum gap between two elements of the same session
+                */
+               public SessionTrigger(Long sessionTimeout) {
+                       this.sessionTimeout = sessionTimeout;
+               }
+
+               /**
+                * Records the element's timestamp as the last seen one and schedules a
+                * timeout check relative to it.
+                *
+                * @return {@code FIRE_AND_PURGE} if the element arrived more than
+                *         {@code sessionTimeout} after the previously seen timestamp,
+                *         {@code CONTINUE} otherwise
+                */
+               @Override
+               public TriggerResult onElement(Tuple3<String, Long, Integer> element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {
+
+                       // 1L is the default passed to the state lookup
+                       // NOTE(review): presumably returned when the key has no state yet - confirm
+                       OperatorState<Long> lastSeenState = ctx.getKeyValueState("last-seen", 1L);
+                       final long lastSeen = lastSeenState.value();
+
+                       final long timeSinceLastEvent = timestamp - lastSeen;
+
+                       // Update the last seen event time
+                       lastSeenState.update(timestamp);
+
+                       // The timeout must be measured from the element that was just seen.
+                       // A timer at the previous lastSeen + sessionTimeout fires too early:
+                       // by then the state already holds the newer timestamp, so the check
+                       // in onEventTime cannot detect the end of the session.
+                       ctx.registerEventTimeTimer(timestamp + sessionTimeout);
+
+                       if (timeSinceLastEvent > sessionTimeout) {
+                               return TriggerResult.FIRE_AND_PURGE;
+                       } else {
+                               return TriggerResult.CONTINUE;
+                       }
+               }
+
+               /**
+                * Checks whether the session has timed out at the given event time.
+                *
+                * @return {@code FIRE_AND_PURGE} if at least {@code sessionTimeout} has
+                *         passed since the last seen element, {@code CONTINUE} otherwise
+                */
+               @Override
+               public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
+                       OperatorState<Long> lastSeenState = ctx.getKeyValueState("last-seen", 1L);
+                       final long lastSeen = lastSeenState.value();
+
+                       // timers registered for older elements land here too; they are
+                       // ignored because a newer element has moved lastSeen forward
+                       if (time - lastSeen >= sessionTimeout) {
+                               return TriggerResult.FIRE_AND_PURGE;
+                       }
+                       return TriggerResult.CONTINUE;
+               }
+
+               /** Processing time is not used by this trigger. */
+               @Override
+               public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
+                       return TriggerResult.CONTINUE;
+               }
+       }
+
+       // 
*************************************************************************
+       // UTIL METHODS
+       // 
*************************************************************************
+
+       private static boolean fileOutput = false;
+       private static String outputPath;
+
+       /**
+        * Reads the optional result path from the command line.
+        *
+        * @param args no arguments, or exactly one argument holding the result path
+        * @return {@code false} if more than one argument was given (a usage
+        *         message is printed to stderr), {@code true} otherwise
+        */
+       private static boolean parseParameters(String[] args) {
+
+               switch (args.length) {
+                       case 0:
+                               // nothing to parse, keep the defaults
+                               return true;
+                       case 1:
+                               fileOutput = true;
+                               outputPath = args[0];
+                               return true;
+                       default:
+                               System.err.println("Usage: SessionWindowing <result path>");
+                               return false;
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
new file mode 100644
index 0000000..30eda67
--- /dev/null
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.windowing;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.TimestampExtractor;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger;
+
+import java.util.Arrays;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An example of grouped stream windowing where different eviction and trigger
+ * policies can be used. A source fetches events from cars every 100 msec
+ * containing their id, their current speed (kmh), overall elapsed distance (m)
+ * and a timestamp. The streaming example triggers the top speed of each car
+ * every x meters elapsed for the last y seconds.
+ */
+public class TopSpeedWindowing {
+
+       private static final int NUM_CAR_EVENTS = 100;
+
+       // 
*************************************************************************
+       // PROGRAM
+       // 
*************************************************************************
+
+       /**
+        * Builds and runs the top speed example: car events are read from a text
+        * file or generated by {@code CarSource}, keyed by their first field, and
+        * for every key the event with the maximum value in field 1 is emitted
+        * whenever field 2 has advanced by more than the trigger threshold.
+        *
+        * @param args either empty, or an input path followed by an output path
+        * @throws Exception propagated from building or executing the job
+        */
+       public static void main(String[] args) throws Exception {
+
+               if (!parseParameters(args)) {
+                       return;
+               }
+
+               final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+               @SuppressWarnings({"rawtypes", "serial"})
+               DataStream<Tuple4<Integer, Integer, Double, Long>> carData;
+
+               if (!fileInput) {
+                       int numOfCars = 2;
+                       carData = env.addSource(CarSource.create(numOfCars));
+               } else {
+                       carData = env.readTextFile(inputPath).map(new ParseCarData());
+               }
+
+               int evictionSeconds = 10;
+               double triggerDistance = 50;
+
+               // difference of field 2 between the old and the new data point
+               DeltaFunction<Tuple4<Integer, Integer, Double, Long>> distanceDelta =
+                               new DeltaFunction<Tuple4<Integer, Integer, Double, Long>>() {
+                                       private static final long serialVersionUID = 1L;
+
+                                       @Override
+                                       public double getDelta(
+                                                       Tuple4<Integer, Integer, Double, Long> oldDataPoint,
+                                                       Tuple4<Integer, Integer, Double, Long> newDataPoint) {
+                                               return newDataPoint.f2 - oldDataPoint.f2;
+                                       }
+                               };
+
+               DataStream<Tuple4<Integer, Integer, Double, Long>> topSpeeds = carData
+                               .assignTimestamps(new CarTimestamp())
+                               .keyBy(0)
+                               .window(GlobalWindows.create())
+                               .evictor(TimeEvictor.of(Time.of(evictionSeconds, TimeUnit.SECONDS)))
+                               .trigger(DeltaTrigger.of(triggerDistance, distanceDelta))
+                               .maxBy(1);
+
+               // results are always printed; with file output they are written as text too
+               topSpeeds.print();
+               if (fileOutput) {
+                       topSpeeds.writeAsText(outputPath);
+               }
+
+               env.execute("CarTopSpeedWindowingExample");
+       }
+
+       // 
*************************************************************************
+       // USER FUNCTIONS
+       // 
*************************************************************************
+
+       /**
+        * Source that simulates a fixed number of cars. In every round it sleeps
+        * 100 ms and then, for each car, randomly changes the speed by 5 (kept
+        * within 0 to 100), adds speed / 3.6 to the car's distance and emits
+        * (carId, speed, distance, current system time). It stops once cancelled
+        * or once the event counter has reached {@code NUM_CAR_EVENTS}.
+        */
+       private static class CarSource implements SourceFunction<Tuple4<Integer, Integer, Double, Long>> {
+
+               private static final long serialVersionUID = 1L;
+
+               /** Current speed per car, indexed by car id. */
+               private Integer[] speeds;
+               /** Accumulated distance per car, indexed by car id. */
+               private Double[] distances;
+
+               private Random rand = new Random();
+
+               /** Cleared by {@link #cancel()} to stop the emitting loop. */
+               private volatile boolean isRunning = true;
+               /** Number of events emitted so far, over all cars. */
+               private int counter;
+
+               private CarSource(int numOfCars) {
+                       speeds = new Integer[numOfCars];
+                       distances = new Double[numOfCars];
+                       // every car starts at speed 50 and distance 0
+                       Arrays.fill(speeds, 50);
+                       Arrays.fill(distances, 0d);
+               }
+
+               /** Creates a source that simulates the given number of cars. */
+               public static CarSource create(int cars) {
+                       return new CarSource(cars);
+               }
+
+               @Override
+               public void run(SourceContext<Tuple4<Integer, Integer, Double, Long>> ctx) throws Exception {
+
+                       // the limit is only checked between rounds, so a round always covers every car
+                       while (isRunning && counter < NUM_CAR_EVENTS) {
+                               Thread.sleep(100);
+                               for (int carId = 0; carId < speeds.length; carId++) {
+                                       final boolean accelerate = rand.nextBoolean();
+                                       final int previousSpeed = speeds[carId];
+                                       final int newSpeed = accelerate
+                                                       ? Math.min(100, previousSpeed + 5)
+                                                       : Math.max(0, previousSpeed - 5);
+                                       speeds[carId] = newSpeed;
+                                       distances[carId] += speeds[carId] / 3.6d;
+
+                                       Tuple4<Integer, Integer, Double, Long> event = new Tuple4<>(carId,
+                                                       speeds[carId], distances[carId], System.currentTimeMillis());
+                                       ctx.collect(event);
+                                       counter++;
+                               }
+                       }
+               }
+
+               @Override
+               public void cancel() {
+                       isRunning = false;
+               }
+       }
+
+       /**
+        * Parses one line of text into a car event. The first and the last
+        * character of the line are dropped, the rest is split at commas, and the
+        * first four parts are converted to Integer, Integer, Double and Long.
+        */
+       private static class ParseCarData extends
+                       RichMapFunction<String, Tuple4<Integer, Integer, Double, Long>> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public Tuple4<Integer, Integer, Double, Long> map(String record) {
+                       // strip the enclosing characters before splitting
+                       final String body = record.substring(1, record.length() - 1);
+                       final String[] fields = body.split(",");
+
+                       final Integer first = Integer.valueOf(fields[0]);
+                       final Integer second = Integer.valueOf(fields[1]);
+                       final Double third = Double.valueOf(fields[2]);
+                       final Long fourth = Long.valueOf(fields[3]);
+                       return new Tuple4<>(first, second, third, fourth);
+               }
+       }
+
+       /**
+        * Timestamp extractor that uses the fourth tuple field as the element
+        * timestamp and that value minus one as the per-element watermark.
+        */
+       private static class CarTimestamp implements TimestampExtractor<Tuple4<Integer, Integer, Double, Long>> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public long extractTimestamp(Tuple4<Integer, Integer, Double, Long> element,
+                               long currentTimestamp) {
+                       final long eventTime = element.f3;
+                       return eventTime;
+               }
+
+               @Override
+               public long extractWatermark(Tuple4<Integer, Integer, Double, Long> element,
+                               long currentTimestamp) {
+                       final long eventTime = element.f3;
+                       return eventTime - 1;
+               }
+
+               @Override
+               public long getCurrentWatermark() {
+                       // always the smallest possible value
+                       return Long.MIN_VALUE;
+               }
+       }
+
+       // 
*************************************************************************
+       // UTIL METHODS
+       // 
*************************************************************************
+
+       private static boolean fileInput = false;
+       private static boolean fileOutput = false;
+       private static String inputPath;
+       private static String outputPath;
+
+       /**
+        * Reads the optional input and output paths from the command line.
+        *
+        * @param args either empty, or exactly two elements: the input path
+        *        followed by the output path
+        * @return {@code false} if the number of arguments is neither 0 nor 2 (a
+        *         usage message is printed to stderr), {@code true} otherwise
+        */
+       private static boolean parseParameters(String[] args) {
+
+               if (args.length == 0) {
+                       // no arguments: keep the defaults
+                       return true;
+               }
+
+               if (args.length != 2) {
+                       // the usage text names this class, TopSpeedWindowing
+                       System.err.println("Usage: TopSpeedWindowing <input path> <output path>");
+                       return false;
+               }
+
+               fileInput = true;
+               fileOutput = true;
+               inputPath = args[0];
+               outputPath = args[1];
+               return true;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
new file mode 100644
index 0000000..f3d57bf
--- /dev/null
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.windowing;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.examples.wordcount.WordCount;
+
+/**
+ * Implements a windowed version of the streaming "WordCount" program.
+ *
+ * <p>
+ * The input is a plain text file with lines separated by newline characters.
+ *
+ * <p>
+ * Usage: <code>WindowWordCount &lt;text path&gt; &lt;result path&gt; [&lt;window size&gt;] [&lt;slide size&gt;]</code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link org.apache.flink.examples.java.wordcount.util.WordCountData}.
+ * If a window size is given without a slide size, the window size is also used
+ * as the slide size.
+ *
+ * <p>
+ * This example shows how to:
+ * <ul>
+ * <li>write a simple Flink Streaming program,
+ * <li>use tuple data types,
+ * <li>use basic windowing abstractions.
+ * </ul>
+ *
+ */
+public class WindowWordCount {
+
+       // window parameters with default values; both are passed to countWindow(...) in main
+       private static int windowSize = 250;
+       private static int slideSize = 150;
+
+       // *************************************************************************
+       // PROGRAM
+       // *************************************************************************
+
+       public static void main(String[] args) throws Exception {
+
+               if (!parseParameters(args)) {
+                       return;
+               }
+
+               // set up the execution environment
+               final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+               // get input data
+               DataStream<String> text = getTextDataStream(env);
+
+               DataStream<Tuple2<String, Integer>> counts =
+               // split up the lines in pairs (2-tuples) containing: (word,1)
+               text.flatMap(new WordCount.Tokenizer())
+                               // group by the tuple field "0"
+                               .keyBy(0)
+                               // create windows of windowSize records slided every slideSize records
+                               .countWindow(windowSize, slideSize)
+                               // sum up tuple field "1"
+                               .sum(1);
+
+               // emit result
+               if (fileOutput) {
+                       counts.writeAsText(outputPath);
+               } else {
+                       counts.print();
+               }
+
+               // execute program
+               env.execute("WindowWordCount");
+       }
+
+
+       // *************************************************************************
+       // UTIL METHODS
+       // *************************************************************************
+
+       // true once both a text path and a result path were supplied on the command line;
+       // it therefore selects file input as well as file output
+       private static boolean fileOutput = false;
+       private static String textPath;
+       private static String outputPath;
+
+       /**
+        * Parses the command line arguments into the static configuration fields.
+        *
+        * @param args either empty (built-in data and default window parameters are used) or
+        *             {@code <text path> <result path> [<window size>] [<slide size>]}
+        * @return false if the number of arguments is not 0, 2, 3 or 4 (the usage line is
+        *         printed to stderr and no field is modified), true otherwise
+        * @throws NumberFormatException if the window size or slide size is not a valid integer
+        */
+       private static boolean parseParameters(String[] args) {
+
+               if (args.length > 0) {
+                       // parse input arguments
+                       if (args.length >= 2 && args.length <= 4) {
+                               fileOutput = true;
+                               textPath = args[0];
+                               outputPath = args[1];
+                               if (args.length >= 3) {
+                                       windowSize = Integer.parseInt(args[2]);
+
+                                       // if no slide size is specified use the window size,
+                                       // otherwise read it from the fourth argument
+                                       slideSize = args.length == 3 ? windowSize : Integer.parseInt(args[3]);
+                               }
+                       } else {
+                               System.err.println("Usage: WindowWordCount <text path> <result path> [<window size>] [<slide size>]");
+                               return false;
+                       }
+               } else {
+                       System.out.println("Executing WindowWordCount example with built-in default data.");
+                       System.out.println("  Provide parameters to read input data from a file.");
+                       System.out.println("  Usage: WindowWordCount <text path> <result path> [<window size>] [<slide size>]");
+               }
+               return true;
+       }
+
+       /**
+        * Creates the input stream of text lines.
+        *
+        * @param env the execution environment used to create the source
+        * @return the lines of the file at textPath when paths were given on the command line,
+        *         otherwise the built-in WordCountData.WORDS
+        */
+       private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env) {
+               // fileOutput is only ever set together with textPath, so it also signals file input
+               if (fileOutput) {
+                       // read the text file from given input path
+                       return env.readTextFile(textPath);
+               } else {
+                       // get default test text data
+                       return env.fromElements(WordCountData.WORDS);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/util/SessionWindowingData.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/util/SessionWindowingData.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/util/SessionWindowingData.java
new file mode 100644
index 0000000..c1a99a8
--- /dev/null
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/util/SessionWindowingData.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.windowing.util;
+
+/**
+ * Holder for a text constant used by the session windowing example.
+ */
+public class SessionWindowingData {
+
+       /**
+        * Five newline-separated tuples in "(key,number,number)" text form; the last
+        * tuple is not followed by a newline.
+        * NOTE(review): presumably the expected output of the example - confirm against its test.
+        */
+       public static final String EXPECTED =
+                       "(a,1,1)\n" +
+                       "(c,6,1)\n" +
+                       "(c,11,1)\n" +
+                       "(b,1,3)\n" +
+                       "(a,10,1)";
+
+       /** Private constructor: the class only exposes a static constant. */
+       private SessionWindowingData() {
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/util/TopSpeedWindowingExampleData.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/util/TopSpeedWindowingExampleData.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/util/TopSpeedWindowingExampleData.java
new file mode 100644
index 0000000..bf63695
--- /dev/null
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/util/TopSpeedWindowingExampleData.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.windowing.util;
+
+public class TopSpeedWindowingExampleData {
+
+       public static final String CAR_DATA =
+                                       
"(0,55,15.277777777777777,1424951918630)\n" + "(1,45,12.5,1424951918632)\n" +
+                                       
"(0,50,29.166666666666664,1424951919632)\n" + 
"(1,50,26.38888888888889,1424951919632)\n" +
+                                       
"(0,55,44.44444444444444,1424951920633)\n" + 
"(1,45,38.888888888888886,1424951920633)\n" +
+                                       
"(0,50,58.33333333333333,1424951921634)\n" + "(1,40,50.0,1424951921634)\n" +
+                                       
"(0,55,73.6111111111111,1424951922634)\n" + 
"(1,35,59.72222222222222,1424951922634)\n" +
+                                       
"(0,60,90.27777777777777,1424951923634)\n" + 
"(1,40,70.83333333333333,1424951923634)\n" +
+                                       
"(0,65,108.33333333333333,1424951924635)\n" + 
"(1,35,80.55555555555554,1424951924635)\n" +
+                                       "(0,60,125.0,1424951925635)\n" + 
"(1,40,91.66666666666666,1424951925635)\n" +
+                                       
"(0,55,140.27777777777777,1424951926635)\n" + 
"(1,45,104.16666666666666,1424951926636)\n" +
+                                       
"(0,60,156.94444444444443,1424951927636)\n" + 
"(1,50,118.05555555555554,1424951927636)\n" +
+                                       
"(0,55,172.2222222222222,1424951928636)\n" + 
"(1,45,130.55555555555554,1424951928636)\n" +
+                                       
"(0,50,186.1111111111111,1424951929636)\n" + 
"(1,50,144.44444444444443,1424951929637)\n" +
+                                       
"(0,55,201.38888888888886,1424951930637)\n" + 
"(1,55,159.7222222222222,1424951930637)\n" +
+                                       
"(0,60,218.05555555555551,1424951931637)\n" + 
"(1,60,176.38888888888886,1424951931637)\n" +
+                                       
"(0,55,233.3333333333333,1424951932637)\n" + 
"(1,65,194.4444444444444,1424951932638)\n" +
+                                       
"(0,50,247.22222222222217,1424951933638)\n" + 
"(1,70,213.88888888888886,1424951933638)\n" +
+                                       
"(0,45,259.7222222222222,1424951934638)\n" + 
"(1,65,231.9444444444444,1424951934638)\n" +
+                                       
"(0,50,273.6111111111111,1424951935638)\n" + 
"(1,70,251.38888888888886,1424951935639)\n" +
+                                       
"(0,55,288.88888888888886,1424951936639)\n" + 
"(1,75,272.2222222222222,1424951936639)\n" +
+                                       
"(0,50,302.77777777777777,1424951937639)\n" + 
"(1,70,291.66666666666663,1424951937639)\n" +
+                                       
"(0,45,315.27777777777777,1424951938640)\n" + 
"(1,65,309.7222222222222,1424951938640)\n" +
+                                       
"(0,50,329.1666666666667,1424951939640)\n" + 
"(1,70,329.16666666666663,1424951939640)\n" +
+                                       
"(0,55,344.44444444444446,1424951940640)\n" + 
"(1,65,347.2222222222222,1424951940640)\n" +
+                                       
"(0,50,358.33333333333337,1424951941641)\n" + 
"(1,70,366.66666666666663,1424951941641)\n" +
+                                       
"(0,55,373.61111111111114,1424951942641)\n" + 
"(1,65,384.7222222222222,1424951942641)\n" +
+                                       
"(0,50,387.50000000000006,1424951943641)\n" + 
"(1,70,404.16666666666663,1424951943641)\n" +
+                                       
"(0,45,400.00000000000006,1424951944642)\n" + 
"(1,65,422.2222222222222,1424951944642)\n" +
+                                       
"(0,50,413.88888888888897,1424951945642)\n" + 
"(1,60,438.88888888888886,1424951945642)\n" +
+                                       
"(0,45,426.38888888888897,1424951946642)\n" + 
"(1,65,456.9444444444444,1424951946642)\n" +
+                                       
"(0,40,437.50000000000006,1424951947643)\n" + 
"(1,70,476.38888888888886,1424951947643)\n" +
+                                       
"(0,45,450.00000000000006,1424951948643)\n" + 
"(1,75,497.2222222222222,1424951948643)\n" +
+                                       
"(0,40,461.11111111111114,1424951949643)\n" + 
"(1,80,519.4444444444443,1424951949644)\n" +
+                                       
"(0,45,473.61111111111114,1424951950644)\n" + 
"(1,75,540.2777777777777,1424951950644)\n" +
+                                       
"(0,50,487.50000000000006,1424951951644)\n" + 
"(1,80,562.4999999999999,1424951951644)\n" +
+                                       
"(0,45,500.00000000000006,1424951952644)\n" + 
"(1,85,586.111111111111,1424951952645)\n" +
+                                       
"(0,40,511.11111111111114,1424951953645)\n" + 
"(1,80,608.3333333333331,1424951953645)\n" +
+                                       
"(0,35,520.8333333333334,1424951954645)\n" + 
"(1,75,629.1666666666665,1424951954645)\n" +
+                                       
"(0,40,531.9444444444445,1424951955645)\n" + 
"(1,70,648.611111111111,1424951955646)\n" +
+                                       
"(0,45,544.4444444444445,1424951956646)\n" + 
"(1,75,669.4444444444443,1424951956646)\n" +
+                                       
"(0,50,558.3333333333334,1424951957646)\n" + 
"(1,80,691.6666666666665,1424951957646)\n" +
+                                       
"(0,55,573.6111111111112,1424951958646)\n" + 
"(1,85,715.2777777777776,1424951958647)\n" +
+                                       
"(0,60,590.2777777777778,1424951959647)\n" + 
"(1,80,737.4999999999998,1424951959647)\n" +
+                                       
"(0,65,608.3333333333334,1424951960647)\n" + 
"(1,85,761.1111111111109,1424951960647)\n" +
+                                       
"(0,70,627.7777777777778,1424951961647)\n" + 
"(1,80,783.333333333333,1424951961648)\n" +
+                                       
"(0,75,648.6111111111112,1424951962648)\n" + 
"(1,85,806.9444444444441,1424951962648)\n" +
+                                       
"(0,80,670.8333333333334,1424951963648)\n" + 
"(1,90,831.9444444444441,1424951963648)\n" +
+                                       
"(0,75,691.6666666666667,1424951964649)\n" + 
"(1,95,858.333333333333,1424951964649)\n" +
+                                       
"(0,70,711.1111111111112,1424951965649)\n" + 
"(1,90,883.333333333333,1424951965649)\n" +
+                                       
"(0,75,731.9444444444446,1424951966649)\n" + 
"(1,95,909.722222222222,1424951966649)\n" +
+                                       
"(0,70,751.388888888889,1424951967649)\n" + 
"(1,100,937.4999999999998,1424951967650)\n" +
+                                       
"(0,75,772.2222222222224,1424951968650)\n" + 
"(1,100,965.2777777777776,1424951968650)\n" +
+                                       
"(0,80,794.4444444444446,1424951969650)\n" + 
"(1,100,993.0555555555554,1424951969650)\n" +
+                                       
"(0,75,815.2777777777779,1424951970651)\n" + 
"(1,100,1020.8333333333333,1424951970651)\n" +
+                                       
"(0,80,837.5000000000001,1424951971651)\n" + 
"(1,100,1048.611111111111,1424951971651)\n" +
+                                       
"(0,85,861.1111111111112,1424951972651)\n" + 
"(1,100,1076.388888888889,1424951972651)\n" +
+                                       
"(0,80,883.3333333333334,1424951973652)\n" + 
"(1,95,1102.7777777777778,1424951973652)\n" +
+                                       
"(0,75,904.1666666666667,1424951974652)\n" + 
"(1,100,1130.5555555555557,1424951974652)\n" +
+                                       
"(0,70,923.6111111111112,1424951975652)\n" + 
"(1,100,1158.3333333333335,1424951975652)\n" +
+                                       
"(0,75,944.4444444444446,1424951976653)\n" + 
"(1,100,1186.1111111111113,1424951976653)\n" +
+                                       
"(0,80,966.6666666666667,1424951977653)\n" + 
"(1,95,1212.5000000000002,1424951977653)\n" +
+                                       
"(0,75,987.5000000000001,1424951978653)\n" + 
"(1,100,1240.277777777778,1424951978653)\n" +
+                                       
"(0,80,1009.7222222222223,1424951979654)\n" + 
"(1,100,1268.0555555555559,1424951979654)\n" +
+                                       
"(0,85,1033.3333333333335,1424951980654)\n" + 
"(1,100,1295.8333333333337,1424951980654)\n" +
+                                       
"(0,90,1058.3333333333335,1424951981654)\n" + 
"(1,100,1323.6111111111115,1424951981654)\n" +
+                                       
"(0,85,1081.9444444444446,1424951982655)\n" + 
"(1,100,1351.3888888888894,1424951982655)\n" +
+                                       
"(0,90,1106.9444444444446,1424951983655)\n" + 
"(1,100,1379.1666666666672,1424951983655)\n" +
+                                       
"(0,95,1133.3333333333335,1424951984655)\n" + 
"(1,100,1406.944444444445,1424951984656)\n" +
+                                       
"(0,90,1158.3333333333335,1424951985656)\n" + 
"(1,95,1433.333333333334,1424951985656)\n" +
+                                       
"(0,95,1184.7222222222224,1424951986656)\n" + 
"(1,90,1458.333333333334,1424951986656)\n" +
+                                       
"(0,90,1209.7222222222224,1424951987656)\n" + 
"(1,95,1484.7222222222229,1424951987657)\n" +
+                                       
"(0,85,1233.3333333333335,1424951988657)\n" + 
"(1,90,1509.7222222222229,1424951988657)\n" +
+                                       
"(0,80,1255.5555555555557,1424951989657)\n" + 
"(1,95,1536.1111111111118,1424951989657)\n" +
+                                       
"(0,85,1279.1666666666667,1424951990657)\n" + 
"(1,100,1563.8888888888896,1424951990658)\n" +
+                                       
"(0,90,1304.1666666666667,1424951991658)\n" + 
"(1,95,1590.2777777777785,1424951991658)\n" +
+                                       
"(0,95,1330.5555555555557,1424951992658)\n" + 
"(1,90,1615.2777777777785,1424951992658)\n" +
+                                       
"(0,100,1358.3333333333335,1424951993659)\n" + 
"(1,95,1641.6666666666674,1424951993659)\n" +
+                                       
"(0,100,1386.1111111111113,1424951994659)\n" + 
"(1,100,1669.4444444444453,1424951994659)\n" +
+                                       
"(0,95,1412.5000000000002,1424951995659)\n" + 
"(1,95,1695.8333333333342,1424951995660)\n" +
+                                       
"(0,100,1440.277777777778,1424951996660)\n" + 
"(1,90,1720.8333333333342,1424951996660)\n" +
+                                       
"(0,100,1468.0555555555559,1424951997660)\n" + 
"(1,85,1744.4444444444453,1424951997660)\n" +
+                                       
"(0,95,1494.4444444444448,1424951998660)\n" + 
"(1,80,1766.6666666666674,1424951998661)\n" +
+                                       
"(0,100,1522.2222222222226,1424951999661)\n" + 
"(1,75,1787.5000000000007,1424951999661)\n" +
+                                       
"(0,95,1548.6111111111115,1424952000661)\n" + 
"(1,80,1809.7222222222229,1424952000661)\n" +
+                                       
"(0,90,1573.6111111111115,1424952001662)\n" + 
"(1,75,1830.555555555556,1424952001662)\n" +
+                                       
"(0,95,1600.0000000000005,1424952002662)\n" + 
"(1,80,1852.7777777777783,1424952002662)\n" +
+                                       
"(0,100,1627.7777777777783,1424952003662)\n" + 
"(1,85,1876.3888888888894,1424952003662)\n" +
+                                       
"(0,100,1655.555555555556,1424952004663)\n" + 
"(1,80,1898.6111111111115,1424952004663)\n" +
+                                       
"(0,95,1681.944444444445,1424952005663)\n" + 
"(1,85,1922.2222222222226,1424952005663)\n" +
+                                       
"(0,100,1709.7222222222229,1424952006663)\n" + 
"(1,90,1947.2222222222226,1424952006664)\n" +
+                                       
"(0,100,1737.5000000000007,1424952007664)\n" + 
"(1,95,1973.6111111111115,1424952007664)\n" +
+                                       
"(0,95,1763.8888888888896,1424952008664)\n" + 
"(1,90,1998.6111111111115,1424952008664)\n" +
+                                       
"(0,100,1791.6666666666674,1424952009664)\n" + 
"(1,85,2022.2222222222226,1424952009665)\n" +
+                                       
"(0,95,1818.0555555555563,1424952010665)\n" + 
"(1,80,2044.4444444444448,1424952010665)\n" +
+                                       
"(0,90,1843.0555555555563,1424952011665)\n" + 
"(1,75,2065.2777777777783,1424952011665)\n" +
+                                       
"(0,95,1869.4444444444453,1424952012666)\n" + 
"(1,80,2087.5000000000005,1424952012666)\n" +
+                                       
"(0,100,1897.222222222223,1424952013666)\n" + 
"(1,85,2111.1111111111118,1424952013666)\n" +
+                                       
"(0,95,1923.611111111112,1424952014666)\n" + 
"(1,90,2136.1111111111118,1424952014666)\n" +
+                                       
"(0,100,1951.3888888888898,1424952015667)\n" + 
"(1,85,2159.722222222223,1424952015667)\n" +
+                                       
"(0,95,1977.7777777777787,1424952016667)\n" + 
"(1,90,2184.722222222223,1424952016667)\n" +
+                                       
"(0,100,2005.5555555555566,1424952017667)\n" + 
"(1,95,2211.1111111111118,1424952017668)";
+
+       public static final String TOP_SPEEDS =
+                       "(0,55,15.277777777777777,1424951918630)\n" +
+                                       
"(1,50,26.38888888888889,1424951919632)\n" +
+                                       
"(0,65,108.33333333333333,1424951924635)\n" +
+                                       
"(1,50,26.38888888888889,1424951919632)\n" +
+                                       
"(0,65,108.33333333333333,1424951924635)\n" +
+                                       
"(1,65,194.4444444444444,1424951932638)\n" +
+                                       
"(0,65,108.33333333333333,1424951924635)\n" +
+                                       
"(1,70,213.88888888888886,1424951933638)\n" +
+                                       
"(0,60,218.05555555555551,1424951931637)\n" +
+                                       
"(1,75,272.2222222222222,1424951936639)\n" +
+                                       
"(0,55,233.3333333333333,1424951932637)\n" +
+                                       
"(1,75,272.2222222222222,1424951936639)\n" +
+                                       
"(1,75,272.2222222222222,1424951936639)\n" +
+                                       
"(0,55,288.88888888888886,1424951936639)\n" +
+                                       
"(1,70,329.16666666666663,1424951939640)\n" +
+                                       
"(0,55,373.61111111111114,1424951942641)\n" +
+                                       
"(1,80,519.4444444444443,1424951949644)\n" +
+                                       
"(1,85,586.111111111111,1424951952645)\n" +
+                                       
"(0,50,487.50000000000006,1424951951644)\n" +
+                                       
"(1,85,586.111111111111,1424951952645)\n" +
+                                       
"(0,60,590.2777777777778,1424951959647)\n" +
+                                       
"(1,85,586.111111111111,1424951952645)\n" +
+                                       
"(0,75,648.6111111111112,1424951962648)\n" +
+                                       
"(1,85,715.2777777777776,1424951958647)\n" +
+                                       
"(1,95,858.333333333333,1424951964649)\n" +
+                                       
"(0,80,670.8333333333334,1424951963648)\n" +
+                                       
"(1,95,858.333333333333,1424951964649)\n" +
+                                       
"(0,80,670.8333333333334,1424951963648)\n" +
+                                       
"(1,100,937.4999999999998,1424951967650)\n" +
+                                       
"(1,100,937.4999999999998,1424951967650)\n" +
+                                       
"(0,80,670.8333333333334,1424951963648)\n" +
+                                       
"(1,100,937.4999999999998,1424951967650)\n" +
+                                       
"(0,85,861.1111111111112,1424951972651)\n" +
+                                       
"(1,100,937.4999999999998,1424951967650)\n" +
+                                       
"(1,100,937.4999999999998,1424951967650)\n" +
+                                       
"(0,85,861.1111111111112,1424951972651)\n" +
+                                       
"(1,100,993.0555555555554,1424951969650)\n" +
+                                       
"(0,85,861.1111111111112,1424951972651)\n" +
+                                       
"(1,100,1048.611111111111,1424951971651)\n" +
+                                       
"(1,100,1130.5555555555557,1424951974652)\n" +
+                                       
"(0,90,1058.3333333333335,1424951981654)\n" +
+                                       
"(1,100,1158.3333333333335,1424951975652)\n" +
+                                       
"(0,95,1133.3333333333335,1424951984655)\n" +
+                                       
"(1,100,1240.277777777778,1424951978653)\n" +
+                                       
"(0,95,1133.3333333333335,1424951984655)\n" +
+                                       
"(1,100,1268.0555555555559,1424951979654)\n" +
+                                       
"(0,95,1133.3333333333335,1424951984655)\n" +
+                                       
"(1,100,1323.6111111111115,1424951981654)\n" +
+                                       
"(0,95,1133.3333333333335,1424951984655)\n" +
+                                       
"(1,100,1379.1666666666672,1424951983655)\n" +
+                                       
"(0,100,1358.3333333333335,1424951993659)\n" +
+                                       
"(1,100,1563.8888888888896,1424951990658)\n" +
+                                       
"(0,100,1358.3333333333335,1424951993659)\n" +
+                                       
"(1,100,1563.8888888888896,1424951990658)\n" +
+                                       
"(0,100,1358.3333333333335,1424951993659)\n" +
+                                       
"(1,100,1563.8888888888896,1424951990658)\n" +
+                                       
"(0,100,1358.3333333333335,1424951993659)\n" +
+                                       
"(0,100,1358.3333333333335,1424951993659)\n" +
+                                       
"(1,100,1669.4444444444453,1424951994659)\n" +
+                                       
"(0,100,1440.277777777778,1424951996660)\n" +
+                                       
"(1,90,1720.8333333333342,1424951996660)\n" +
+                                       
"(0,100,1468.0555555555559,1424951997660)\n" +
+                                       
"(1,95,1973.6111111111115,1424952007664)\n" +
+                                       
"(0,100,1522.2222222222226,1424951999661)\n" +
+                                       
"(0,100,1627.7777777777783,1424952003662)\n" +
+                                       
"(1,95,1973.6111111111115,1424952007664)\n" +
+                                       
"(0,100,1627.7777777777783,1424952003662)\n" +
+                                       
"(1,95,1973.6111111111115,1424952007664)\n" +
+                                       
"(0,100,1709.7222222222229,1424952006663)\n" +
+                                       
"(0,100,1737.5000000000007,1424952007664)\n" +
+                                       
"(1,95,1973.6111111111115,1424952007664)\n" +
+                                       
"(0,100,1791.6666666666674,1424952009664)\n" +
+                                       
"(1,95,2211.1111111111118,1424952017668)\n";
+
+       public static final String TOP_CASE_CLASS_SPEEDS =
+                       "CarEvent(0,55,15.277777777777777,1424951918630)\n" +
+                                       
"CarEvent(1,50,26.38888888888889,1424951919632)\n" +
+                                       
"CarEvent(0,65,108.33333333333333,1424951924635)\n" +
+                                       
"CarEvent(1,50,26.38888888888889,1424951919632)\n" +
+                                       
"CarEvent(0,65,108.33333333333333,1424951924635)\n" +
+                                       
"CarEvent(1,65,194.4444444444444,1424951932638)\n" +
+                                       
"CarEvent(0,65,108.33333333333333,1424951924635)\n" +
+                                       
"CarEvent(1,70,213.88888888888886,1424951933638)\n" +
+                                       
"CarEvent(0,60,218.05555555555551,1424951931637)\n" +
+                                       
"CarEvent(1,75,272.2222222222222,1424951936639)\n" +
+                                       
"CarEvent(0,55,233.3333333333333,1424951932637)\n" +
+                                       
"CarEvent(1,75,272.2222222222222,1424951936639)\n" +
+                                       
"CarEvent(1,75,272.2222222222222,1424951936639)\n" +
+                                       
"CarEvent(0,55,288.88888888888886,1424951936639)\n" +
+                                       
"CarEvent(1,70,329.16666666666663,1424951939640)\n" +
+                                       
"CarEvent(0,55,373.61111111111114,1424951942641)\n" +
+                                       
"CarEvent(1,80,519.4444444444443,1424951949644)\n" +
+                                       
"CarEvent(1,85,586.111111111111,1424951952645)\n" +
+                                       
"CarEvent(0,50,487.50000000000006,1424951951644)\n" +
+                                       
"CarEvent(1,85,586.111111111111,1424951952645)\n" +
+                                       
"CarEvent(0,60,590.2777777777778,1424951959647)\n" +
+                                       
"CarEvent(1,85,586.111111111111,1424951952645)\n" +
+                                       
"CarEvent(0,75,648.6111111111112,1424951962648)\n" +
+                                       
"CarEvent(1,85,715.2777777777776,1424951958647)\n" +
+                                       
"CarEvent(1,95,858.333333333333,1424951964649)\n" +
+                                       
"CarEvent(0,80,670.8333333333334,1424951963648)\n" +
+                                       
"CarEvent(1,95,858.333333333333,1424951964649)\n" +
+                                       
"CarEvent(0,80,670.8333333333334,1424951963648)\n" +
+                                       
"CarEvent(1,100,937.4999999999998,1424951967650)\n" +
+                                       
"CarEvent(1,100,937.4999999999998,1424951967650)\n" +
+                                       
"CarEvent(0,80,670.8333333333334,1424951963648)\n" +
+                                       
"CarEvent(1,100,937.4999999999998,1424951967650)\n" +
+                                       
"CarEvent(0,85,861.1111111111112,1424951972651)\n" +
+                                       
"CarEvent(1,100,937.4999999999998,1424951967650)\n" +
+                                       
"CarEvent(1,100,937.4999999999998,1424951967650)\n" +
+                                       
"CarEvent(0,85,861.1111111111112,1424951972651)\n" +
+                                       
"CarEvent(1,100,993.0555555555554,1424951969650)\n" +
+                                       
"CarEvent(0,85,861.1111111111112,1424951972651)\n" +
+                                       
"CarEvent(1,100,1048.611111111111,1424951971651)\n" +
+                                       
"CarEvent(1,100,1130.5555555555557,1424951974652)\n" +
+                                       
"CarEvent(0,90,1058.3333333333335,1424951981654)\n" +
+                                       
"CarEvent(1,100,1158.3333333333335,1424951975652)\n" +
+                                       
"CarEvent(0,95,1133.3333333333335,1424951984655)\n" +
+                                       
"CarEvent(1,100,1240.277777777778,1424951978653)\n" +
+                                       
"CarEvent(0,95,1133.3333333333335,1424951984655)\n" +
+                                       
"CarEvent(1,100,1268.0555555555559,1424951979654)\n" +
+                                       
"CarEvent(0,95,1133.3333333333335,1424951984655)\n" +
+                                       
"CarEvent(1,100,1323.6111111111115,1424951981654)\n" +
+                                       
"CarEvent(0,95,1133.3333333333335,1424951984655)\n" +
+                                       
"CarEvent(1,100,1379.1666666666672,1424951983655)\n" +
+                                       
"CarEvent(0,100,1358.3333333333335,1424951993659)\n" +
+                                       
"CarEvent(1,100,1563.8888888888896,1424951990658)\n" +
+                                       
"CarEvent(0,100,1358.3333333333335,1424951993659)\n" +
+                                       
"CarEvent(1,100,1563.8888888888896,1424951990658)\n" +
+                                       
"CarEvent(0,100,1358.3333333333335,1424951993659)\n" +
+                                       
"CarEvent(1,100,1563.8888888888896,1424951990658)\n" +
+                                       
"CarEvent(0,100,1358.3333333333335,1424951993659)\n" +
+                                       
"CarEvent(0,100,1358.3333333333335,1424951993659)\n" +
+                                       
"CarEvent(1,100,1669.4444444444453,1424951994659)\n" +
+                                       
"CarEvent(0,100,1440.277777777778,1424951996660)\n" +
+                                       
"CarEvent(1,90,1720.8333333333342,1424951996660)\n" +
+                                       
"CarEvent(0,100,1468.0555555555559,1424951997660)\n" +
+                                       
"CarEvent(1,95,1973.6111111111115,1424952007664)\n" +
+                                       
"CarEvent(0,100,1522.2222222222226,1424951999661)\n" +
+                                       
"CarEvent(0,100,1627.7777777777783,1424952003662)\n" +
+                                       
"CarEvent(1,95,1973.6111111111115,1424952007664)\n" +
+                                       
"CarEvent(0,100,1627.7777777777783,1424952003662)\n" +
+                                       
"CarEvent(1,95,1973.6111111111115,1424952007664)\n" +
+                                       
"CarEvent(0,100,1709.7222222222229,1424952006663)\n" +
+                                       
"CarEvent(0,100,1737.5000000000007,1424952007664)\n" +
+                                       
"CarEvent(1,95,1973.6111111111115,1424952007664)\n" +
+                                       
"CarEvent(0,100,1791.6666666666674,1424952009664)\n" +
+                                       
"CarEvent(1,95,2211.1111111111118,1424952017668)\n";
+
+       // Private constructor: instances cannot be created from outside this class.
+       private TopSpeedWindowingExampleData() {
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java
new file mode 100644
index 0000000..9b0b63c
--- /dev/null
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/PojoExample.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.wordcount;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.Collector;
+
+/**
+ * This example shows an implementation of WordCount without using the Tuple2
+ * type, but a custom class.
+ * 
+ * <p>
+ * Usage: <code>PojoExample &lt;text path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link WordCountData}.
+ * 
+ * <p>
+ * This example shows how to:
+ * <ul>
+ * <li>use POJO data types,
+ * <li>write a simple Flink program,
+ * <li>write and use user-defined functions. 
+ * </ul>
+ */
+public class PojoExample {
+       
+       // 
*************************************************************************
+       // PROGRAM
+       // 
*************************************************************************
+
+       /**
+        * Runs the word count: tokenizes the input lines into {@link Word} objects,
+        * keys them by the "word" field, sums the "frequency" field, and either
+        * writes the result to the output path or prints it.
+        *
+        * @param args either empty (built-in default data is used) or exactly
+        *             {@code <text path> <result path>}
+        * @throws Exception if any of the calls made while building or running
+        *                   the program throws
+        */
+       public static void main(String[] args) throws Exception {
+
+               // bail out when the arguments were rejected (usage was already printed)
+               if (!parseParameters(args)) {
+                       return;
+               }
+
+               // set up the execution environment
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               // get input data
+               DataStream<String> text = getTextDataStream(env);
+
+               DataStream<Word> counts =
+               // split up the lines into Word objects
+               text.flatMap(new Tokenizer())
+               // group by the field word and sum up the frequency
+                               .keyBy("word").sum("frequency");
+
+               // emit result
+               if (fileOutput) {
+                       counts.writeAsText(outputPath);
+               } else {
+                       counts.print();
+               }
+
+               // execute program
+               env.execute("WordCount Pojo Example");
+       }
+
+       // 
*************************************************************************
+       // DATA TYPES
+       // 
*************************************************************************
+
+       /**
+        * This is the POJO (Plain Old Java Object) that is being used for all the
+        * operations. As long as all fields are public or have a getter/setter, the
+        * system can handle them. Here both fields are private and exposed through
+        * getters and setters.
+        */
+       public static class Word {
+
+               private String word;
+               private Integer frequency;
+
+               /** No-argument constructor; leaves both fields unset. */
+               public Word() {
+               }
+
+               /**
+                * Creates a word with an initial frequency.
+                *
+                * @param word the token
+                * @param i the initial frequency
+                */
+               public Word(String word, int i) {
+                       this.word = word;
+                       this.frequency = i;
+               }
+
+               public String getWord() {
+                       return word;
+               }
+
+               public void setWord(String word) {
+                       this.word = word;
+               }
+
+               public Integer getFrequency() {
+                       return frequency;
+               }
+
+               public void setFrequency(Integer frequency) {
+                       this.frequency = frequency;
+               }
+
+               /** Renders the word as {@code (word,frequency)}. */
+               @Override
+               public String toString() {
+                       return "(" + word + "," + frequency + ")";
+               }
+       }
+
+       // 
*************************************************************************
+       // USER FUNCTIONS
+       // 
*************************************************************************
+
+       /**
+        * Implements the string tokenizer that splits sentences into words as a
+        * user-defined FlatMapFunction. The function takes a line (String),
+        * lower-cases it, splits it on non-word characters and emits one
+        * {@link Word} with frequency 1 for every non-empty token.
+        */
+       public static final class Tokenizer implements FlatMapFunction<String, 
Word> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void flatMap(String value, Collector<Word> out) {
+                       // normalize and split the line
+                       String[] tokens = value.toLowerCase().split("\\W+");
+
+                       // emit one Word per token; the length check skips empty tokens
+                       for (String token : tokens) {
+                               if (token.length() > 0) {
+                                       out.collect(new Word(token, 1));
+                               }
+                       }
+               }
+       }
+
+       // 
*************************************************************************
+       // UTIL METHODS
+       // 
*************************************************************************
+
+       // set by parseParameters; fileOutput is true whenever any argument was given
+       private static boolean fileOutput = false;
+       private static String textPath;
+       private static String outputPath;
+
+       /**
+        * Interprets the command-line arguments. With no arguments a hint is
+        * printed and the defaults stay in place. With exactly two arguments the
+        * first is stored as the input text path and the second as the output
+        * path. Any other argument count prints the usage to standard error.
+        *
+        * @param args the command-line arguments
+        * @return {@code false} if the argument count is neither 0 nor 2,
+        *         {@code true} otherwise
+        */
+       private static boolean parseParameters(String[] args) {
+
+               if (args.length > 0) {
+                       // parse input arguments
+                       fileOutput = true;
+                       if (args.length == 2) {
+                               textPath = args[0];
+                               outputPath = args[1];
+                       } else {
+                               System.err.println("Usage: PojoExample <text 
path> <result path>");
+                               return false;
+                       }
+               } else {
+                       System.out.println("Executing PojoExample example with 
built-in default data.");
+                       System.out.println("  Provide parameters to read input 
data from a file.");
+                       System.out.println("  Usage: PojoExample <text path> 
<result path>");
+               }
+               return true;
+       }
+
+       /**
+        * Creates the input stream of text lines.
+        *
+        * @param env the environment the source is created on
+        * @return lines read from {@code textPath} when arguments were given,
+        *         otherwise the built-in {@link WordCountData#WORDS}
+        */
+       private static DataStream<String> 
getTextDataStream(StreamExecutionEnvironment env) {
+               // fileOutput doubles as the "input file given" flag: parseParameters
+               // only accepts the input and output paths together
+               if (fileOutput) {
+                       // read the text file from given input path
+                       return env.readTextFile(textPath);
+               } else {
+                       // get default test text data
+                       return env.fromElements(WordCountData.WORDS);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
new file mode 100644
index 0000000..a76671e
--- /dev/null
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.wordcount;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.Collector;
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence
+ * histogram over text files in a streaming fashion.
+ * 
+ * <p>
+ * The input is a plain text file with lines separated by newline characters.
+ * 
+ * <p>
+ * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link WordCountData}.
+ * 
+ * <p>
+ * This example shows how to:
+ * <ul>
+ * <li>write a simple Flink Streaming program,
+ * <li>use tuple data types,
+ * <li>write and use user-defined functions.
+ * </ul>
+ * 
+ */
+public class WordCount {
+
+       // 
*************************************************************************
+       // PROGRAM
+       // 
*************************************************************************
+
+       /**
+        * Runs the word count: tokenizes the input lines into (word,1) tuples,
+        * keys them by tuple field 0, sums tuple field 1, and either writes the
+        * result to the output path or prints it.
+        *
+        * @param args either empty (built-in default data is used) or exactly
+        *             {@code <text path> <result path>}
+        * @throws Exception if any of the calls made while building or running
+        *                   the program throws
+        */
+       public static void main(String[] args) throws Exception {
+
+               // bail out when the arguments were rejected (usage was already printed)
+               if (!parseParameters(args)) {
+                       return;
+               }
+
+               // set up the execution environment
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               // get input data
+               DataStream<String> text = getTextDataStream(env);
+
+               DataStream<Tuple2<String, Integer>> counts =
+               // split up the lines in pairs (2-tuples) containing: (word,1)
+               text.flatMap(new Tokenizer())
+               // group by the tuple field "0" and sum up tuple field "1"
+                               .keyBy(0).sum(1);
+
+               // emit result
+               if (fileOutput) {
+                       counts.writeAsText(outputPath);
+               } else {
+                       counts.print();
+               }
+
+               // execute program
+               env.execute("Streaming WordCount");
+       }
+
+       // 
*************************************************************************
+       // USER FUNCTIONS
+       // 
*************************************************************************
+
+       /**
+        * Implements the string tokenizer that splits sentences into words as a
+        * user-defined FlatMapFunction. The function takes a line (String),
+        * lower-cases it, splits it on non-word characters and emits one pair in
+        * the form of "(word,1)" ({@code Tuple2<String, Integer>}) for every
+        * non-empty token.
+        */
+       public static final class Tokenizer implements FlatMapFunction<String, 
Tuple2<String, Integer>> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void flatMap(String value, Collector<Tuple2<String, 
Integer>> out)
+                               throws Exception {
+                       // normalize and split the line
+                       String[] tokens = value.toLowerCase().split("\\W+");
+
+                       // emit the pairs; the length check skips empty tokens
+                       for (String token : tokens) {
+                               if (token.length() > 0) {
+                                       out.collect(new Tuple2<String, 
Integer>(token, 1));
+                               }
+                       }
+               }
+       }
+
+       // 
*************************************************************************
+       // UTIL METHODS
+       // 
*************************************************************************
+
+       // set by parseParameters; fileOutput is true whenever any argument was given
+       private static boolean fileOutput = false;
+       private static String textPath;
+       private static String outputPath;
+
+       /**
+        * Interprets the command-line arguments. With no arguments a hint is
+        * printed and the defaults stay in place. With exactly two arguments the
+        * first is stored as the input text path and the second as the output
+        * path. Any other argument count prints the usage to standard error.
+        *
+        * @param args the command-line arguments
+        * @return {@code false} if the argument count is neither 0 nor 2,
+        *         {@code true} otherwise
+        */
+       private static boolean parseParameters(String[] args) {
+
+               if (args.length > 0) {
+                       // parse input arguments
+                       fileOutput = true;
+                       if (args.length == 2) {
+                               textPath = args[0];
+                               outputPath = args[1];
+                       } else {
+                               System.err.println("Usage: WordCount <text 
path> <result path>");
+                               return false;
+                       }
+               } else {
+                       System.out.println("Executing WordCount example with 
built-in default data.");
+                       System.out.println("  Provide parameters to read input 
data from a file.");
+                       System.out.println("  Usage: WordCount <text path> 
<result path>");
+               }
+               return true;
+       }
+
+       /**
+        * Creates the input stream of text lines.
+        *
+        * @param env the environment the source is created on
+        * @return lines read from {@code textPath} when arguments were given,
+        *         otherwise the built-in {@link WordCountData#WORDS}
+        */
+       private static DataStream<String> 
getTextDataStream(StreamExecutionEnvironment env) {
+               // fileOutput doubles as the "input file given" flag: parseParameters
+               // only accepts the input and output paths together
+               if (fileOutput) {
+                       // read the text file from given input path
+                       return env.readTextFile(textPath);
+               } else {
+                       // get default test text data
+                       return env.fromElements(WordCountData.WORDS);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
 
b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
new file mode 100644
index 0000000..42484e8
--- /dev/null
+++ 
b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.join
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows
+import org.apache.flink.streaming.api.windowing.time.Time
+
+import scala.Stream._
+import scala.language.postfixOps
+import scala.util.Random
+
+/**
+ * Joins a stream of grades with a stream of salaries on the name field over
+ * sliding time windows (2 seconds long, sliding every second) and emits one
+ * Person per matching pair. Input comes either from generated data or from
+ * two text files; output is printed or written to a file.
+ */
+object WindowJoin {
+
+  // *************************************************************************
+  // PROGRAM
+  // *************************************************************************
+
+  // Record types: the two timestamped inputs and the joined result
+  case class Grade(time: Long, name: String, grade: Int)
+  case class Salary(time: Long, name: String, salary: Int)
+  case class Person(name: String, grade: Int, salary: Int)
+
+  /**
+   * Builds and runs the join.
+   *
+   * @param args empty, `<result path>`, or
+   *             `<input path 1> <input path 2> <result path>`
+   */
+  def main(args: Array[String]) {
+
+    // bail out when the arguments were rejected (usage was already printed)
+    if (!parseParameters(args)) {
+      return
+    }
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+    //Create streams for grades and salaries by mapping the inputs to the 
corresponding objects
+    val grades = setGradesInput(env)
+    val salaries = setSalariesInput(env)
+
+    //Join the two input streams by name on the last 2 seconds every second 
and create new
+    //Person objects containing both grade and salary
+    val joined = grades.join(salaries)
+        .where(_.name)
+        .equalTo(_.name)
+        .window(SlidingTimeWindows.of(Time.of(2, TimeUnit.SECONDS), Time.of(1, 
TimeUnit.SECONDS)))
+        .apply { (g, s) => Person(g.name, g.grade, s.salary) }
+
+    if (fileOutput) {
+      joined.writeAsText(outputPath)
+    } else {
+      joined.print()
+    }
+
+    env.execute("WindowJoin")
+  }
+
+  // *************************************************************************
+  // USER FUNCTIONS
+  // *************************************************************************
+
+  // Parameters of the generated data
+  val names = Array("tom", "jerry", "alice", "bob", "john", "grace")
+  // exclusive upper bound for generated grades (passed to Random.nextInt)
+  val gradeCount = 5
+  // exclusive upper bound for generated salaries (passed to Random.nextInt)
+  val salaryMax = 10000
+  // used both as the modulus deciding when to sleep and as the sleep time in ms
+  val sleepInterval = 100
+  
+  /**
+   * Generated grades: one (current time in ms, random name, random grade)
+   * tuple for every element of range(1, 100).
+   */
+  def gradeStream: Stream[(Long, String, Int)] = {
+    def gradeMapper(names: Array[String])(x: Int): (Long, String, Int) =
+      {
+        // NOTE(review): range(1, 100) below never yields a multiple of
+        // sleepInterval (100), so this sleep appears never to run; confirm intent
+        if (x % sleepInterval == 0) Thread.sleep(sleepInterval)
+        
(System.currentTimeMillis(),names(Random.nextInt(names.length)),Random.nextInt(gradeCount))
+      }
+    range(1, 100).map(gradeMapper(names))
+  }
+
+  /**
+   * Generated salaries: one (current time in ms, random name, random salary)
+   * tuple for every element of range(1, 100).
+   */
+  def salaryStream: Stream[(Long, String, Int)] = {
+    def salaryMapper(x: Int): (Long, String, Int) =
+      {
+        // NOTE(review): same condition as in gradeStream; it looks like it is
+        // never true for range(1, 100); confirm intent
+        if (x % sleepInterval == 0) Thread.sleep(sleepInterval)
+        (System.currentTimeMillis(), names(Random.nextInt(names.length)), 
Random.nextInt(salaryMax))
+      }
+    range(1, 100).map(salaryMapper)
+  }
+
+  /**
+   * Parses one input line into a (time, name, value) tuple. The first and
+   * last character of the line are dropped (presumably the surrounding
+   * parentheses of a printed tuple; verify against the input files) and the
+   * rest is split on commas.
+   */
+  def parseMap(line : String): (Long, String, Int) = {
+    val record = line.substring(1, line.length - 1).split(",")
+    (record(0).toLong, record(1), record(2).toInt)
+  }
+
+  // *************************************************************************
+  // UTIL METHODS
+  // *************************************************************************
+
+  // Set by parseParameters
+  private var fileInput: Boolean = false
+  private var fileOutput: Boolean = false
+
+  private var gradesPath: String = null
+  private var salariesPath: String = null
+  private var outputPath: String = null
+
+  /**
+   * Interprets the command-line arguments: none (generated data, printed
+   * output), one (generated data, output written to that path) or three
+   * (grades path, salaries path, output path). Any other count prints the
+   * usage to standard error.
+   *
+   * @return false if the argument count is not 0, 1 or 3, true otherwise
+   */
+  private def parseParameters(args: Array[String]): Boolean = {
+    if (args.length > 0) {
+      if (args.length == 1) {
+        fileOutput = true
+        outputPath = args(0)
+      }
+      else if (args.length == 3) {
+        fileInput = true
+        fileOutput = true
+        gradesPath = args(0)
+        salariesPath = args(1)
+        outputPath = args(2)
+      } else {
+        System.err.println("Usage: WindowJoin <result path> or WindowJoin 
<input path 1> " +
+          "<input path 2> <result path>")
+        return false
+      }
+    } else {
+      System.out.println("Executing WindowJoin with generated data.")
+      System.out.println("  Provide parameter to write to file.")
+      System.out.println("  Usage: WindowJoin <result path>")
+    }
+    true
+  }
+
+  /**
+   * Creates the grades stream, from the lines of gradesPath when file input
+   * was requested and from gradeStream otherwise.
+   */
+  private def setGradesInput(env: StreamExecutionEnvironment) : 
DataStream[Grade] = {
+    if (fileInput) {
+      env.readTextFile(gradesPath).map(parseMap _ ).map(x => Grade(x._1, x._2, 
x._3))
+    } else {
+      env.fromCollection(gradeStream).map(x => Grade(x._1, x._2, x._3))
+    }
+  }
+
+  /**
+   * Creates the salaries stream, from the lines of salariesPath when file
+   * input was requested and from salaryStream otherwise.
+   */
+  private def setSalariesInput(env: StreamExecutionEnvironment) : 
DataStream[Salary] = {
+    if (fileInput) {
+      env.readTextFile(salariesPath).map(parseMap _).map(x => Salary(x._1, 
x._2, x._3))
+    }
+    else {
+      env.fromCollection(salaryStream).map(x => Salary(x._1, x._2, x._3))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala
 
b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala
new file mode 100644
index 0000000..9ec17d4
--- /dev/null
+++ 
b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.socket
+
+import org.apache.flink.streaming.api.scala._
+
+import scala.language.postfixOps
+
+/**
+ * This example shows an implementation of WordCount with data from a text 
socket. 
+ * To run the example make sure that the service providing the text data is 
already up and running.
+ *
+ * To start an example socket text stream on your local machine run netcat 
from a command line, 
+ * where the parameter specifies the port number:
+ *
+ * {{{
+ *   nc -lk 9999
+ * }}}
+ *
+ * Usage:
+ * {{{
+ *   SocketTextStreamWordCount <hostname> <port> [<output path>]
+ * }}}
+ *
+ * This example shows how to:
+ *
+ *   - use StreamExecutionEnvironment.socketTextStream
+ *   - write a simple Flink Streaming program in scala.
+ *   - write and use user-defined functions.
+ */
+object SocketTextStreamWordCount {
+
+  /**
+   * Reads lines from a text socket, counts the words and either prints the
+   * running counts or writes them to the output path.
+   *
+   * @param args `<hostname> <port> [<output path>]`
+   */
+  def main(args: Array[String]) {
+    // bail out when the arguments were rejected (usage was already printed)
+    if (!parseParameters(args)) {
+      return
+    }
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    // Read lines from the socket, split them into lower-cased non-empty words
+    // and keep a running count per word
+    val text = env.socketTextStream(hostName, port)
+    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
+      .map { (_, 1) }
+      .keyBy(0)
+      .sum(1)
+
+    if (fileOutput) {
+      counts.writeAsText(outputPath, 1)
+    } else {
+      counts.print()
+    }
+
+    env.execute("Scala SocketTextStreamWordCount Example")
+  }
+
+  /**
+   * Interprets the command-line arguments: `<hostname> <port>` with an
+   * optional third `<output path>`. Nothing is stored unless the arguments are
+   * valid as a whole.
+   *
+   * @return false (after printing the usage to standard error) if the argument
+   *         count is not 2 or 3 or the port is not an integer, true otherwise
+   */
+  private def parseParameters(args: Array[String]): Boolean = {
+    val usage = "Usage: SocketTextStreamWordCount <hostname> <port> [<output path>]"
+
+    if (args.length != 2 && args.length != 3) {
+      System.err.println(usage)
+      return false
+    }
+
+    // A non-numeric port is a usage error, not a reason to die with a stack trace
+    try {
+      port = args(1).toInt
+    } catch {
+      case _: NumberFormatException =>
+        System.err.println("Invalid port '" + args(1) + "': expected an integer.")
+        System.err.println(usage)
+        return false
+    }
+
+    hostName = args(0)
+    if (args.length == 3) {
+      fileOutput = true
+      outputPath = args(2)
+    }
+    true
+  }
+
+  // Set by parseParameters
+  private var fileOutput: Boolean = false
+  private var hostName: String = null
+  private var port: Int = 0
+  private var outputPath: String = null
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
 
b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
new file mode 100644
index 0000000..f26f32c
--- /dev/null
+++ 
b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.windowing
+
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
+import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger
+
+import scala.Stream._
+import scala.math._
+import scala.language.postfixOps
+import scala.util.Random
+
+/**
+ * An example of grouped stream windowing where different eviction and
+ * trigger policies can be used. A source emits one event per car every
+ * second, containing the car's id, its current speed (km/h), the overall
+ * distance travelled (m) and a timestamp. Every time a car has covered
+ * another x meters, the example emits that car's top speed over the
+ * last y seconds.
+ */
+object TopSpeedWindowing {
+
+  // *************************************************************************
+  // PROGRAM
+  // *************************************************************************
+
+  /**
+   * One reading from a car: its id, current speed, distance covered so far
+   * and the time of the reading (System.currentTimeMillis for generated
+   * events).
+   */
+  case class CarEvent(carId: Int, speed: Int, distance: Double, time: Long)
+
+  // Number of cars simulated by genCarStream
+  val numOfCars = 2
+  // Time span, in seconds, handed to the TimeEvictor
+  val evictionSec = 10
+  // Distance delta handed to the DeltaTrigger
+  val triggerMeters = 50d
+
+  /**
+   * Builds and runs the windowing job.
+   *
+   * @param args either empty (generated input, printed output) or
+   *             `<input path> <output path>`; with any other argument count
+   *             a usage message is printed and the method returns
+   */
+  def main(args: Array[String]) {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setParallelism(1)
+
+    val cars = setCarsInput(env)
+
+    // Per car: a global window whose content is limited by a time evictor,
+    // triggered by the distance delta between events, reduced to the event
+    // with the highest speed.
+    // NOTE(review): `topSeed` is presumably a typo for `topSpeed`
+    val topSeed = cars
+      .assignAscendingTimestamps( _.time )
+      .keyBy("carId")
+      .window(GlobalWindows.create)
+      .evictor(TimeEvictor.of(Time.of(evictionSec * 1000, 
TimeUnit.MILLISECONDS)))
+      .trigger(DeltaTrigger.of(triggerMeters, new DeltaFunction[CarEvent] {
+        def getDelta(oldSp: CarEvent, newSp: CarEvent): Double = 
newSp.distance - oldSp.distance
+      }))
+//      .window(Time.of(evictionSec * 1000, (car : CarEvent) => car.time))
+//      .every(Delta.of[CarEvent](triggerMeters,
+//          (oldSp,newSp) => newSp.distance-oldSp.distance, CarEvent(0,0,0,0)))
+      .maxBy("speed")
+
+    if (fileOutput) {
+      topSeed.writeAsText(outputPath)
+    } else {
+      topSeed.print
+    }
+
+    env.execute("TopSpeedWindowing")
+
+  }
+
+  // *************************************************************************
+  // USER FUNCTIONS
+  // *************************************************************************
+
+  /**
+   * Generates an endless lazy stream of simulated car events.
+   *
+   * It starts with one event per car id in [0, numOfCars) at speed 50 and
+   * distance 0; every further round derives each car's next event from its
+   * previous one.
+   */
+  def genCarStream(): Stream[CarEvent] = {
+
+    // Next event of a car: speed randomly goes up by 5 (capped at 100) or
+    // down by 5 (floored at 0); distance grows by next / 3.6, i.e. km/h
+    // converted to meters per second for the one-second step.
+    def nextSpeed(carEvent : CarEvent) : CarEvent =
+    {
+      val next =
+        if (Random.nextBoolean) min(100, carEvent.speed + 5) else max(0, 
carEvent.speed - 5)
+      CarEvent(carEvent.carId, next, carEvent.distance + 
next/3.6d,System.currentTimeMillis)
+    }
+    // Sleeps one second, then yields the given round of events followed by
+    // the next round. Stream.append takes its argument by name, so the
+    // recursive call is only evaluated when the stream is consumed further.
+    def carStream(speeds : Stream[CarEvent]) : Stream[CarEvent] =
+    {
+      Thread.sleep(1000)
+      speeds.append(carStream(speeds.map(nextSpeed)))
+    }
+    carStream(range(0, 
numOfCars).map(CarEvent(_,50,0,System.currentTimeMillis())))
+  }
+
+  /**
+   * Parses one text line into the four CarEvent fields.
+   *
+   * The first and last character of the line are dropped (presumably the
+   * surrounding parentheses of a printed tuple - confirm against the input
+   * data) and the rest is split on commas.
+   *
+   * @param line the text line to parse
+   * @return (carId, speed, distance, time)
+   */
+  def parseMap(line : String): (Int, Int, Double, Long) = {
+    val record = line.substring(1, line.length - 1).split(",")
+    (record(0).toInt, record(1).toInt, record(2).toDouble, record(3).toLong)
+  }
+
+  // *************************************************************************
+  // UTIL METHODS
+  // *************************************************************************
+
+  // Settings filled in by parseParameters; the defaults select generated
+  // input and printed output
+  var fileInput = false
+  var fileOutput = false
+  var inputPath : String = null
+  var outputPath : String = null
+
+  /**
+   * Stores the command-line arguments in the fields above.
+   *
+   * @param args the raw program arguments
+   * @return true for no arguments (defaults kept) or exactly two arguments
+   *         (input and output path); false, after printing a usage message
+   *         to stderr, for any other argument count
+   */
+  def parseParameters(args: Array[String]): Boolean = {
+    if (args.length > 0) {
+      if (args.length == 2) {
+        fileInput = true
+        fileOutput = true
+        inputPath = args(0)
+        outputPath = args(1)
+        true
+      } else {
+        System.err.println("Usage: TopSpeedWindowing <input path> <output 
path>")
+        false
+      }
+    } else {
+      true
+    }
+  }
+
+  /**
+   * Creates the input stream of car events: parsed from the text file at
+   * inputPath when file input was requested, otherwise taken from
+   * genCarStream().
+   *
+   * @param env the environment the source is created on
+   */
+  private def setCarsInput(env: StreamExecutionEnvironment) : 
DataStream[CarEvent] = {
+    if (fileInput) {
+      env.readTextFile(inputPath).map(parseMap(_)).map(x => CarEvent(x._1, 
x._2, x._3, x._4))
+    } else {
+      env.fromCollection(genCarStream())
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/iteration/IterateExampleITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/iteration/IterateExampleITCase.java
 
b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/iteration/IterateExampleITCase.java
new file mode 100644
index 0000000..07d6766
--- /dev/null
+++ 
b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/iteration/IterateExampleITCase.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.test.exampleJavaPrograms.iteration;
+
+import org.apache.flink.streaming.examples.iteration.IterateExample;
+import org.apache.flink.streaming.examples.iteration.util.IterateExampleData;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+
+/**
+ * Runs {@link IterateExample} on {@code IterateExampleData.INPUT_PAIRS} and
+ * compares the written result with {@code IterateExampleData.RESULTS}.
+ */
+public class IterateExampleITCase extends StreamingProgramTestBase {
+
+
+       // Path of the temp file holding the input pairs
+       protected String inputPath;
+       // Path the example writes its result to
+       protected String resultPath;
+
+       /**
+        * Creates the input temp file and picks the result path.
+        * NOTE(review): presumably called by the base class before
+        * testProgram() - confirm in StreamingProgramTestBase.
+        */
+       @Override
+       protected void preSubmit() throws Exception {
+               inputPath = createTempFile("fibonacciInput.txt", 
IterateExampleData.INPUT_PAIRS);
+               resultPath = getTempDirPath("result");
+       }
+
+       /** Compares the content found at resultPath with the expected results. */
+       @Override
+       protected void postSubmit() throws Exception {
+               compareResultsByLinesInMemory(IterateExampleData.RESULTS, 
resultPath);
+       }
+
+       /** Runs the example with the input path and the result path as arguments. */
+       @Override
+       protected void testProgram() throws Exception {
+               IterateExample.main(new String[]{inputPath, resultPath});
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/join/WindowJoinITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/join/WindowJoinITCase.java
 
b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/join/WindowJoinITCase.java
new file mode 100644
index 0000000..e657b67
--- /dev/null
+++ 
b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/join/WindowJoinITCase.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.test.exampleJavaPrograms.join;
+
+import org.apache.flink.streaming.examples.join.WindowJoin;
+import org.apache.flink.streaming.examples.join.util.WindowJoinData;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+
+/**
+ * Runs {@link WindowJoin} on the grades and salaries from
+ * {@code WindowJoinData} and checks the shape of the result lines.
+ */
+public class WindowJoinITCase extends StreamingProgramTestBase {
+
+       // Paths of the temp files holding the two join inputs
+       protected String gradesPath;
+       protected String salariesPath;
+       // Path the example writes its result to
+       protected String resultPath;
+
+       /**
+        * Creates the two input temp files and picks the result path.
+        * NOTE(review): presumably called by the base class before
+        * testProgram() - confirm in StreamingProgramTestBase.
+        */
+       @Override
+       protected void preSubmit() throws Exception {
+               gradesPath = createTempFile("gradesText.txt", 
WindowJoinData.GRADES_INPUT);
+               salariesPath = createTempFile("salariesText.txt", 
WindowJoinData.SALARIES_INPUT);
+               resultPath = getTempDirPath("result");
+       }
+
+       /** Checks the result lines against a pattern instead of exact values. */
+       @Override
+       protected void postSubmit() throws Exception {
+               // since the two sides of the join might have different speed
+               // the exact output can not be checked just whether it is 
well-formed
+               // checks that the result lines look like e.g. (bob, 2, 2015)
+               // note: the pattern itself allows no whitespace after the commas:
+               // lower-case name, one digit, then one or more digits
+               checkLinesAgainstRegexp(resultPath, 
"^\\([a-z]+,(\\d),(\\d)+\\)");
+       }
+
+       /** Runs the example with both input paths and the result path as arguments. */
+       @Override
+       protected void testProgram() throws Exception {
+               WindowJoin.main(new String[]{gradesPath, salariesPath, 
resultPath});
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/ml/IncrementalLearningSkeletonITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/ml/IncrementalLearningSkeletonITCase.java
 
b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/ml/IncrementalLearningSkeletonITCase.java
new file mode 100644
index 0000000..83569dc
--- /dev/null
+++ 
b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/ml/IncrementalLearningSkeletonITCase.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.test.exampleJavaPrograms.ml;
+
+import org.apache.flink.streaming.examples.ml.IncrementalLearningSkeleton;
+import 
org.apache.flink.streaming.examples.ml.util.IncrementalLearningSkeletonData;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+
+/**
+ * Runs {@link IncrementalLearningSkeleton} with only an output path and
+ * compares the written result with
+ * {@code IncrementalLearningSkeletonData.RESULTS}.
+ */
+public class IncrementalLearningSkeletonITCase extends 
StreamingProgramTestBase {
+
+       // Path the example writes its result to
+       protected String resultPath;
+
+       /**
+        * Picks the result path.
+        * NOTE(review): presumably called by the base class before
+        * testProgram() - confirm in StreamingProgramTestBase.
+        */
+       @Override
+       protected void preSubmit() throws Exception {
+               resultPath = getTempDirPath("result");
+       }
+
+       /** Compares the content found at resultPath with the expected results. */
+       @Override
+       protected void postSubmit() throws Exception {
+               
compareResultsByLinesInMemory(IncrementalLearningSkeletonData.RESULTS, 
resultPath);
+       }
+
+       /** Runs the example with the result path as its only argument. */
+       @Override
+       protected void testProgram() throws Exception {
+               IncrementalLearningSkeleton.main(new String[]{resultPath});
+       }
+}

Reply via email to