[streaming] Windowing examples cleanup

Removed obsolete examples
Updated StockPrices example
Updated build


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6b402f43
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6b402f43
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6b402f43

Branch: refs/heads/master
Commit: 6b402f43d01495fd9a6bea1c80b2b2b50393b92d
Parents: b5752a7
Author: mbalassi <[email protected]>
Authored: Mon Feb 9 18:17:20 2015 +0100
Committer: mbalassi <[email protected]>
Committed: Tue Feb 10 08:51:00 2015 +0100

----------------------------------------------------------------------
 .../flink-streaming-examples/pom.xml            |  72 ++-------
 .../examples/windowing/DeltaExtractExample.java | 125 ---------------
 .../windowing/MultiplePoliciesExample.java      | 139 -----------------
 .../examples/windowing/SlidingExample.java      | 139 -----------------
 .../examples/windowing/StockPrices.java         |  69 ++++++++-
 .../windowing/TimeWindowingExample.java         | 152 -------------------
 .../scala/examples/join/WindowJoin.scala        |  74 +++++++++
 .../scala/examples/windowing/StockPrices.scala  |  52 +++++++
 .../scala/examples/windowing/WindowJoin.scala   |  73 ---------
 9 files changed, 207 insertions(+), 688 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6b402f43/flink-staging/flink-streaming/flink-streaming-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/pom.xml 
b/flink-staging/flink-streaming/flink-streaming-examples/pom.xml
index 3661726..6a5ae36 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/pom.xml
+++ b/flink-staging/flink-streaming/flink-streaming-examples/pom.xml
@@ -258,101 +258,57 @@ under the License.
                                                </configuration>
                                        </execution>
 
-                                       <!-- DeltaExract -->
+                                       <!-- StockPrices -->
                                        <execution>
-                                               <id>DeltaExract</id>
+                                               <id>StockPrices</id>
                                                <phase>package</phase>
                                                <goals>
                                                        <goal>jar</goal>
                                                </goals>
                                                <configuration>
-                                                       
<classifier>DeltaExract</classifier>
+                                                       
<classifier>StockPrices</classifier>
 
                                                        <archive>
                                                                
<manifestEntries>
-                                                                       
<program-class>org.apache.flink.streaming.examples.windowing.DeltaExtractExample</program-class>
+                                                                       
<program-class>org.apache.flink.streaming.examples.windowing.StockPrices</program-class>
                                                                
</manifestEntries>
                                                        </archive>
 
                                                        <includes>
-                                                               
<include>org/apache/flink/streaming/examples/windowing/DeltaExtractExample.class</include>
-                                                               
<include>org/apache/flink/streaming/examples/windowing/DeltaExtractExample$*.class</include>
+                                                               
<include>org/apache/flink/streaming/examples/windowing/StockPrices.class</include>
+                                                               
<include>org/apache/flink/streaming/examples/windowing/StockPrices$*.class</include>
                                                        </includes>
                                                </configuration>
                                        </execution>
 
-                                       <!-- MultiplePolicies -->
+                                       <!-- TopSpeedWindowing -->
                                        <execution>
-                                               <id>MultiplePolicies</id>
+                                               <id>TopSpeedWindowing</id>
                                                <phase>package</phase>
                                                <goals>
                                                        <goal>jar</goal>
                                                </goals>
                                                <configuration>
-                                                       
<classifier>MultiplePolicies</classifier>
+                                                       
<classifier>TopSpeedWindowing</classifier>
 
                                                        <archive>
                                                                
<manifestEntries>
-                                                                       
<program-class>org.apache.flink.streaming.examples.windowing.MultiplePoliciesExample</program-class>
+                                                                       
<program-class>org.apache.flink.streaming.examples.windowing.TopSpeedWindowing</program-class>
                                                                
</manifestEntries>
                                                        </archive>
 
                                                        <includes>
-                                                               
<include>org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.class</include>
-                                                               
<include>org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample$*.class</include>
+                                                               
<include>org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.class</include>
+                                                               
<include>org/apache/flink/streaming/examples/windowing/TopSpeedWindowing$*.class</include>
                                                        </includes>
                                                </configuration>
                                        </execution>
 
