Repository: flink Updated Branches: refs/heads/master 082d87e51 -> 239168316
[hotfix] Fixes broken TopSpeedWindowing scala example Integrated PR comments This closes #2259. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/23916831 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/23916831 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/23916831 Branch: refs/heads/master Commit: 239168316b4b2999aa574b1a7124c96ddd05063f Parents: 082d87e Author: kl0u <[email protected]> Authored: Fri Jul 15 18:20:04 2016 +0200 Committer: Till Rohrmann <[email protected]> Committed: Wed Jul 20 14:24:06 2016 +0200 ---------------------------------------------------------------------- .../examples/windowing/TopSpeedWindowing.java | 8 +--- .../examples/windowing/TopSpeedWindowing.scala | 50 ++++++++++++-------- 2 files changed, 32 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/23916831/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java index 4e41b73..d159aab 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java @@ -37,15 +37,13 @@ import java.util.concurrent.TimeUnit; /** * An example of grouped stream windowing where different eviction and trigger - * policies can be used. A source fetches events from cars every 1 sec + * policies can be used. A source fetches events from cars every 100 msec * containing their id, their current speed (kmh), overall elapsed distance (m) * and a timestamp. The streaming example triggers the top speed of each car * every x meters elapsed for the last y seconds. */ public class TopSpeedWindowing { - private static final int NUM_CAR_EVENTS = 100; - // ************************************************************************* // PROGRAM // ************************************************************************* @@ -111,7 +109,6 @@ public class TopSpeedWindowing { private Random rand = new Random(); private volatile boolean isRunning = true; - private int counter; private CarSource(int numOfCars) { speeds = new Integer[numOfCars]; @@ -127,7 +124,7 @@ public class TopSpeedWindowing { @Override public void run(SourceContext<Tuple4<Integer, Integer, Double, Long>> ctx) throws Exception { - while (isRunning && counter < NUM_CAR_EVENTS) { + while (isRunning) { Thread.sleep(100); for (int carId = 0; carId < speeds.length; carId++) { if (rand.nextBoolean()) { @@ -139,7 +136,6 @@ public class TopSpeedWindowing { Tuple4<Integer, Integer, Double, Long> record = new Tuple4<>(carId, speeds[carId], distances[carId], System.currentTimeMillis()); ctx.collect(record); - counter++; } } } http://git-wip-us.apache.org/repos/asf/flink/blob/23916831/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala index 80881cc..d7c7014 100644 --- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala +++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala @@ -19,10 +19,13 @@ package org.apache.flink.streaming.scala.examples.windowing +import java.beans.Transient import java.util.concurrent.TimeUnit import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.source.SourceFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows @@ -30,15 +33,13 @@ import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger -import scala.Stream._ -import scala.math._ import scala.language.postfixOps import scala.util.Random /** * An example of grouped stream windowing where different eviction and * trigger policies can be used. A source fetches events from cars - * every 1 sec containing their id, their current speed (kmh), + * every 100 msec containing their id, their current speed (kmh), * overall elapsed distance (m) and a timestamp. The streaming * example triggers the top speed of each car every x meters elapsed * for the last y seconds. @@ -72,7 +73,32 @@ object TopSpeedWindowing { } else { println("Executing TopSpeedWindowing example with default inputs data set.") println("Use --input to specify file input.") - env.fromCollection(genCarStream()) + env.addSource(new SourceFunction[CarEvent]() { + + val speeds = Array.fill[Integer](numOfCars)(50) + val distances = Array.fill[Double](numOfCars)(0d) + @Transient lazy val rand = new Random() + + var isRunning:Boolean = true + + override def run(ctx: SourceContext[CarEvent]) = { + while (isRunning) { + Thread.sleep(100) + + for (carId <- 0 until numOfCars) { + if (rand.nextBoolean) speeds(carId) = Math.min(100, speeds(carId) + 5) + else speeds(carId) = Math.max(0, speeds(carId) - 5) + + distances(carId) += speeds(carId) / 3.6d + val record = CarEvent(carId, speeds(carId), + distances(carId), System.currentTimeMillis) + ctx.collect(record) + } + } + } + + override def cancel(): Unit = isRunning = false + }) } val topSeed = cars @@ -103,22 +129,6 @@ object TopSpeedWindowing { // USER FUNCTIONS // ************************************************************************* - def genCarStream(): Stream[CarEvent] = { - - def nextSpeed(carEvent : CarEvent) : CarEvent = - { - val next = - if (Random.nextBoolean) min(100, carEvent.speed + 5) else max(0, carEvent.speed - 5) - CarEvent(carEvent.carId, next, carEvent.distance + next/3.6d,System.currentTimeMillis) - } - def carStream(speeds : Stream[CarEvent]) : Stream[CarEvent] = - { - Thread.sleep(1000) - speeds.append(carStream(speeds.map(nextSpeed))) - } - carStream(range(0, numOfCars).map(CarEvent(_,50,0,System.currentTimeMillis()))) - } - def parseMap(line : String): (Int, Int, Double, Long) = { val record = line.substring(1, line.length - 1).split(",") (record(0).toInt, record(1).toInt, record(2).toDouble, record(3).toLong)
