Repository: flink
Updated Branches:
  refs/heads/master 082d87e51 -> 239168316


[hotfix] Fixes broken TopSpeedWindowing scala example

Integrated PR comments

This closes #2259.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/23916831
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/23916831
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/23916831

Branch: refs/heads/master
Commit: 239168316b4b2999aa574b1a7124c96ddd05063f
Parents: 082d87e
Author: kl0u <[email protected]>
Authored: Fri Jul 15 18:20:04 2016 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Wed Jul 20 14:24:06 2016 +0200

----------------------------------------------------------------------
 .../examples/windowing/TopSpeedWindowing.java   |  8 +---
 .../examples/windowing/TopSpeedWindowing.scala  | 50 ++++++++++++--------
 2 files changed, 32 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/23916831/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
index 4e41b73..d159aab 100644
--- 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
@@ -37,15 +37,13 @@ import java.util.concurrent.TimeUnit;
 
 /**
  * An example of grouped stream windowing where different eviction and trigger
- * policies can be used. A source fetches events from cars every 1 sec
+ * policies can be used. A source fetches events from cars every 100 msec
  * containing their id, their current speed (kmh), overall elapsed distance (m)
  * and a timestamp. The streaming example triggers the top speed of each car
  * every x meters elapsed for the last y seconds.
  */
 public class TopSpeedWindowing {
 
-       private static final int NUM_CAR_EVENTS = 100;
-
        // 
*************************************************************************
        // PROGRAM
        // 
*************************************************************************
@@ -111,7 +109,6 @@ public class TopSpeedWindowing {
                private Random rand = new Random();
 
                private volatile boolean isRunning = true;
-               private int counter;
 
                private CarSource(int numOfCars) {
                        speeds = new Integer[numOfCars];
@@ -127,7 +124,7 @@ public class TopSpeedWindowing {
                @Override
                public void run(SourceContext<Tuple4<Integer, Integer, Double, 
Long>> ctx) throws Exception {
 
-                       while (isRunning && counter < NUM_CAR_EVENTS) {
+                       while (isRunning) {
                                Thread.sleep(100);
                                for (int carId = 0; carId < speeds.length; 
carId++) {
                                        if (rand.nextBoolean()) {
@@ -139,7 +136,6 @@ public class TopSpeedWindowing {
                                        Tuple4<Integer, Integer, Double, Long> 
record = new Tuple4<>(carId,
                                                        speeds[carId], 
distances[carId], System.currentTimeMillis());
                                        ctx.collect(record);
-                                       counter++;
                                }
                        }
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/23916831/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
 
b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
index 80881cc..d7c7014 100644
--- 
a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
+++ 
b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
@@ -19,10 +19,13 @@
 package org.apache.flink.streaming.scala.examples.windowing
 
 
+import java.beans.Transient
 import java.util.concurrent.TimeUnit
 
 import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
 import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction
 import org.apache.flink.streaming.api.scala._
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
@@ -30,15 +33,13 @@ import 
org.apache.flink.streaming.api.windowing.evictors.TimeEvictor
 import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger
 
-import scala.Stream._
-import scala.math._
 import scala.language.postfixOps
 import scala.util.Random
 
 /**
  * An example of grouped stream windowing where different eviction and 
  * trigger policies can be used. A source fetches events from cars 
- * every 1 sec containing their id, their current speed (kmh),
+ * every 100 msec containing their id, their current speed (kmh),
  * overall elapsed distance (m) and a timestamp. The streaming
  * example triggers the top speed of each car every x meters elapsed 
  * for the last y seconds.
@@ -72,7 +73,32 @@ object TopSpeedWindowing {
       } else {
         println("Executing TopSpeedWindowing example with default inputs data 
set.")
         println("Use --input to specify file input.")
-        env.fromCollection(genCarStream())
+        env.addSource(new SourceFunction[CarEvent]() {
+
+          val speeds = Array.fill[Integer](numOfCars)(50)
+          val distances = Array.fill[Double](numOfCars)(0d)
+          @Transient lazy val rand = new Random()
+
+          var isRunning:Boolean = true
+
+          override def run(ctx: SourceContext[CarEvent]) = {
+            while (isRunning) {
+              Thread.sleep(100)
+
+              for (carId <- 0 until numOfCars) {
+                if (rand.nextBoolean) speeds(carId) = Math.min(100, 
speeds(carId) + 5)
+                else speeds(carId) = Math.max(0, speeds(carId) - 5)
+
+                distances(carId) += speeds(carId) / 3.6d
+                val record = CarEvent(carId, speeds(carId),
+                  distances(carId), System.currentTimeMillis)
+                ctx.collect(record)
+              }
+            }
+          }
+
+          override def cancel(): Unit = isRunning = false
+        })
       }
 
     val topSeed = cars
@@ -103,22 +129,6 @@ object TopSpeedWindowing {
   // USER FUNCTIONS
   // *************************************************************************
 
-  def genCarStream(): Stream[CarEvent] = {
-
-    def nextSpeed(carEvent : CarEvent) : CarEvent =
-    {
-      val next =
-        if (Random.nextBoolean) min(100, carEvent.speed + 5) else max(0, 
carEvent.speed - 5)
-      CarEvent(carEvent.carId, next, carEvent.distance + 
next/3.6d,System.currentTimeMillis)
-    }
-    def carStream(speeds : Stream[CarEvent]) : Stream[CarEvent] =
-    {
-      Thread.sleep(1000)
-      speeds.append(carStream(speeds.map(nextSpeed)))
-    }
-    carStream(range(0, 
numOfCars).map(CarEvent(_,50,0,System.currentTimeMillis())))
-  }
-
   def parseMap(line : String): (Int, Int, Double, Long) = {
     val record = line.substring(1, line.length - 1).split(",")
     (record(0).toInt, record(1).toInt, record(2).toDouble, record(3).toLong)

Reply via email to