Repository: flink
Updated Branches:
  refs/heads/master bfb611f3e -> 6b402f43d


[streaming] Java Stock streaming example added


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b5752a76
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b5752a76
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b5752a76

Branch: refs/heads/master
Commit: b5752a76c4b835931131b48c89734d44fdee5dee
Parents: 2afc9ec
Author: mbalassi <[email protected]>
Authored: Fri Feb 6 11:31:20 2015 +0100
Committer: mbalassi <[email protected]>
Committed: Mon Feb 9 21:55:09 2015 +0100

----------------------------------------------------------------------
 .../scala/examples/windowing/StockPrices.scala  | 158 ---------
 .../examples/windowing/StockPrices.java         | 341 +++++++++++++++++++
 .../scala/examples/windowing/StockPrices.scala  | 166 +++++++++
 3 files changed, 507 insertions(+), 158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b5752a76/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
 
b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
deleted file mode 100644
index 0fee767..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.scala.examples.windowing
-
-import org.apache.flink.streaming.api.scala._
-import org.apache.flink.streaming.api.windowing.helper.Count
-import org.apache.flink.util.Collector
-import org.apache.flink.streaming.api.windowing.helper.Time
-import java.util.concurrent.TimeUnit._
-import org.apache.flink.streaming.api.scala.windowing.Delta
-import scala.util.Random
-
/**
 * Streaming example that analyses stock prices with windowed aggregations.
 *
 * A socket stream of `SYMBOL,PRICE` lines is merged with generated price streams; rolling
 * statistics are computed on sliding windows, a delta policy turns relative price changes into
 * warnings, warnings and stock mentions in a generated tweet stream are counted per half minute,
 * and the rolling correlation between the two counts is printed.
 */
object StockPrices {

  case class StockPrice(symbol: String, price: Double)
  case class Count(symbol: String, count: Int)

  val symbols = List("SPX", "FTSE", "DJI", "DJT", "BUX", "DAX", "GOOG")

  val defaultPrice = StockPrice("", 1000)

  def main(args: Array[String]) {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //Step 1
    //Read a stream of stock prices from different sources and merge it into one stream

    //Read "SYMBOL,PRICE" lines from a socket and parse them into StockPrice objects;
    //malformed lines are dropped instead of failing the whole job
    val socketStockStream = env.socketTextStream("localhost", 9999)
      .flatMap(line => parseStockPrice(line).toList)

    //Generate other stock streams
    val SPX_Stream = env.addSource(generateStock("SPX")(10) _)
    val FTSE_Stream = env.addSource(generateStock("FTSE")(20) _)
    val DJI_Stream = env.addSource(generateStock("DJI")(30) _)
    val BUX_Stream = env.addSource(generateStock("BUX")(40) _)

    //Merge all stock streams together
    val stockStream = socketStockStream.merge(SPX_Stream, FTSE_Stream, DJI_Stream, BUX_Stream)

    //Step 2
    //Compute some simple statistics on a rolling window
    val windowedStream = stockStream.window(Time.of(10, SECONDS)).every(Time.of(5, SECONDS))

    val lowest = windowedStream.minBy("price").setParallelism(1)
    val maxByStock = windowedStream.groupBy("symbol").maxBy("price")
    val rollingMean = windowedStream.groupBy("symbol").reduceGroup(mean _)

    //Step 3
    //Use a delta policy to create price change warnings,
    //and also count the number of warnings every half minute

    val priceWarnings = stockStream.groupBy("symbol")
      .window(Delta.of(0.05, priceChange, defaultPrice))
      .reduceGroup(sendWarning _)

    val warningsPerStock = priceWarnings.map(Count(_, 1))
      .groupBy("symbol")
      .window(Time.of(30, SECONDS))
      .sum("count")

    //Step 4
    //Read a stream of tweets and extract the stock symbols

    val tweetStream = env.addSource(generateTweets _)

    val mentionedSymbols = tweetStream.flatMap(tweet => tweet.split(" "))
      .map(_.toUpperCase())
      .filter(symbols.contains(_))

    val tweetsPerStock = mentionedSymbols.map(Count(_, 1))
      .groupBy("symbol")
      .window(Time.of(30, SECONDS))
      .sum("count")

    //Step 5
    //For advanced analysis we join the number of tweets and
    //the number of price change warnings by stock
    //for the last half minute, we keep only the counts.
    //This information is used to compute rolling correlations
    //between the tweets and the price changes

    val tweetsAndWarning = warningsPerStock.join(tweetsPerStock)
      .onWindow(30, SECONDS)
      .where("symbol")
      .equalTo("symbol") { (c1, c2) => (c1.count, c2.count) }

    val rollingCorrelation = tweetsAndWarning.window(Time.of(30, SECONDS))
      .reduceGroup(computeCorrelation _).setParallelism(1)

    rollingCorrelation.print

    env.execute("Stock stream")
  }