-                                       <!-- SlidingExample -->
-                                       <execution>
-                                               <id>SlidingExample</id>
-                                               <phase>package</phase>
-                                               <goals>
-                                                       <goal>jar</goal>
-                                               </goals>
-                                               <configuration>
-                                                       
<classifier>SlidingExample</classifier>
-
-                                                       <archive>
-                                                               
<manifestEntries>
-                                                                       
<program-class>org.apache.flink.streaming.examples.windowing.SlidingExample</program-class>
-                                                               
</manifestEntries>
-                                                       </archive>
-
-                                                       <includes>
-                                                               
<include>org/apache/flink/streaming/examples/windowing/SlidingExample.class</include>
-                                                               
<include>org/apache/flink/streaming/examples/windowing/SlidingExample$*.class</include>
-                                                       </includes>
-                                               </configuration>
-                                       </execution>
-
-                                       <!-- TimeWindowing -->
-                                       <execution>
-                                               <id>TimeWindowing</id>
-                                               <phase>package</phase>
-                                               <goals>
-                                                       <goal>jar</goal>
-                                               </goals>
-                                               <configuration>
-                                                       
<classifier>TimeWindowing</classifier>
-
-                                                       <archive>
-                                                               
<manifestEntries>
-                                                                       
<program-class>org.apache.flink.streaming.examples.windowing.TimeWindowingExample</program-class>
-                                                               
</manifestEntries>
-                                                       </archive>
-
-                                                       <includes>
-                                                               
<include>org/apache/flink/streaming/examples/windowing/TimeWindowingExample.class</include>
-                                                               
<include>org/apache/flink/streaming/examples/windowing/TimeWindowingExample$*.class</include>
-                                                       </includes>
-                                               </configuration>
-                                       </execution>
                                </executions>
                        </plugin>
 
-<!-- Scala Compiler -->
+
+                       <!-- Scala Compiler -->
                        <plugin>
                                <groupId>net.alchim31.maven</groupId>
                                <artifactId>scala-maven-plugin</artifactId>

http://git-wip-us.apache.org/repos/asf/flink/blob/6b402f43/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/DeltaExtractExample.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/DeltaExtractExample.java
 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/DeltaExtractExample.java
