Repository: flink Updated Branches: refs/heads/master bfb611f3e -> 6b402f43d
[streaming] Java Stock streaming example added Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b5752a76 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b5752a76 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b5752a76 Branch: refs/heads/master Commit: b5752a76c4b835931131b48c89734d44fdee5dee Parents: 2afc9ec Author: mbalassi <[email protected]> Authored: Fri Feb 6 11:31:20 2015 +0100 Committer: mbalassi <[email protected]> Committed: Mon Feb 9 21:55:09 2015 +0100 ---------------------------------------------------------------------- .../scala/examples/windowing/StockPrices.scala | 158 --------- .../examples/windowing/StockPrices.java | 341 +++++++++++++++++++ .../scala/examples/windowing/StockPrices.scala | 166 +++++++++ 3 files changed, 507 insertions(+), 158 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b5752a76/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala b/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala deleted file mode 100644 index 0fee767..0000000 --- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.scala.examples.windowing - -import org.apache.flink.streaming.api.scala._ -import org.apache.flink.streaming.api.windowing.helper.Count -import org.apache.flink.util.Collector -import org.apache.flink.streaming.api.windowing.helper.Time -import java.util.concurrent.TimeUnit._ -import org.apache.flink.streaming.api.scala.windowing.Delta -import scala.util.Random - -object StockPrices { - - case class StockPrice(symbol: String, price: Double) - case class Count(symbol: String, count: Int) - - val symbols = List("SPX", "FTSE", "DJI", "DJT", "BUX", "DAX", "GOOG") - - val defaultPrice = StockPrice("", 1000) - - def main(args: Array[String]) { - - val env = StreamExecutionEnvironment.getExecutionEnvironment - - //Step 1 - //Read a stream of stock prices from different sources and merge it into one stream - - //Read from a socket stream at map it to StockPrice objects - val socketStockStream = env.socketTextStream("localhost", 9999).map(x => { - val split = x.split(",") - StockPrice(split(0), split(1).toDouble) - }) - - //Generate other stock streams - val SPX_Stream = env.addSource(generateStock("SPX")(10) _) - val FTSE_Stream = env.addSource(generateStock("FTSE")(20) _) - val DJI_Stream = env.addSource(generateStock("DJI")(30) _) - val BUX_Stream = env.addSource(generateStock("BUX")(40) _) - - //Merge all stock streams together - val stockStream = 
socketStockStream.merge(SPX_Stream, FTSE_Stream, DJI_Stream, BUX_Stream) - - //Step 2 - //Compute some simple statistics on a rolling window - val windowedStream = stockStream.window(Time.of(10, SECONDS)).every(Time.of(5, SECONDS)) - - val lowest = windowedStream.minBy("price").setParallelism(1) - val maxByStock = windowedStream.groupBy("symbol").maxBy("price") - val rollingMean = windowedStream.groupBy("symbol").reduceGroup(mean _) - - //Step 3 - //Use delta policy to create price change warnings, and also count the number of warning every half minute - - val priceWarnings = stockStream.groupBy("symbol") - .window(Delta.of(0.05, priceChange, defaultPrice)) - .reduceGroup(sendWarning _) - - val warningsPerStock = priceWarnings.map(Count(_, 1)) - .groupBy("symbol") - .window(Time.of(30, SECONDS)) - .sum("count") - - //Step 4 - //Read a stream of tweets and extract the stock symbols - - val tweetStream = env.addSource(generateTweets _) - - val mentionedSymbols = tweetStream.flatMap( - tweet => for (word <- tweet.split(" ").map(_.toUpperCase()) if symbols.contains(word)) yield word) - - val tweetsPerStock = mentionedSymbols.map(Count(_, 1)) - .groupBy("symbol") - .window(Time.of(30, SECONDS)) - .sum("count") - - //Step 5 - //For advanced analysis we join the number of tweets and the number of price change warnings by stock - //for the last half minute, we keep only the counts. 
We use this information to compute rolling correlations - //between the tweets and the price changes - - val tweetsAndWarning = warningsPerStock.join(tweetsPerStock) - .onWindow(30, SECONDS) - .where("symbol") - .equalTo("symbol") { (c1, c2) => (c1.count, c2.count) } - - val rollingCorrelation = tweetsAndWarning.window(Time.of(30, SECONDS)).reduceGroup(computeCorrelation _).setParallelism(1) - - rollingCorrelation.print - - env.execute("Stock stream") - } - - def priceChange(p1: StockPrice, p2: StockPrice): Double = { - Math.abs(p1.price / p2.price - 1) - } - - def mean(ts: Iterable[StockPrice], out: Collector[StockPrice]) = { - if (ts.nonEmpty) out.collect(StockPrice(ts.head.symbol, ts.foldLeft(0: Double)(_ + _.price) / ts.size)) - } - - def sendWarning(ts: Iterable[StockPrice], out: Collector[String]) = { - if (ts.nonEmpty) out.collect(ts.head.symbol) - } - - def computeCorrelation(input: Iterable[(Int, Int)], out: Collector[Double]) = { - if (input.nonEmpty) { - val var1 = input.map(_._1) - val mean1 = average(var1) - val var2 = input.map(_._2) - val mean2 = average(var2) - - val cov = average(var1.zip(var2).map(xy => (xy._1 - mean1) * (xy._2 - mean2))) - val d1 = Math.sqrt(average(var1.map(x => Math.pow((x - mean1), 2)))) - val d2 = Math.sqrt(average(var2.map(x => Math.pow((x - mean2), 2)))) - - out.collect(cov / (d1 * d2)) - } - } - - def generateStock(symbol: String)(sigma: Int)(out: Collector[StockPrice]) = { - var price = 1000. 
- while (true) { - price = price + Random.nextGaussian * sigma - out.collect(StockPrice(symbol, price)) - Thread.sleep(Random.nextInt(200)) - } - } - - def average[T](ts: Iterable[T])(implicit num: Numeric[T]) = { - num.toDouble(ts.sum) / ts.size - } - - def generateTweets(out: Collector[String]) = { - while (true) { - val s = for (i <- 1 to 3) yield (symbols(Random.nextInt(symbols.size))) - out.collect(s.mkString(" ")) - Thread.sleep(Random.nextInt(500)) - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/b5752a76/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java new file mode 100644 index 0000000..c60b5ca --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java @@ -0,0 +1,341 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.windowing;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.WindowedDataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction;
+import org.apache.flink.streaming.api.windowing.helper.Delta;
+import org.apache.flink.streaming.api.windowing.helper.Time;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Streaming example: merges several stock price streams, computes windowed statistics,
+ * raises price change warnings with a delta policy and correlates them with tweet mentions.
+ * Mirrors the Scala StockPrices example.
+ */
+public class StockPrices {
+
+	private static final ArrayList<String> SYMBOLS = new ArrayList<String>(Arrays.asList("SPX", "FTSE", "DJI", "DJT", "BUX", "DAX", "GOOG"));
+	private static final Double DEFAULT_PRICE = 1000.;
+	private static final StockPrice DEFAULT_STOCK_PRICE = new StockPrice("", DEFAULT_PRICE);
+
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	public static void main(String[] args) throws Exception {
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		//Step 1
+		//Read a stream of stock prices from different sources and merge it into one stream
+
+		//Read from a socket stream and map it to StockPrice objects
+		DataStream<StockPrice> socketStockStream = env.socketTextStream("localhost", 9999)
+				.map(new MapFunction<String, StockPrice>() {
+					@Override
+					public StockPrice map(String value) throws Exception {
+						//expected input line format: "SYMBOL,price"
+						String[] tokens = value.split(",");
+						return new StockPrice(tokens[0], Double.parseDouble(tokens[1]));
+					}
+				});
+
+		//Generate other stock streams
+		DataStream<StockPrice> SPX_stream = env.addSource(new StockSource("SPX", 10));
+		DataStream<StockPrice> FTSE_stream = env.addSource(new StockSource("FTSE", 20));
+		DataStream<StockPrice> DJI_stream = env.addSource(new StockSource("DJI", 30));
+		DataStream<StockPrice> BUX_stream = env.addSource(new StockSource("BUX", 40));
+
+		//Merge all stock streams together
+		DataStream<StockPrice> stockStream = socketStockStream.merge(SPX_stream, FTSE_stream, DJI_stream, BUX_stream);
+
+		//Step 2
+		//Compute some simple statistics on a rolling window
+		WindowedDataStream<StockPrice> windowedStream = stockStream
+				.window(Time.of(10, TimeUnit.SECONDS))
+				.every(Time.of(5, TimeUnit.SECONDS));
+
+		DataStream<StockPrice> lowest = windowedStream.minBy("price").setParallelism(1);
+		DataStream<StockPrice> maxByStock = windowedStream.groupBy("symbol").maxBy("price");
+		DataStream<StockPrice> rollingMean = windowedStream.groupBy("symbol").reduceGroup(new MeanReduce());
+
+		//Step 3
+		//Use delta policy to create price change warnings, and also count the number of warnings every half minute
+
+		DataStream<String> priceWarnings = stockStream.groupBy("symbol")
+				.window(Delta.of(0.05, new DeltaFunction<StockPrice>() {
+					//relative price change (0.05 = 5%), the same measure as priceChange in the Scala example
+					@Override
+					public double getDelta(StockPrice oldDataPoint, StockPrice newDataPoint) {
+						return Math.abs(oldDataPoint.price / newDataPoint.price - 1);
+					}
+				}, DEFAULT_STOCK_PRICE))
+				.reduceGroup(new SendWarning());
+
+		DataStream<Count> warningsPerStock = priceWarnings.map(new MapFunction<String, Count>() {
+			@Override
+			public Count map(String value) throws Exception {
+				return new Count(value, 1);
+			}
+		}).groupBy("symbol").window(Time.of(30, TimeUnit.SECONDS)).sum("count");
+
+		//Step 4
+		//Read a stream of tweets and extract the stock symbols
+		DataStream<String> tweetStream = env.addSource(new TweetSource());
+
+		DataStream<String> mentionedSymbols = tweetStream.flatMap(new FlatMapFunction<String, String>() {
+			@Override
+			public void flatMap(String value, Collector<String> out) throws Exception {
+				String[] words = value.split(" ");
+				for (String word : words) {
+					out.collect(word.toUpperCase());
+				}
+			}
+		}).filter(new FilterFunction<String>() {
+			@Override
+			public boolean filter(String value) throws Exception {
+				return SYMBOLS.contains(value);
+			}
+		});
+
+		DataStream<Count> tweetsPerStock = mentionedSymbols.map(new MapFunction<String, Count>() {
+			@Override
+			public Count map(String value) throws Exception {
+				return new Count(value, 1);
+			}
+		}).groupBy("symbol")
+				.window(Time.of(30, TimeUnit.SECONDS))
+				.sum("count");
+
+		//Step 5
+		//For advanced analysis we join the number of tweets and the number of price change warnings by stock
+		//for the last half minute, we keep only the counts. We use this information to compute rolling correlations
+		//between the tweets and the price changes
+
+		DataStream<Tuple2<Integer, Integer>> tweetsAndWarning = warningsPerStock.join(tweetsPerStock)
+				.onWindow(30, TimeUnit.SECONDS)
+				.where("symbol")
+				.equalTo("symbol")
+				.with(new JoinFunction<Count, Count, Tuple2<Integer, Integer>>() {
+					@Override
+					public Tuple2<Integer, Integer> join(Count first, Count second) throws Exception {
+						return new Tuple2<Integer, Integer>(first.count, second.count);
+					}
+				});
+
+		DataStream<Double> rollingCorrelation = tweetsAndWarning
+				.window(Time.of(30, TimeUnit.SECONDS))
+				.reduceGroup(new CorrelationReduce())
+				.setParallelism(1);
+
+		rollingCorrelation.print();
+
+		env.execute("Stock stream");
+
+	}
+
+	// *************************************************************************
+	// DATA TYPES
+	// *************************************************************************
+
+	public static class StockPrice implements Serializable {
+
+		public String symbol;
+		public Double price;
+
+		public StockPrice() {
+		}
+
+		public StockPrice(String symbol, Double price) {
+			this.symbol = symbol;
+			this.price = price;
+		}
+
+		@Override
+		public String toString() {
+			return "StockPrice{" +
+					"symbol='" + symbol + '\'' +
+					", price=" + price +
+					'}';
+		}
+	}
+
+	public static class Count implements Serializable {
+		public String symbol;
+		public Integer count;
+
+		public Count() {
+		}
+
+		public Count(String symbol, Integer count) {
+			this.symbol = symbol;
+			this.count = count;
+		}
+
+		@Override
+		public String toString() {
+			return "Count{" +
+					"symbol='" + symbol + '\'' +
+					", count=" + count +
+					'}';
+		}
+	}
+
+	// *************************************************************************
+	// USER FUNCTIONS
+	// *************************************************************************
+
+	/** Source emitting an endless random walk of prices for one symbol; each step is a Gaussian scaled by sigma. */
+	public final static class StockSource implements SourceFunction<StockPrice> {
+
+		private Double price;
+		private String symbol;
+		private Integer sigma;
+
+		public StockSource(String symbol, Integer sigma) {
+			this.symbol = symbol;
+			this.sigma = sigma;
+		}
+
+		@Override
+		public void invoke(Collector<StockPrice> collector) throws Exception {
+			price = DEFAULT_PRICE;
+			Random random = new Random();
+
+			while (true) {
+				price = price + random.nextGaussian() * sigma;
+				collector.collect(new StockPrice(symbol, price));
+				Thread.sleep(random.nextInt(200));
+			}
+		}
+	}
+
+	public final static class MeanReduce implements GroupReduceFunction<StockPrice, StockPrice> {
+
+		//accumulators are method-local so every window/group starts from zero (fields would carry over)
+		@Override
+		public void reduce(Iterable<StockPrice> values, Collector<StockPrice> out) throws Exception {
+			double sum = 0.0;
+			int count = 0;
+			String symbol = "";
+
+			for (StockPrice sp : values) {
+				sum += sp.price;
+				symbol = sp.symbol;
+				count++;
+			}
+			if (count > 0) {
+				out.collect(new StockPrice(symbol, sum / count));
+			}
+		}
+	}
+
+	/** Source emitting an endless stream of synthetic tweets, each naming three random symbols. */
+	public static final class TweetSource implements SourceFunction<String> {
+
+		private Random random;
+		private StringBuilder stringBuilder;
+
+		@Override
+		public void invoke(Collector<String> collector) throws Exception {
+			random = new Random();
+			stringBuilder = new StringBuilder();
+
+			while (true) {
+				stringBuilder.setLength(0);
+				for (int i = 0; i < 3; i++) {
+					stringBuilder.append(" ");
+					stringBuilder.append(SYMBOLS.get(random.nextInt(SYMBOLS.size())));
+				}
+				collector.collect(stringBuilder.toString());
+				Thread.sleep(random.nextInt(500));
+			}
+
+		}
+	}
+
+	/** Emits the symbol of the first price in each delta window as a price change warning. */
+	public static final class SendWarning implements GroupReduceFunction<StockPrice, String> {
+		@Override
+		public void reduce(Iterable<StockPrice> values, Collector<String> out) throws Exception {
+			if (values.iterator().hasNext()) {
+				out.collect(values.iterator().next().symbol);
+			}
+		}
+	}
+
+	/**
+	 * Emits the Pearson correlation coefficient of the (warnings, tweets) count pairs in a window.
+	 */
+	public static final class CorrelationReduce implements GroupReduceFunction<Tuple2<Integer, Integer>, Double> {
+
+		@Override
+		public void reduce(Iterable<Tuple2<Integer, Integer>> values, Collector<Double> out) throws Exception {
+
+			long leftSum = 0;
+			long rightSum = 0;
+			int count = 0;
+
+			//compute mean for both sides, save count
+			for (Tuple2<Integer, Integer> pair : values) {
+				leftSum += pair.f0;
+				rightSum += pair.f1;
+				count++;
+			}
+
+			//an empty window has no correlation: emit nothing instead of NaN (as the Scala version does)
+			if (count == 0) {
+				return;
+			}
+
+			double leftMean = (double) leftSum / count;
+			double rightMean = (double) rightSum / count;
+
+			//compute covariance & std. deviations in a single pass
+			double cov = 0.;
+			double leftSd = 0.;
+			double rightSd = 0.;
+			for (Tuple2<Integer, Integer> pair : values) {
+				cov += (pair.f0 - leftMean) * (pair.f1 - rightMean) / count;
+				leftSd += Math.pow(pair.f0 - leftMean, 2) / count;
+				rightSd += Math.pow(pair.f1 - rightMean, 2) / count;
+			}
+			leftSd = Math.sqrt(leftSd);
+			rightSd = Math.sqrt(rightSd);
+
+			out.collect(cov / (leftSd * rightSd));
+		}
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b5752a76/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
new file mode 100644
index 0000000..f357fe7
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.windowing
+
+import java.util.concurrent.TimeUnit._
+
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.api.scala.windowing.Delta
+import org.apache.flink.streaming.api.windowing.helper.Time
+import org.apache.flink.util.Collector
+
+import scala.util.Random
+
+object StockPrices { // Scala variant of the streaming stock prices windowing example
+
+  case class StockPrice(symbol: String, price: Double)
+  case class Count(symbol: String, count: Int)
+
+  val symbols = List("SPX", "FTSE", "DJI", "DJT", "BUX", "DAX", "GOOG")
+
+  val defaultPrice = StockPrice("", 1000) // initial value handed to Delta.of in Step 3
+
+  def main(args: Array[String]) {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    //Step 1
+    //Read a stream of stock prices from different sources and merge it into one stream
+
+    //Read from a socket stream and map it to StockPrice objects (input lines: "SYMBOL,price")
+    val socketStockStream = env.socketTextStream("localhost", 9999).map(x => {
+      val split = x.split(",")
+      StockPrice(split(0), split(1).toDouble)
+    })
+
+    //Generate other stock streams
+    val SPX_Stream = env.addSource(generateStock("SPX")(10) _)
+    val FTSE_Stream = env.addSource(generateStock("FTSE")(20) _)
+    val DJI_Stream = env.addSource(generateStock("DJI")(30) _)
+    val BUX_Stream = env.addSource(generateStock("BUX")(40) _)
+
+    //Merge all stock streams together
+    val stockStream = socketStockStream.merge(SPX_Stream, FTSE_Stream, DJI_Stream, BUX_Stream)
+
+    //Step 2
+    //Compute some simple statistics on a rolling window
+    val windowedStream = stockStream.window(Time.of(10, SECONDS)).every(Time.of(5, SECONDS))
+
+    val lowest = windowedStream.minBy("price").setParallelism(1)
+    val maxByStock = windowedStream.groupBy("symbol").maxBy("price")
+    val rollingMean = windowedStream.groupBy("symbol").reduceGroup(mean _)
+
+    //Step 3
+    //Use delta policy to create price change warnings,
+    // and also count the number of warnings every half minute
+
+    val priceWarnings = stockStream.groupBy("symbol")
+      .window(Delta.of(0.05, priceChange, defaultPrice))
+      .reduceGroup(sendWarning _)
+
+    val warningsPerStock = priceWarnings.map(Count(_, 1))
+      .groupBy("symbol")
+      .window(Time.of(30, SECONDS))
+      .sum("count")
+
+    //Step 4
+    //Read a stream of tweets and extract the stock symbols
+
+    val tweetStream = env.addSource(generateTweets _)
+
+    val mentionedSymbols = tweetStream.flatMap(tweet => tweet.split(" "))
+      .map(_.toUpperCase())
+      .filter(symbols.contains(_))
+
+    val tweetsPerStock = mentionedSymbols.map(Count(_, 1))
+      .groupBy("symbol")
+      .window(Time.of(30, SECONDS))
+      .sum("count")
+
+    //Step 5
+    //For advanced analysis we join the number of tweets and
+    //the number of price change warnings by stock
+    //for the last half minute, we keep only the counts.
+    //This information is used to compute rolling correlations
+    //between the tweets and the price changes
+
+    val tweetsAndWarning = warningsPerStock.join(tweetsPerStock)
+      .onWindow(30, SECONDS)
+      .where("symbol")
+      .equalTo("symbol") { (c1, c2) => (c1.count, c2.count) }
+
+    val rollingCorrelation = tweetsAndWarning.window(Time.of(30, SECONDS))
+      .reduceGroup(computeCorrelation _).setParallelism(1)
+
+    rollingCorrelation.print
+
+    env.execute("Stock stream")
+  }
+
+  def priceChange(p1: StockPrice, p2: StockPrice): Double = { // relative change |p1/p2 - 1|, the delta used in Step 3
+    Math.abs(p1.price / p2.price - 1)
+  }
+
+  def mean(ts: Iterable[StockPrice], out: Collector[StockPrice]) = { // average price, labelled with the first element's symbol
+    if (ts.nonEmpty) {
+      out.collect(StockPrice(ts.head.symbol, ts.foldLeft(0: Double)(_ + _.price) / ts.size))
+    }
+  }
+
+  def sendWarning(ts: Iterable[StockPrice], out: Collector[String]) = { // one warning (the symbol) per non-empty window
+    if (ts.nonEmpty) out.collect(ts.head.symbol)
+  }
+
+  def computeCorrelation(input: Iterable[(Int, Int)], out: Collector[Double]) = { // Pearson correlation; NaN if a series is constant
+    if (input.nonEmpty) {
+      val var1 = input.map(_._1)
+      val mean1 = average(var1)
+      val var2 = input.map(_._2)
+      val mean2 = average(var2)
+
+      val cov = average(var1.zip(var2).map(xy => (xy._1 - mean1) * (xy._2 - mean2)))
+      val d1 = Math.sqrt(average(var1.map(x => Math.pow((x - mean1), 2))))
+      val d2 = Math.sqrt(average(var2.map(x => Math.pow((x - mean2), 2))))
+
+      out.collect(cov / (d1 * d2))
+    }
+  }
+
+  def generateStock(symbol: String)(sigma: Int)(out: Collector[StockPrice]) = { // endless Gaussian random walk from 1000, 0-199 ms apart
+    var price = 1000.
+    while (true) {
+      price = price + Random.nextGaussian * sigma
+      out.collect(StockPrice(symbol, price))
+      Thread.sleep(Random.nextInt(200))
+    }
+  }
+
+  def average[T](ts: Iterable[T])(implicit num: Numeric[T]) = { // arithmetic mean; 0.0 / 0 gives NaN for empty input
+    num.toDouble(ts.sum) / ts.size
+  }
+
+  def generateTweets(out: Collector[String]) = { // endless synthetic tweets, each naming three random symbols
+    while (true) {
+      val s = for (i <- 1 to 3) yield (symbols(Random.nextInt(symbols.size)))
+      out.collect(s.mkString(" "))
+      Thread.sleep(Random.nextInt(500))
+    }
+  }
+
+}