  /**
   * Parses a line of the form `SYMBOL,PRICE`; fields after the price are ignored.
   *
   * @return the parsed price, or None when the line has fewer than two fields or the price is
   *         not a valid number
   */
  def parseStockPrice(line: String): Option[StockPrice] = {
    import scala.util.Try
    line.split(",") match {
      case Array(symbol, price, _*) => Try(price.toDouble).toOption.map(StockPrice(symbol.trim, _))
      case _ => None
    }
  }

  /** Relative change between two prices; used as the delta of the warning policy. */
  def priceChange(p1: StockPrice, p2: StockPrice): Double = {
    Math.abs(p1.price / p2.price - 1)
  }

  /** Emits the average price of a non-empty window, labelled with the first element's symbol. */
  def mean(ts: Iterable[StockPrice], out: Collector[StockPrice]) = {
    if (ts.nonEmpty) {
      out.collect(StockPrice(ts.head.symbol, ts.foldLeft(0: Double)(_ + _.price) / ts.size))
    }
  }

  /** Emits the symbol of the first price in a non-empty window as a warning. */
  def sendWarning(ts: Iterable[StockPrice], out: Collector[String]) = {
    if (ts.nonEmpty) out.collect(ts.head.symbol)
  }

  /**
   * Emits the correlation of the (warnings, tweets) count pairs of a window.
   *
   * Nothing is emitted when the correlation is undefined (empty window, or one side has zero
   * variance such as a window holding a single pair); previously this produced NaN.
   */
  def computeCorrelation(input: Iterable[(Int, Int)], out: Collector[Double]) = {
    // materialize once so the window contents are traversed a single time
    correlation(input.toVector).foreach(c => out.collect(c))
  }

  /**
   * Pearson correlation (population form) of the given pairs.
   *
   * @return None for an empty input or when either coordinate has zero standard deviation
   */
  def correlation(pairs: Seq[(Int, Int)]): Option[Double] = {
    if (pairs.isEmpty) {
      None
    } else {
      val xs = pairs.map(_._1)
      val ys = pairs.map(_._2)
      val meanX = average(xs)
      val meanY = average(ys)

      val cov = average(pairs.map { case (x, y) => (x - meanX) * (y - meanY) })
      val sdX = Math.sqrt(average(xs.map(x => Math.pow(x - meanX, 2))))
      val sdY = Math.sqrt(average(ys.map(y => Math.pow(y - meanY, 2))))

      if (sdX == 0 || sdY == 0) None else Some(cov / (sdX * sdY))
    }
  }

  /** Endless source: a Gaussian random walk of prices for `symbol`, starting at 1000. */
  def generateStock(symbol: String)(sigma: Int)(out: Collector[StockPrice]) = {
    var price = 1000.
    while (true) {
      price = price + Random.nextGaussian * sigma
      out.collect(StockPrice(symbol, price))
      Thread.sleep(Random.nextInt(200))
    }
  }

  /** Arithmetic mean of `ts`; callers must pass a non-empty collection. */
  def average[T](ts: Iterable[T])(implicit num: Numeric[T]) = {
    num.toDouble(ts.sum) / ts.size
  }