deleted file mode 100644
index d6a9ac0..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/DeltaExtractExample.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.windowing;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import 
org.apache.flink.streaming.api.windowing.deltafunction.EuclideanDistance;
-import org.apache.flink.streaming.api.windowing.extractor.FieldsFromTuple;
-import org.apache.flink.streaming.api.windowing.helper.Count;
-import org.apache.flink.streaming.api.windowing.helper.Delta;
-import org.apache.flink.util.Collector;
-
-/**
- * This example gives an impression about how to use delta policies. It also
- * shows how extractors can be used.
- */
-public class DeltaExtractExample {
-
-       // 
*************************************************************************
-       // PROGRAM
-       // 
*************************************************************************
-
-       public static void main(String[] args) throws Exception {
-
-               if (!parseParameters(args)) {
-                       return;
-               }
-
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-               @SuppressWarnings({ "unchecked", "rawtypes" })
-               DataStream dstream = env
-                               .addSource(new CountingSource())
-                               .window(Delta.of(1.2, new EuclideanDistance(new 
FieldsFromTuple(0, 1)), new Tuple3(
-                                               0d, 0d, 
"foo"))).every(Count.of(2)).reduce(new ConcatStrings());
-
-               // emit result
-               if (fileOutput) {
-                       dstream.writeAsText(outputPath, 1);
-               } else {
-                       dstream.print();
-               }
-
-               // execute the program
-               env.execute("Delta Extract Example");
-
-       }
-
-       // 
*************************************************************************
-       // USER FUNCTIONS
-       // 
*************************************************************************
-
-       private static class CountingSource implements 
SourceFunction<Tuple3<Double, Double, String>> {
-               private static final long serialVersionUID = 1L;
-
-               private int counter = 0;
-
-               @Override
-               public void invoke(Collector<Tuple3<Double, Double, String>> 
collector) throws Exception {
-                       while (true) {
-                               if (counter > 9999) {
-                                       counter = 0;
-                               }
-                               collector.collect(new Tuple3<Double, Double, 
String>((double) counter,
-                                               (double) counter + 1, "V" + 
counter++));
-                       }
-               }
-       }
-
-       private static final class ConcatStrings implements
-                       ReduceFunction<Tuple3<Double, Double, String>> {
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public Tuple3<Double, Double, String> reduce(Tuple3<Double, 
Double, String> value1,
-                               Tuple3<Double, Double, String> value2) throws 
Exception {
-                       return new Tuple3<Double, Double, String>(value1.f0, 
value2.f1, value1.f2 + "|"
-                                       + value2.f2);
-               }
-       }
-
-       // 
*************************************************************************
-       // UTIL METHODS
-       // 
*************************************************************************
-
-       private static boolean fileOutput = false;
-       private static String outputPath;
-
-       private static boolean parseParameters(String[] args) {
-
-               if (args.length > 0) {
-                       // parse input arguments
-                       fileOutput = true;
-                       if (args.length == 1) {
-                               outputPath = args[0];
-                       } else {
-                               System.err.println("Usage: DeltaExtractExample 
<result path>");
-                               return false;
-                       }
-               } else {
-                       System.out.println("Executing DeltaExtractExample with 
generated data.");
-                       System.out.println("  Provide parameter to write to 
file.");
-                       System.out.println("  Usage: DeltaExtractExample 
<result path>");
-               }
-               return true;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6b402f43/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.java
 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.java
deleted file mode 100644
index 48783f2..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.windowing;
-
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.streaming.api.windowing.helper.Count;
-import org.apache.flink.util.Collector;
-
-/**
- * This example uses count based tumbling windowing with multiple eviction
- * policies at the same time.
- */
-public class MultiplePoliciesExample {
-
-       // 
*************************************************************************
-       // PROGRAM
-       // 
*************************************************************************
-
-       public static void main(String[] args) throws Exception {
-
-               if (!parseParameters(args)) {
-                       return;
-               }
-
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-               DataStream<String> stream = env.addSource(new BasicSource())
-                               .groupBy(new KeySelector<String, String>(){
-                                       private static final long 
serialVersionUID = 1L;
-                                       @Override
-                                       public String getKey(String value) 
throws Exception {
-                                               return value;
-                                       }       
-                               })
-                               .window(Count.of(2))
-                               .every(Count.of(3), Count.of(5))
-                               .reduceGroup(new Concat());
-
-               // emit result
-               if (fileOutput) {
-                       stream.writeAsText(outputPath, 1);
-               } else {
-                       stream.print();
-               }
-
-               // execute the program
-               env.execute("Multiple Policies Example");
-       }
-
-       /**
-        * This source function indefinitely provides String inputs for the
-        * topology.
-        */
-       public static final class BasicSource implements SourceFunction<String> 
{
-
-               private static final long serialVersionUID = 1L;
-
-               private final static String STR_1 = new String("streaming");
-               private final static String STR_2 = new String("flink");
-
-               @Override
-               public void invoke(Collector<String> out) throws Exception {
-                       // continuous emit
-                       while (true) {
-                               out.collect(STR_1);
-                               out.collect(STR_2);
-                       }
-               }
-       }
-
-       // 
*************************************************************************
-       // USER FUNCTIONS
-       // 
*************************************************************************
-
-       /**
-        * This reduce function does a String concat.
-        */
-       public static final class Concat implements GroupReduceFunction<String, 
String> {
-
-               /**
-                * Auto generates version ID
-                */
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public void reduce(Iterable<String> values, Collector<String> 
out) throws Exception {
-                       String output = "|";
-                       for (String v : values) {
-                               output = output + v + "|";
-                       }
-                       out.collect(output);
-               }
-       }
-
-       // 
*************************************************************************
-       // UTIL METHODS
-       // 
*************************************************************************
-
-       private static boolean fileOutput = false;
-       private static String outputPath;
-
-       private static boolean parseParameters(String[] args) {
-
-               if (args.length > 0) {
-                       // parse input arguments
-                       fileOutput = true;
-                       if (args.length == 1) {
-                               outputPath = args[0];
-                       } else {
-                               System.err.println("Usage: 
MultiplePoliciesExample <result path>");
-                               return false;
-                       }
-               } else {
-                       System.out.println("Executing MultiplePoliciesExample 
with generated data.");
-                       System.out.println("  Provide parameter to write to 
file.");
-                       System.out.println("  Usage: MultiplePoliciesExample 
<result path>");
-               }
-               return true;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6b402f43/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SlidingExample.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SlidingExample.java
 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SlidingExample.java
deleted file mode 100644
index cf03477..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SlidingExample.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.windowing;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.streaming.api.windowing.helper.Count;
-import org.apache.flink.util.Collector;
-
-/**
- * This example uses count based sliding windows to illustrate different
- * possibilities for the realization of sliding windows. Take a look on the 
code
- * which is commented out to see different setups.
- */
-public class SlidingExample {
-
-       // 
*************************************************************************
-       // PROGRAM
-       // 
*************************************************************************
-
-       public static void main(String[] args) throws Exception {
-
-               if (!parseParameters(args)) {
-                       return;
-               }
-
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-               /*
-                * SIMPLE-EXAMPLE: Use this to always keep the newest 10 
elements in the
-                * buffer Resulting windows will have an overlap of 5 elements
-                */
-
-               // DataStream<String> stream = env.addSource(new 
CountingSource())
-               // .window(Count.of(10))
-               // .every(Count.of(5))
-               // .reduce(new Concat());
-
-               /*
-                * ADVANCED-EXAMPLE: Use this to have the last element of the 
last
-                * window as first element of the next window while the window 
size is
-                * always 5
-                */
-
-               DataStream<String> stream = env.addSource(new CountingSource())
-                               .window(Count.of(5)
-                               .withDelete(4))
-                               .every(Count.of(4)
-                               .startingAt(-1))
-                               .reduce(new Concat());
-
-               // emit result
-               if (fileOutput) {
-                       stream.writeAsText(outputPath, 1);
-               } else {
-                       stream.print();
-               }
-
-               // execute the program
-               env.execute("Sliding Example");
-       }
-
-       // 
*************************************************************************
-       // USER FUNCTIONS
-       // 
*************************************************************************
-
-       private static final class CountingSource implements 
SourceFunction<String> {
-               private static final long serialVersionUID = 1L;
-
-               private int counter = 0;
-
-               @Override
-               public void invoke(Collector<String> collector) throws 
Exception {
-                       // continuous emit
-                       while (true) {
-                               if (counter > 9999) {
-                                       counter = 0;
-                               }
-                               collector.collect("V" + counter++);
-                       }
-               }
-       }
-
-       /**
-        * This reduce function does a String concat.
-        */
-       private static final class Concat implements ReduceFunction<String> {
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public String reduce(String value1, String value2) throws 
Exception {
-                       return value1 + "|" + value2;
-               }
-       }
-
-       // 
*************************************************************************
-       // UTIL METHODS
-       // 
*************************************************************************
-       
-       private static boolean fileOutput = false;
-       private static String outputPath;
-
-       private static boolean parseParameters(String[] args) {
-
-               if (args.length > 0) {
-                       // parse input arguments
-                       fileOutput = true;
-                       if (args.length == 1) {
-                               outputPath = args[0];
-                       } else {
-                               System.err.println("Usage: SlidingExample 
<result path>");
-                               return false;
-                       }
-               } else {
-                       System.out.println("Executing SlidingExample with 
generated data.");
-                       System.out.println("  Provide parameter to write to 
file.");
-                       System.out.println("  Usage: SlidingExample <result 
path>");
-               }
-               return true;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6b402f43/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
index c60b5ca..ce5db4a 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
@@ -38,6 +38,36 @@ import java.util.Arrays;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
 
+/**
+ * This example showcases a moderately complex Flink Streaming pipeline.
+ * It computes statistics on stock market data that arrive continuously,
+ * and combines the stock market data with tweet streams.
+ * For a detailed explanation of the job, check out the blog post unrolling it.
+ * To run the example make sure that the service providing the text data
+ * is already up and running.
+ *
+ * <p>
+ * To start an example socket text stream on your local machine run netcat from
+ * a command line: <code>nc -lk 9999</code>, where the parameter specifies the
+ * port number.
+ *
+ *
+ * <p>
+ * Usage:
+ * <code>StockPrices &lt;hostname&gt; &lt;port&gt; &lt;result path&gt;</code>
+ * <br>
+ *
+ * <p>
+ * This example shows how to:
+ * <ul>
+ * <li>merge and join data streams,
+ * <li>use different windowing policies,
+ * <li>define windowing aggregations.
+ * </ul>
+ *
+ * @see <a href="http://www.openbsd.org/cgi-bin/man.cgi?query=nc">netcat</a>
+ * @see <a 
href="http://flink.apache.org/news/2015/02/09/streaming-example.html">blogpost</a>
+ */
 public class StockPrices {
 
        private static final ArrayList<String> SYMBOLS = new 
ArrayList<String>(Arrays.asList("SPX", "FTSE", "DJI", "DJT", "BUX", "DAX", 
"GOOG"));
@@ -50,13 +80,17 @@ public class StockPrices {
 
        public static void main(String[] args) throws Exception {
 
+               if (!parseParameters(args)) {
+                       return;
+               }
+
                final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
                //Step 1 
            //Read a stream of stock prices from different sources and merge it 
into one stream
                
                //Read from a socket stream at map it to StockPrice objects
-               DataStream<StockPrice> socketStockStream = 
env.socketTextStream("localhost", 9999)
+               DataStream<StockPrice> socketStockStream = 
env.socketTextStream(hostName, port)
                                .map(new MapFunction<String, StockPrice>() {
                                        private String[] tokens;
 
@@ -155,7 +189,11 @@ public class StockPrices {
                                .reduceGroup(new CorrelationReduce())
                                .setParallelism(1);
 
-               rollingCorrelation.print();
+               if (fileOutput) {
+                       rollingCorrelation.writeAsText(outputPath, 1);
+               } else {
+                       rollingCorrelation.print();
+               }
 
                env.execute("Stock stream");
 
@@ -338,4 +376,31 @@ public class StockPrices {
                }
        }
 
+       // 
*************************************************************************
+       // UTIL METHODS
+       // 
*************************************************************************
+
+       private static boolean fileOutput = false;
+       private static String hostName;
+       private static int port;
+       private static String outputPath;
+
+       private static boolean parseParameters(String[] args) {
+
+               // parse input arguments
+               if (args.length == 3) {
+                       fileOutput = true;
+                       hostName = args[0];
+                       port = Integer.valueOf(args[1]);
+                       outputPath = args[2];
+               } else if (args.length == 2) {
+                       hostName = args[0];
+                       port = Integer.valueOf(args[1]);
+               } else {
+                       System.err.println("Usage: StockPrices <hostname> 
<port> [<output path>]");
+                       return false;
+               }
+               return true;
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6b402f43/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java
 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java
deleted file mode 100644
index 622aa82..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.windowing;
-
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.source.RichSourceFunction;
-import org.apache.flink.streaming.api.windowing.helper.Count;
-import org.apache.flink.streaming.api.windowing.helper.Time;
-import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerPolicy;
-import org.apache.flink.util.Collector;
-
-/**
- * This example shows the functionality of time based windows. It utilizes the
- * {@link ActiveTriggerPolicy} implementation in the
- * {@link ActiveTimeTriggerPolicy}.
- */
-public class TimeWindowingExample {
-
-       // 
*************************************************************************
-       // PROGRAM
-       // 
*************************************************************************
-
-       public static void main(String[] args) throws Exception {
-
-               if (!parseParameters(args)) {
-                       return;
-               }
-
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-               DataStream<Integer> stream = env.addSource(new 
CountingSourceWithSleep())
-                               .window(Count.of(100))
-                               .every(Time.of(1000, TimeUnit.MILLISECONDS))
-                               .groupBy(new MyKey())
-                               .sum(0);
-
-               // emit result
-               if (fileOutput) {
-                       stream.writeAsText(outputPath, 1);
-               } else {
-                       stream.print();
-               }
-
-               // execute the program
-               env.execute("Time Windowing Example");
-       }
-
-       // 
*************************************************************************
-       // USER FUNCTIONS
-       // 
*************************************************************************
-
-       /**
-        * This data source emit one element every 0.001 sec. The output is an
-        * Integer counting the output elements. As soon as the counter reaches
-        * 10000 it is reset to 0. On each reset the source waits 5 sec. before 
it
-        * restarts to produce elements.
-        */
-       private static final class CountingSourceWithSleep extends 
RichSourceFunction<Integer> {
-               private static final long serialVersionUID = 1L;
-
-               private int counter = 0;
-               private transient Random rnd;
-               
-               @Override
-               public void open(Configuration parameters) throws Exception {
-                       super.open(parameters);
-                       rnd = new Random();
-               }
-
-               @Override
-               public void invoke(Collector<Integer> collector) throws 
Exception {
-                       // continuous emit
-                       while (true) {
-                               if (counter > 9999) {
-                                       System.out.println("Source pauses 
now!");
-                                       Thread.sleep(5000);
-                                       System.out.println("Source continouse 
with emitting now!");
-                                       counter = 0;
-                               }
-                               collector.collect(rnd.nextInt(9) + 1);
-
-                               // Wait 0.001 sec. before the next emit. 
Otherwise the source is
-                               // too fast for local tests and you might 
always see
-                               // SUM[k=1..9999](k) as result.
-                               Thread.sleep(1);
-                               counter++;
-                       }
-               }
-       }
-
-       private static final class MyKey implements KeySelector<Integer, 
Integer> {
-
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public Integer getKey(Integer value) throws Exception {
-                       if (value < 2) {
-                               return 0;
-                       } else {
-                               return 1;
-                       }
-               }
-
-       }
-
-       // 
*************************************************************************
-       // UTIL METHODS
-       // 
*************************************************************************
-
-       private static boolean fileOutput = false;
-       private static String outputPath;
-
-       private static boolean parseParameters(String[] args) {
-
-               if (args.length > 0) {
-                       // parse input arguments
-                       fileOutput = true;
-                       if (args.length == 1) {
-                               outputPath = args[0];
-                       } else {
-                               System.err.println("Usage: TimeWindowingExample 
<result path>");
-                               return false;
-                       }
-               } else {
-                       System.out.println("Executing TimeWindowingExample with 
generated data.");
-                       System.out.println("  Provide parameter to write to 
file.");
-                       System.out.println("  Usage: TimeWindowingExample 
<result path>");
-               }
-               return true;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6b402f43/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
new file mode 100644
index 0000000..a36a03b
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.join
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.streaming.api.scala._
+
+import scala.Stream._
+import scala.language.postfixOps
+import scala.util.Random
+
+object WindowJoin {
+
+  case class Name(id: Long, name: String)
+  case class Age(id: Long, age: Int)
+  case class Person(name: String, age: Long)
+
+  def main(args: Array[String]) {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    //Create streams for names and ages by mapping the inputs to the 
corresponding objects
+    val names = env.fromCollection(nameStream).map(x => Name(x._1, x._2))
+    val ages = env.fromCollection(ageStream).map(x => Age(x._1, x._2))
+
+    //Join the two input streams by id on the last 2 seconds every second and 
create new 
+    //Person objects containing both name and age
+    val joined =
+      names.join(ages).onWindow(2, TimeUnit.SECONDS)
+                      .every(1, TimeUnit.SECONDS)
+                      .where("id")
+                      .equalTo("id") { (n, a) => Person(n.name, a.age) }
+
+    joined print
+
+    env.execute("WindowJoin")
+  }
+
+  def nameStream() : Stream[(Long,String)] = {
+    def nameMapper(names: Array[String])(x: Int) : (Long, String) =
+    {
+      if(x%100==0) Thread.sleep(1000)
+      (x, names(Random.nextInt(names.length)))
+    }
+    range(1,10000).map(nameMapper(Array("tom", "jerry", "alice", "bob", 
"john", "grace")))
+  }
+
+  def ageStream() : Stream[(Long,Int)] = {
+    def ageMapper(x: Int) : (Long, Int) =
+    {
+      if(x%100==0) Thread.sleep(1000)
+      (x, Random.nextInt(90))
+    }
+    range(1,10000).map(ageMapper)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6b402f43/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
index f357fe7..8a0ce5e 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
+++ 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
@@ -27,6 +27,33 @@ import org.apache.flink.util.Collector
 
 import scala.util.Random
 
+/**
+ * This example showcases a moderately complex Flink Streaming pipeline.
+ * It computes statistics on stock market data that arrive continuously,
+ * and combines the stock market data with tweet streams.
+ * For a detailed explanation of the job, check out the
+ * [[http://flink.apache.org/news/2015/02/09/streaming-example.html blog post]]
+ * unrolling it. To run the example make sure that the service providing
+ * the text data is already up and running.
+ *
+ * To start an example socket text stream on your local machine run netcat
+ * from a command line, where the parameter specifies the port number:
+ *
+ * {{{
+ *   nc -lk 9999
+ * }}}
+ *
+ * Usage:
+ * {{{
+ *   StockPrices <hostname> <port> [<output path>]
+ * }}}
+ *
+ * This example shows how to:
+ *
+ *   - merge and join data streams,
+ *   - use different windowing policies,
+ *   - define windowing aggregations.
+ */
 object StockPrices {
 
   case class StockPrice(symbol: String, price: Double)
@@ -36,8 +63,17 @@ object StockPrices {
 
   val defaultPrice = StockPrice("", 1000)
 
+  private var fileOutput: Boolean = false
+  private var hostName: String = null
+  private var port: Int = 0
+  private var outputPath: String = null
+
   def main(args: Array[String]) {
 
+    if (!parseParameters(args)) {
+      return
+    }
+
     val env = StreamExecutionEnvironment.getExecutionEnvironment
 
     //Step 1 
@@ -163,4 +199,20 @@ object StockPrices {
     }
   }
 
+  private def parseParameters(args: Array[String]): Boolean = {
+    if (args.length == 3) {
+      fileOutput = true
+      hostName = args(0)
+      port = args(1).toInt
+      outputPath = args(2)
+    } else if (args.length == 2) {
+      hostName = args(0)
+      port = args(1).toInt
+    } else {
+      System.err.println("Usage: StockPrices <hostname> <port> [<output 
path>]")
+      return false
+    }
+    true
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6b402f43/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala
 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala
deleted file mode 100644
index 119862e..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowJoin.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.scala.examples.windowing
-
-import org.apache.flink.streaming.api.scala._
-
-import scala.Stream._
-import scala.util.Random
-import java.util.concurrent.TimeUnit
-import scala.language.postfixOps
-
-object WindowJoin {
-
-  case class Name(id: Long, name: String)
-  case class Age(id: Long, age: Int)
-  case class Person(name: String, age: Long)
-
-  def main(args: Array[String]) {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-    //Create streams for names and ages by mapping the inputs to the 
corresponding objects
-    val names = env.fromCollection(nameStream).map(x => Name(x._1, x._2))
-    val ages = env.fromCollection(ageStream).map(x => Age(x._1, x._2))
-
-    //Join the two input streams by id on the last 2 seconds every second and 
create new 
-    //Person objects containing both name and age
-    val joined =
-      names.join(ages).onWindow(2, TimeUnit.SECONDS)
-                      .every(1, TimeUnit.SECONDS)
-                      .where("id")
-                      .equalTo("id") { (n, a) => Person(n.name, a.age) }
-
-    joined print
-
-    env.execute("WindowJoin")
-  }
-
-  def nameStream() : Stream[(Long,String)] = {
-    def nameMapper(names: Array[String])(x: Int) : (Long, String) =
-    {
-      if(x%100==0) Thread.sleep(1000)
-      (x, names(Random.nextInt(names.length)))
-    }
-    range(1,10000).map(nameMapper(Array("tom", "jerry", "alice", "bob", 
"john", "grace")))
-  }
-
-  def ageStream() : Stream[(Long,Int)] = {
-    def ageMapper(x: Int) : (Long, Int) =
-    {
-      if(x%100==0) Thread.sleep(1000)
-      (x, Random.nextInt(90))
-    }
-    range(1,10000).map(ageMapper)
-  }
-
-}

Reply via email to