  /** Endless source of "tweets" made of three random stock symbols separated by spaces. */
  def generateTweets(out: Collector[String]) = {
    while (true) {
      val s = Seq.fill(3)(symbols(Random.nextInt(symbols.size)))
      out.collect(s.mkString(" "))
      Thread.sleep(Random.nextInt(500))
    }
  }

}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5752a76/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
new file mode 100644
index 0000000..c60b5ca
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.windowing;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.WindowedDataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction;
+import org.apache.flink.streaming.api.windowing.helper.Delta;
+import org.apache.flink.streaming.api.windowing.helper.Time;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+public class StockPrices {
+
+       private static final ArrayList<String> SYMBOLS = new 
ArrayList<String>(Arrays.asList("SPX", "FTSE", "DJI", "DJT", "BUX", "DAX", 
"GOOG"));
+       private static final Double DEFAULT_PRICE = 1000.;
+       private static final StockPrice DEFAULT_STOCK_PRICE = new 
StockPrice("", DEFAULT_PRICE);
+
+       // 
*************************************************************************
+       // PROGRAM
+       // 
*************************************************************************
+
+       public static void main(String[] args) throws Exception {
+
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               //Step 1 
+           //Read a stream of stock prices from different sources and merge it 
into one stream
+               
+               //Read from a socket stream at map it to StockPrice objects
+               DataStream<StockPrice> socketStockStream = 
env.socketTextStream("localhost", 9999)
+                               .map(new MapFunction<String, StockPrice>() {
+                                       private String[] tokens;
+
+                                       @Override
+                                       public StockPrice map(String value) 
throws Exception {
+                                               tokens = value.split(",");
+                                               return new 
StockPrice(tokens[0], Double.parseDouble(tokens[1]));
+                                       }
+                               });
+
+               //Generate other stock streams
+               DataStream<StockPrice> SPX_stream = env.addSource(new 
StockSource("SPX", 10));
+               DataStream<StockPrice> FTSE_stream = env.addSource(new 
StockSource("FTSE", 20));
+               DataStream<StockPrice> DJI_stream = env.addSource(new 
StockSource("DJI", 30));
+               DataStream<StockPrice> BUX_stream = env.addSource(new 
StockSource("BUX", 40));
+
+               //Merge all stock streams together
+               DataStream<StockPrice> stockStream = 
socketStockStream.merge(SPX_stream, FTSE_stream, DJI_stream, BUX_stream);
+               
+               //Step 2
+           //Compute some simple statistics on a rolling window
+               WindowedDataStream<StockPrice> windowedStream = stockStream
+                               .window(Time.of(10, TimeUnit.SECONDS))
+                               .every(Time.of(5, TimeUnit.SECONDS));
+
+               DataStream<StockPrice> lowest = 
windowedStream.minBy("price").setParallelism(1);
+               DataStream<StockPrice> maxByStock = 
windowedStream.groupBy("symbol").maxBy("price");
+               DataStream<StockPrice> rollingMean = 
windowedStream.groupBy("symbol").reduceGroup(new MeanReduce());
+
+               //Step 3
+               //Use  delta policy to create price change warnings, and also 
count the number of warning every half minute
+
+               DataStream<String> priceWarnings = stockStream.groupBy("symbol")
+                               .window(Delta.of(0.05, new 
DeltaFunction<StockPrice>() {
+                                       @Override
+                                       public double getDelta(StockPrice 
oldDataPoint, StockPrice newDataPoint) {
+                                               return 
Math.abs(oldDataPoint.price - newDataPoint.price);
+                                       }
+                               }, DEFAULT_STOCK_PRICE))
+                               .reduceGroup(new SendWarning());
+
+
+               DataStream<Count> warningsPerStock = priceWarnings.map(new 
MapFunction<String, Count>() {
+                       @Override
+                       public Count map(String value) throws Exception {
+                               return new Count(value, 1);
+                       }
+               }).groupBy("symbol").window(Time.of(30, 
TimeUnit.SECONDS)).sum("count");
+
+               //Step 4
+               //Read a stream of tweets and extract the stock symbols
+               DataStream<String> tweetStream = env.addSource(new 
TweetSource());
+
+               DataStream<String> mentionedSymbols = tweetStream.flatMap(new 
FlatMapFunction<String, String>() {
+                       @Override
+                       public void flatMap(String value, Collector<String> 
out) throws Exception {
+                               String[] words = value.split(" ");
+                               for (String word : words) {
+                                       out.collect(word.toUpperCase());
+                               }
+                       }
+               }).filter(new FilterFunction<String>() {
+                       @Override
+                       public boolean filter(String value) throws Exception {
+                               return SYMBOLS.contains(value);
+                       }
+               });
+
+               DataStream<Count> tweetsPerStock = mentionedSymbols.map(new 
MapFunction<String, Count>() {
+                       @Override
+                       public Count map(String value) throws Exception {
+                               return new Count(value, 1);
+                       }
+               }).groupBy("symbol")
+                               .window(Time.of(30, TimeUnit.SECONDS))
+                               .sum("count");
+
+               //Step 5
+               //For advanced analysis we join the number of tweets and the 
number of price change warnings by stock
+               //for the last half minute, we keep only the counts. We use 
this information to compute rolling correlations
+               //between the tweets and the price changes
+
+               DataStream<Tuple2<Integer, Integer>> tweetsAndWarning = 
warningsPerStock.join(tweetsPerStock)
+                               .onWindow(30, TimeUnit.SECONDS)
+                               .where("symbol")
+                               .equalTo("symbol")
+                               .with(new JoinFunction<Count, Count, 
Tuple2<Integer, Integer>>() {
+                                       @Override
+                                       public Tuple2<Integer, Integer> 
join(Count first, Count second) throws Exception {
+                                               return new Tuple2<Integer, 
Integer>(first.count, second.count);
+                                       }
+                               });
+
+               DataStream<Double> rollingCorrelation = tweetsAndWarning
+                               .window(Time.of(30, TimeUnit.SECONDS))
+                               .reduceGroup(new CorrelationReduce())
+                               .setParallelism(1);
+
+               rollingCorrelation.print();
+
+               env.execute("Stock stream");
+
+       }
+
+       // 
*************************************************************************
+       // DATA TYPES
+       // 
*************************************************************************
+
+       public static class StockPrice implements Serializable {
+
+               public String symbol;
+               public Double price;
+
+               public StockPrice() {
+               }
+
+               public StockPrice(String symbol, Double price) {
+                       this.symbol = symbol;
+                       this.price = price;
+               }
+
+               @Override
+               public String toString() {
+                       return "StockPrice{" +
+                                       "symbol='" + symbol + '\'' +
+                                       ", count=" + price +
+                                       '}';
+               }
+       }
+
+       public static class Count implements Serializable {
+               public String symbol;
+               public Integer count;
+
+               public Count() {
+               }
+
+               public Count(String symbol, Integer count) {
+                       this.symbol = symbol;
+                       this.count = count;
+               }
+
+               @Override
+               public String toString() {
+                       return "Count{" +
+                                       "symbol='" + symbol + '\'' +
+                                       ", count=" + count +
+                                       '}';
+               }
+       }
+
+       // 
*************************************************************************
+       // USER FUNCTIONS
+       // 
*************************************************************************
+
+       public final static class StockSource implements 
SourceFunction<StockPrice> {
+
+               private Double price;
+               private String symbol;
+               private Integer sigma;
+
+               public StockSource(String symbol, Integer sigma) {
+                       this.symbol = symbol;
+                       this.sigma = sigma;
+               }
+
+               @Override
+               public void invoke(Collector<StockPrice> collector) throws 
Exception {
+                       price = DEFAULT_PRICE;
+                       Random random = new Random();
+
+                       while (true) {
+                               price = price + random.nextGaussian() * sigma;
+                               collector.collect(new StockPrice(symbol, 
price));
+                               Thread.sleep(random.nextInt(200));
+                       }
+               }
+       }
+
+       public final static class MeanReduce implements 
GroupReduceFunction<StockPrice, StockPrice> {
+
+               private Double sum = 0.0;
+               private Integer count = 0;
+               private String symbol = "";
+
+               @Override
+               public void reduce(Iterable<StockPrice> values, 
Collector<StockPrice> out) throws Exception {
+                       if (values.iterator().hasNext()) {
+
+                               for (StockPrice sp : values) {
+                                       sum += sp.price;
+                                       symbol = sp.symbol;
+                                       count++;
+                               }
+                               out.collect(new StockPrice(symbol, sum / 
count));
+                       }
+               }
+       }
+
+       public static final class TweetSource implements SourceFunction<String> 
{
+
+               Random random;
+               StringBuilder stringBuilder;
+
+               @Override
+               public void invoke(Collector<String> collector) throws 
Exception {
+                       random = new Random();
+                       stringBuilder = new StringBuilder();
+
+                       while (true) {
+                               stringBuilder.setLength(0);
+                               for (int i = 0; i < 3; i++) {
+                                       stringBuilder.append(" ");
+                                       
stringBuilder.append(SYMBOLS.get(random.nextInt(SYMBOLS.size())));
+                               }
+                               collector.collect(stringBuilder.toString());
+                               Thread.sleep(500);
+                       }
+
+               }
+       }
+
+       public static final class SendWarning implements 
GroupReduceFunction<StockPrice, String> {
+               @Override
+               public void reduce(Iterable<StockPrice> values, 
Collector<String> out) throws Exception {
+                       if (values.iterator().hasNext()) {
+                               out.collect(values.iterator().next().symbol);
+                       }
+               }
+       }
+
+       public static final class CorrelationReduce implements 
GroupReduceFunction<Tuple2<Integer, Integer>, Double> {
+
+               private Integer leftSum;
+               private Integer rightSum;
+               private Integer count;
+
+               private Double leftMean;
+               private Double rightMean;
+
+               private Double cov;
+               private Double leftSd;
+               private Double rightSd;
+
+               @Override
+               public void reduce(Iterable<Tuple2<Integer, Integer>> values, 
Collector<Double> out) throws Exception {
+
+                       leftSum = 0;
+                       rightSum = 0;
+                       count = 0;
+
+                       cov = 0.;
+                       leftSd = 0.;
+                       rightSd = 0.;
+
+                       //compute mean for both sides, save count
+                       for (Tuple2<Integer, Integer> pair : values) {
+                               leftSum += pair.f0;
+                               rightSum += pair.f1;
+                               count++;
+                       }
+
+                       leftMean = leftSum.doubleValue() / count;
+                       rightMean = rightSum.doubleValue() / count;
+
+                       //compute covariance & std. deviations
+                       for (Tuple2<Integer, Integer> pair : values) {
+                               cov += (pair.f0 - leftMean) * (pair.f1 - 
rightMean) / count;
+                       }
+
+                       for (Tuple2<Integer, Integer> pair : values) {
+                               leftSd += Math.pow(pair.f0 - leftMean, 2) / 
count;
+                               rightSd += Math.pow(pair.f1 - rightMean, 2) / 
count;
+                       }
+                       leftSd = Math.sqrt(leftSd);
+                       rightSd = Math.sqrt(rightSd);
+
+                       out.collect(cov / (leftSd * rightSd));
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5752a76/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
new file mode 100644
index 0000000..f357fe7
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.windowing
+
+import java.util.concurrent.TimeUnit._
+
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.api.scala.windowing.Delta
+import org.apache.flink.streaming.api.windowing.helper.Time
+import org.apache.flink.util.Collector
+
+import scala.util.Random
+
/**
 * Streaming example that analyses stock prices with windowed aggregations.
 *
 * A socket stream of `SYMBOL,PRICE` lines is merged with generated price streams; rolling
 * statistics are computed on sliding windows, a delta policy turns relative price changes into
 * warnings, warnings and stock mentions in a generated tweet stream are counted per half minute,
 * and the rolling correlation between the two counts is printed.
 */
object StockPrices {

  case class StockPrice(symbol: String, price: Double)
  case class Count(symbol: String, count: Int)

  val symbols = List("SPX", "FTSE", "DJI", "DJT", "BUX", "DAX", "GOOG")

  val defaultPrice = StockPrice("", 1000)

  def main(args: Array[String]) {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //Step 1
    //Read a stream of stock prices from different sources and merge it into one stream

    //Read "SYMBOL,PRICE" lines from a socket and parse them into StockPrice objects;
    //malformed lines are dropped instead of failing the whole job
    val socketStockStream = env.socketTextStream("localhost", 9999)
      .flatMap(line => parseStockPrice(line).toList)

    //Generate other stock streams
    val SPX_Stream = env.addSource(generateStock("SPX")(10) _)
    val FTSE_Stream = env.addSource(generateStock("FTSE")(20) _)
    val DJI_Stream = env.addSource(generateStock("DJI")(30) _)
    val BUX_Stream = env.addSource(generateStock("BUX")(40) _)

    //Merge all stock streams together
    val stockStream = socketStockStream.merge(SPX_Stream, FTSE_Stream, DJI_Stream, BUX_Stream)

    //Step 2
    //Compute some simple statistics on a rolling window
    val windowedStream = stockStream.window(Time.of(10, SECONDS)).every(Time.of(5, SECONDS))

    val lowest = windowedStream.minBy("price").setParallelism(1)
    val maxByStock = windowedStream.groupBy("symbol").maxBy("price")
    val rollingMean = windowedStream.groupBy("symbol").reduceGroup(mean _)

    //Step 3
    //Use a delta policy to create price change warnings,
    //and also count the number of warnings every half minute

    val priceWarnings = stockStream.groupBy("symbol")
      .window(Delta.of(0.05, priceChange, defaultPrice))
      .reduceGroup(sendWarning _)

    val warningsPerStock = priceWarnings.map(Count(_, 1))
      .groupBy("symbol")
      .window(Time.of(30, SECONDS))
      .sum("count")

    //Step 4
    //Read a stream of tweets and extract the stock symbols

    val tweetStream = env.addSource(generateTweets _)

    val mentionedSymbols = tweetStream.flatMap(tweet => tweet.split(" "))
      .map(_.toUpperCase())
      .filter(symbols.contains(_))

    val tweetsPerStock = mentionedSymbols.map(Count(_, 1))
      .groupBy("symbol")
      .window(Time.of(30, SECONDS))
      .sum("count")

    //Step 5
    //For advanced analysis we join the number of tweets and
    //the number of price change warnings by stock
    //for the last half minute, we keep only the counts.
    //This information is used to compute rolling correlations
    //between the tweets and the price changes

    val tweetsAndWarning = warningsPerStock.join(tweetsPerStock)
      .onWindow(30, SECONDS)
      .where("symbol")
      .equalTo("symbol") { (c1, c2) => (c1.count, c2.count) }

    val rollingCorrelation = tweetsAndWarning.window(Time.of(30, SECONDS))
      .reduceGroup(computeCorrelation _).setParallelism(1)

    rollingCorrelation.print

    env.execute("Stock stream")
  }

  /**
   * Parses a line of the form `SYMBOL,PRICE`; fields after the price are ignored.
   *
   * @return the parsed price, or None when the line has fewer than two fields or the price is
   *         not a valid number
   */
  def parseStockPrice(line: String): Option[StockPrice] = {
    import scala.util.Try
    line.split(",") match {
      case Array(symbol, price, _*) => Try(price.toDouble).toOption.map(StockPrice(symbol.trim, _))
      case _ => None
    }
  }

  /** Relative change between two prices; used as the delta of the warning policy. */
  def priceChange(p1: StockPrice, p2: StockPrice): Double = {
    Math.abs(p1.price / p2.price - 1)
  }

  /** Emits the average price of a non-empty window, labelled with the first element's symbol. */
  def mean(ts: Iterable[StockPrice], out: Collector[StockPrice]) = {
    if (ts.nonEmpty) {
      out.collect(StockPrice(ts.head.symbol, ts.foldLeft(0: Double)(_ + _.price) / ts.size))
    }
  }

  /** Emits the symbol of the first price in a non-empty window as a warning. */
  def sendWarning(ts: Iterable[StockPrice], out: Collector[String]) = {
    if (ts.nonEmpty) out.collect(ts.head.symbol)
  }

  /**
   * Emits the correlation of the (warnings, tweets) count pairs of a window.
   *
   * Nothing is emitted when the correlation is undefined (empty window, or one side has zero
   * variance such as a window holding a single pair); previously this produced NaN.
   */
  def computeCorrelation(input: Iterable[(Int, Int)], out: Collector[Double]) = {
    // materialize once so the window contents are traversed a single time
    correlation(input.toVector).foreach(c => out.collect(c))
  }

  /**
   * Pearson correlation (population form) of the given pairs.
   *
   * @return None for an empty input or when either coordinate has zero standard deviation
   */
  def correlation(pairs: Seq[(Int, Int)]): Option[Double] = {
    if (pairs.isEmpty) {
      None
    } else {
      val xs = pairs.map(_._1)
      val ys = pairs.map(_._2)
      val meanX = average(xs)
      val meanY = average(ys)

      val cov = average(pairs.map { case (x, y) => (x - meanX) * (y - meanY) })
      val sdX = Math.sqrt(average(xs.map(x => Math.pow(x - meanX, 2))))
      val sdY = Math.sqrt(average(ys.map(y => Math.pow(y - meanY, 2))))

      if (sdX == 0 || sdY == 0) None else Some(cov / (sdX * sdY))
    }
  }

  /** Endless source: a Gaussian random walk of prices for `symbol`, starting at 1000. */
  def generateStock(symbol: String)(sigma: Int)(out: Collector[StockPrice]) = {
    var price = 1000.
    while (true) {
      price = price + Random.nextGaussian * sigma
      out.collect(StockPrice(symbol, price))
      Thread.sleep(Random.nextInt(200))
    }
  }

  /** Arithmetic mean of `ts`; callers must pass a non-empty collection. */
  def average[T](ts: Iterable[T])(implicit num: Numeric[T]) = {
    num.toDouble(ts.sum) / ts.size
  }

  /** Endless source of "tweets" made of three random stock symbols separated by spaces. */
  def generateTweets(out: Collector[String]) = {
    while (true) {
      val s = Seq.fill(3)(symbols(Random.nextInt(symbols.size)))
      out.collect(s.mkString(" "))
      Thread.sleep(Random.nextInt(500))
    }
  }

}

Reply via email to