This is an automated email from the ASF dual-hosted git repository. nkruber pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-training.git
commit fd65b110714cbf487435b5dfdf0374550bd0c820 Author: David Anderson <[email protected]> AuthorDate: Thu Sep 2 10:16:33 2021 -0600 [FLINK-23926] replace startTime and endTime with a single eventTime field --- README.md | 6 +- .../exercises/common/datatypes/TaxiFare.java | 4 +- .../exercises/common/datatypes/TaxiRide.java | 41 +++++------- .../common/sources/TaxiRideGenerator.java | 6 +- .../exercises/common/utils/DataGenerator.java | 4 +- .../solutions/hourlytips/HourlyTipsSolution.java | 3 +- .../hourlytips/scala/HourlyTipsSolution.scala | 2 +- long-ride-alerts/DISCUSSION.md | 33 ++++++---- long-ride-alerts/README.md | 7 ++- .../exercises/longrides/LongRidesExercise.java | 2 +- .../longrides/scala/LongRidesExercise.scala | 2 +- .../solutions/longrides/LongRidesSolution.java | 45 +++++++------ .../longrides/scala/LongRidesSolution.scala | 35 +++++++---- .../exercises/longrides/LongRidesTestBase.java | 10 ++- .../exercises/longrides/LongRidesUnitTest.java | 73 ++++++++++++++++++---- .../ridecleansing/RideCleansingTestBase.java | 12 +--- .../ridesandfares/RidesAndFaresTestBase.java | 3 +- .../ridesandfares/RidesAndFaresUnitTest.java | 4 +- 18 files changed, 173 insertions(+), 119 deletions(-) diff --git a/README.md b/README.md index 1229f5b..040815e 100644 --- a/README.md +++ b/README.md @@ -153,9 +153,7 @@ rideId : Long // a unique id for each ride taxiId : Long // a unique id for each taxi driverId : Long // a unique id for each driver isStart : Boolean // TRUE for ride start events, FALSE for ride end events -startTime : Instant // the start time of a ride -endTime : Instant // the end time of a ride, - // "1970-01-01 00:00:00" for start events +eventTime : Instant // the timestamp for this event startLon : Float // the longitude of the ride start location startLat : Float // the latitude of the ride start location endLon : Float // the longitude of the ride end location @@ -171,7 +169,7 @@ There is also a related data set containing fare data about those same 
rides, wi rideId : Long // a unique id for each ride taxiId : Long // a unique id for each taxi driverId : Long // a unique id for each driver -startTime : Instant // the start time of a ride +startTime : Instant // the start time for this ride paymentType : String // CASH or CARD tip : Float // tip for this ride tolls : Float // tolls for this ride diff --git a/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiFare.java b/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiFare.java index 367477c..2b1a7f9 100644 --- a/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiFare.java +++ b/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiFare.java @@ -130,13 +130,13 @@ public class TaxiFare implements Serializable { } /** Gets the fare's start time. */ - public long getEventTime() { + public long getEventTimeMillis() { return startTime.toEpochMilli(); } /** Creates a StreamRecord, using the fare and its timestamp. Used in tests. */ @VisibleForTesting public StreamRecord<TaxiFare> asStreamRecord() { - return new StreamRecord<>(this, this.getEventTime()); + return new StreamRecord<>(this, this.getEventTimeMillis()); } } diff --git a/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiRide.java b/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiRide.java index 3613762..c99d367 100644 --- a/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiRide.java +++ b/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiRide.java @@ -42,8 +42,7 @@ public class TaxiRide implements Comparable<TaxiRide>, Serializable { /** Creates a new TaxiRide with now as start and end time. */ public TaxiRide() { - this.startTime = Instant.now(); - this.endTime = Instant.now(); + this.eventTime = Instant.now(); } /** Invents a TaxiRide. 
*/ @@ -52,8 +51,7 @@ public class TaxiRide implements Comparable<TaxiRide>, Serializable { this.rideId = rideId; this.isStart = isStart; - this.startTime = g.startTime(); - this.endTime = isStart ? Instant.ofEpochMilli(0) : g.endTime(); + this.eventTime = isStart ? g.startTime() : g.endTime(); this.startLon = g.startLon(); this.startLat = g.startLat(); this.endLon = g.endLon(); @@ -67,8 +65,7 @@ public class TaxiRide implements Comparable<TaxiRide>, Serializable { public TaxiRide( long rideId, boolean isStart, - Instant startTime, - Instant endTime, + Instant eventTime, float startLon, float startLat, float endLon, @@ -78,8 +75,7 @@ public class TaxiRide implements Comparable<TaxiRide>, Serializable { long driverId) { this.rideId = rideId; this.isStart = isStart; - this.startTime = startTime; - this.endTime = endTime; + this.eventTime = eventTime; this.startLon = startLon; this.startLat = startLat; this.endLon = endLon; @@ -91,8 +87,7 @@ public class TaxiRide implements Comparable<TaxiRide>, Serializable { public long rideId; public boolean isStart; - public Instant startTime; - public Instant endTime; + public Instant eventTime; public float startLon; public float startLat; public float endLon; @@ -108,9 +103,7 @@ public class TaxiRide implements Comparable<TaxiRide>, Serializable { + "," + (isStart ? 
"START" : "END") + "," - + startTime.toString() - + "," - + endTime.toString() + + eventTime.toString() + "," + startLon + "," @@ -139,7 +132,7 @@ public class TaxiRide implements Comparable<TaxiRide>, Serializable { if (other == null) { return 1; } - int compareTimes = Long.compare(this.getEventTime(), other.getEventTime()); + int compareTimes = this.eventTime.compareTo(other.eventTime); if (compareTimes == 0) { if (this.isStart == other.isStart) { return 0; @@ -173,8 +166,7 @@ public class TaxiRide implements Comparable<TaxiRide>, Serializable { && passengerCnt == taxiRide.passengerCnt && taxiId == taxiRide.taxiId && driverId == taxiRide.driverId - && Objects.equals(startTime, taxiRide.startTime) - && Objects.equals(endTime, taxiRide.endTime); + && Objects.equals(eventTime, taxiRide.eventTime); } @Override @@ -182,8 +174,7 @@ public class TaxiRide implements Comparable<TaxiRide>, Serializable { return Objects.hash( rideId, isStart, - startTime, - endTime, + eventTime, startLon, startLat, endLon, @@ -193,13 +184,9 @@ public class TaxiRide implements Comparable<TaxiRide>, Serializable { driverId); } - /** Gets the ride's time stamp (start or end time depending on {@link #isStart}). */ - public long getEventTime() { - if (isStart) { - return startTime.toEpochMilli(); - } else { - return endTime.toEpochMilli(); - } + /** Gets the ride's time stamp as a long in millis since the epoch. */ + public long getEventTimeMillis() { + return eventTime.toEpochMilli(); } /** Gets the distance from the ride location to the given one. */ @@ -216,12 +203,12 @@ public class TaxiRide implements Comparable<TaxiRide>, Serializable { /** Creates a StreamRecord, using the ride and its timestamp. Used in tests. */ @VisibleForTesting public StreamRecord<TaxiRide> asStreamRecord() { - return new StreamRecord<>(this, this.getEventTime()); + return new StreamRecord<>(this, this.getEventTimeMillis()); } /** Creates a StreamRecord from this taxi ride, using its id and timestamp. Used in tests. 
*/ @VisibleForTesting public StreamRecord<Long> idAsStreamRecord() { - return new StreamRecord<>(this.rideId, this.getEventTime()); + return new StreamRecord<>(this.rideId, this.getEventTimeMillis()); } } diff --git a/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiRideGenerator.java b/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiRideGenerator.java index c5c2108..b3776d9 100644 --- a/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiRideGenerator.java +++ b/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiRideGenerator.java @@ -52,7 +52,7 @@ public class TaxiRideGenerator implements SourceFunction<TaxiRide> { TaxiRide ride = new TaxiRide(id + i, true); startEvents.add(ride); // the start times may be in order, but let's not assume that - maxStartTime = Math.max(maxStartTime, ride.startTime.toEpochMilli()); + maxStartTime = Math.max(maxStartTime, ride.getEventTimeMillis()); } // enqueue the corresponding END events @@ -62,7 +62,7 @@ public class TaxiRideGenerator implements SourceFunction<TaxiRide> { // release the END events coming before the end of this new batch // (this allows a few END events to precede their matching START event) - while (endEventQ.peek().getEventTime() <= maxStartTime) { + while (endEventQ.peek().getEventTimeMillis() <= maxStartTime) { TaxiRide ride = endEventQ.poll(); ctx.collect(ride); } @@ -71,7 +71,7 @@ public class TaxiRideGenerator implements SourceFunction<TaxiRide> { java.util.Collections.shuffle(startEvents, new Random(id)); startEvents .iterator() - .forEachRemaining(r -> ctx.collectWithTimestamp(r, r.getEventTime())); + .forEachRemaining(r -> ctx.collectWithTimestamp(r, r.getEventTimeMillis())); // prepare for the next batch id += BATCH_SIZE; diff --git a/common/src/main/java/org/apache/flink/training/exercises/common/utils/DataGenerator.java 
b/common/src/main/java/org/apache/flink/training/exercises/common/utils/DataGenerator.java index 24439bd..10c4c2b 100644 --- a/common/src/main/java/org/apache/flink/training/exercises/common/utils/DataGenerator.java +++ b/common/src/main/java/org/apache/flink/training/exercises/common/utils/DataGenerator.java @@ -25,8 +25,8 @@ import java.util.Random; * Data generator for the fields in TaxiRide and TaxiFare objects. * * <p>Results are deterministically determined by the rideId. This guarantees (among other things) - * that the startTime for a TaxiRide START event matches the startTime for the TaxiRide END and - * TaxiFare events for that same rideId. + * that the eventTime for a TaxiRide START event matches the startTime for the TaxiFare event for + * that same rideId. */ public class DataGenerator { diff --git a/hourly-tips/src/solution/java/org/apache/flink/training/solutions/hourlytips/HourlyTipsSolution.java b/hourly-tips/src/solution/java/org/apache/flink/training/solutions/hourlytips/HourlyTipsSolution.java index f61360d..6ae443f 100644 --- a/hourly-tips/src/solution/java/org/apache/flink/training/solutions/hourlytips/HourlyTipsSolution.java +++ b/hourly-tips/src/solution/java/org/apache/flink/training/solutions/hourlytips/HourlyTipsSolution.java @@ -83,7 +83,8 @@ public class HourlyTipsSolution { .assignTimestampsAndWatermarks( // taxi fares are in order WatermarkStrategy.<TaxiFare>forMonotonousTimestamps() - .withTimestampAssigner((fare, t) -> fare.getEventTime())); + .withTimestampAssigner( + (fare, t) -> fare.getEventTimeMillis())); // compute tips per hour for each driver DataStream<Tuple3<Long, Long, Float>> hourlyTips = diff --git a/hourly-tips/src/solution/scala/org/apache/flink/training/solutions/hourlytips/scala/HourlyTipsSolution.scala b/hourly-tips/src/solution/scala/org/apache/flink/training/solutions/hourlytips/scala/HourlyTipsSolution.scala index bf3d159..44dd3b1 100644 --- 
a/hourly-tips/src/solution/scala/org/apache/flink/training/solutions/hourlytips/scala/HourlyTipsSolution.scala +++ b/hourly-tips/src/solution/scala/org/apache/flink/training/solutions/hourlytips/scala/HourlyTipsSolution.scala @@ -58,7 +58,7 @@ object HourlyTipsSolution { .forMonotonousTimestamps[TaxiFare]() .withTimestampAssigner(new SerializableTimestampAssigner[TaxiFare] { override def extractTimestamp(fare: TaxiFare, streamRecordTimestamp: Long): Long = - fare.getEventTime + fare.getEventTimeMillis }) // setup the pipeline diff --git a/long-ride-alerts/DISCUSSION.md b/long-ride-alerts/DISCUSSION.md index d50e390..a91e7fc 100644 --- a/long-ride-alerts/DISCUSSION.md +++ b/long-ride-alerts/DISCUSSION.md @@ -21,24 +21,35 @@ under the License. (Discussion of [Lab: `KeyedProcessFunction` and Timers (Long Ride Alerts)](./)) -Flaws in the reference solutions: +### Analysis -* The reference solutions leak state in the case where a START event is missing. -* In the case where the END event eventually arrives, but after the timer -has fired and has cleared the matching START event, then a duplicate alert is generated. +These cases are worth noting: -A good way to write unit tests for a `KeyedProcessFunction` to check for state retention, etc., is to -use the test harnesses described in the -[documentation on testing](https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#unit-testing-stateful-or-timely-udfs--custom-operators). +* _The START event is missing_. Then the END event will sit in state indefinitely (this is a leak!). +* _The END event is missing_. The timer will fire and the state will be cleared (this is ok). +* _The END event arrives after the timer has fired and cleared the state._ In this case the END +event will be stored in state indefinitely (this is another leak!).
-These issues could be addressed by keeping some state longer, and then either +These leaks could be addressed by either using [state TTL](https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl), or another timer, to eventually clear any lingering state. -But regardless of how long we retain the state, we must eventually clear it, and thereafter we would -still run the risk of extremely late events causing incorrect or duplicated results. +### Bottom line + +Regardless of how clever we are with what state we keep, and how long we choose to keep it, +we should eventually clear it -- because otherwise our state will grow in an unbounded fashion. +And having lost that information, we will run the risk of late events causing incorrect or duplicated results. + This tradeoff between keeping state indefinitely versus occasionally getting things wrong when events are -exceptionally late is a challenge that is inherent to stateful stream processing. + late is a challenge that is inherent to stateful stream processing. + +### If you want to go further + +For each of these, add tests to check for the desired behavior. + +* Extend the solution so that it never leaks state. +* Define what it means for an event to be missing, detect missing START and END events, +and send some notification of this to a side output. ----- diff --git a/long-ride-alerts/README.md b/long-ride-alerts/README.md index 4613147..b95e261 100644 --- a/long-ride-alerts/README.md +++ b/long-ride-alerts/README.md @@ -27,7 +27,10 @@ This should be done using the event time timestamps and watermarks that are prov The stream is out-of-order, and it is possible that the END event for a ride will be processed before its START event. -A START or END event may be missing, but you may assume there are no duplicated events. +An END event may be missing, but you may assume there are no duplicated events, and no missing START events. 
+ +It is not enough to simply wait for the END event and calculate the duration, as we want to be alerted +about the long ride as soon as possible. You should eventually clear any state you create. @@ -69,8 +72,6 @@ The resulting stream should be printed to standard out. This exercise revolves around using a `KeyedProcessFunction` to manage some state and event time timers, and doing so in a way that works even when the END event for a given `rideId` arrives before the START. The challenge is figuring out what state and timers to use, and when to set and clear the state (and timers). -It is not enough to simply wait for the END event and calculate the duration, as the END event -may be missing. </details> ## Documentation diff --git a/long-ride-alerts/src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java b/long-ride-alerts/src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java index a492e10..f555e5d 100644 --- a/long-ride-alerts/src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java +++ b/long-ride-alerts/src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java @@ -71,7 +71,7 @@ public class LongRidesExercise { WatermarkStrategy<TaxiRide> watermarkStrategy = WatermarkStrategy.<TaxiRide>forBoundedOutOfOrderness(Duration.ofSeconds(60)) .withTimestampAssigner( - (ride, streamRecordTimestamp) -> ride.getEventTime()); + (ride, streamRecordTimestamp) -> ride.getEventTimeMillis()); // create the pipeline rides.assignTimestampsAndWatermarks(watermarkStrategy) diff --git a/long-ride-alerts/src/main/scala/org/apache/flink/training/exercises/longrides/scala/LongRidesExercise.scala b/long-ride-alerts/src/main/scala/org/apache/flink/training/exercises/longrides/scala/LongRidesExercise.scala index 4c9dd27..e8d7979 100644 --- a/long-ride-alerts/src/main/scala/org/apache/flink/training/exercises/longrides/scala/LongRidesExercise.scala +++ 
b/long-ride-alerts/src/main/scala/org/apache/flink/training/exercises/longrides/scala/LongRidesExercise.scala @@ -56,7 +56,7 @@ object LongRidesExercise { .forBoundedOutOfOrderness[TaxiRide](Duration.ofSeconds(60)) .withTimestampAssigner(new SerializableTimestampAssigner[TaxiRide] { override def extractTimestamp(ride: TaxiRide, streamRecordTimestamp: Long): Long = - ride.getEventTime + ride.getEventTimeMillis }) // create the pipeline diff --git a/long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java b/long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java index 4348535..e542817 100644 --- a/long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java +++ b/long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java @@ -74,7 +74,7 @@ public class LongRidesSolution { WatermarkStrategy<TaxiRide> watermarkStrategy = WatermarkStrategy.<TaxiRide>forBoundedOutOfOrderness(Duration.ofSeconds(60)) .withTimestampAssigner( - (ride, streamRecordTimestamp) -> ride.getEventTime()); + (ride, streamRecordTimestamp) -> ride.getEventTimeMillis()); // create the pipeline rides.assignTimestampsAndWatermarks(watermarkStrategy) @@ -105,9 +105,9 @@ public class LongRidesSolution { @Override public void open(Configuration config) { - ValueStateDescriptor<TaxiRide> stateDescriptor = + ValueStateDescriptor<TaxiRide> rideStateDescriptor = new ValueStateDescriptor<>("ride event", TaxiRide.class); - rideState = getRuntimeContext().getState(stateDescriptor); + rideState = getRuntimeContext().getState(rideStateDescriptor); } @Override @@ -117,29 +117,32 @@ public class LongRidesSolution { TaxiRide firstRideEvent = rideState.value(); if (firstRideEvent == null) { + // whatever event comes first, remember it rideState.update(ride); if (ride.isStart) { + // we will use this timer to check for rides that have gone on 
too long and may + // not yet have an END event (or the END event could be missing) context.timerService().registerEventTimeTimer(getTimerTime(ride)); - } else { - if (rideTooLong(ride)) { - out.collect(ride.rideId); - } } } else { if (ride.isStart) { - // There's nothing to do but clear the state (which is done below). + if (rideTooLong(ride, firstRideEvent)) { + out.collect(ride.rideId); + } } else { - // There may be a timer that hasn't fired yet. + // the first ride was a START event, so there is a timer unless it has fired context.timerService().deleteEventTimeTimer(getTimerTime(firstRideEvent)); - // It could be that the ride has gone on too long, but the timer hasn't fired - // yet. - if (rideTooLong(ride)) { + // perhaps the ride has gone on too long, but the timer didn't fire yet + if (rideTooLong(firstRideEvent, ride)) { out.collect(ride.rideId); } } - // Both events have now been seen, we can clear the state. + + // both events have now been seen, we can clear the state + // this solution can leak state if an event is missing + // see DISCUSSION.md for more information rideState.clear(); } } @@ -148,19 +151,25 @@ public class LongRidesSolution { public void onTimer(long timestamp, OnTimerContext context, Collector<Long> out) throws Exception { - // The timer only fires if the ride was too long. 
+ // the timer only fires if the ride was too long out.collect(rideState.value().rideId); + + // clearing now prevents duplicate alerts, but will leak state if the END arrives rideState.clear(); } - private boolean rideTooLong(TaxiRide rideEndEvent) { - return Duration.between(rideEndEvent.startTime, rideEndEvent.endTime) + private boolean rideTooLong(TaxiRide startEvent, TaxiRide endEvent) { + return Duration.between(startEvent.eventTime, endEvent.eventTime) .compareTo(Duration.ofHours(2)) > 0; } - private long getTimerTime(TaxiRide ride) { - return ride.startTime.plusSeconds(120 * 60).toEpochMilli(); + private long getTimerTime(TaxiRide ride) throws RuntimeException { + if (ride.isStart) { + return ride.eventTime.plusSeconds(120 * 60).toEpochMilli(); + } else { + throw new RuntimeException("Can not get start time from END event."); + } } } } diff --git a/long-ride-alerts/src/solution/scala/org/apache/flink/training/solutions/longrides/scala/LongRidesSolution.scala b/long-ride-alerts/src/solution/scala/org/apache/flink/training/solutions/longrides/scala/LongRidesSolution.scala index 3e17acc..6bdd345 100644 --- a/long-ride-alerts/src/solution/scala/org/apache/flink/training/solutions/longrides/scala/LongRidesSolution.scala +++ b/long-ride-alerts/src/solution/scala/org/apache/flink/training/solutions/longrides/scala/LongRidesSolution.scala @@ -57,7 +57,7 @@ object LongRidesSolution { .forBoundedOutOfOrderness[TaxiRide](Duration.ofSeconds(60)) .withTimestampAssigner(new SerializableTimestampAssigner[TaxiRide] { override def extractTimestamp(ride: TaxiRide, streamRecordTimestamp: Long): Long = - ride.getEventTime + ride.getEventTimeMillis }) // create the pipeline @@ -95,28 +95,35 @@ object LongRidesSolution { out: Collector[Long] ): Unit = { - val firstRideEvent = rideState.value() + val firstRideEvent: TaxiRide = rideState.value if (firstRideEvent == null) { + // whatever event comes first, remember it rideState.update(ride) + if (ride.isStart) { + // we will use 
this timer to check for rides that have gone on too long and may + // not yet have an END event (or the END event could be missing) context.timerService.registerEventTimeTimer(getTimerTime(ride)) - } else if (rideTooLong(ride)) { - out.collect(ride.rideId) } } else { if (ride.isStart) { - // There's nothing to do but clear the state (which is done below). + if (rideTooLong(ride, firstRideEvent)) { + out.collect(ride.rideId) + } } else { - // There may be a timer that hasn't fired yet. + // the first ride was a START event, so there is a timer unless it has fired context.timerService.deleteEventTimeTimer(getTimerTime(firstRideEvent)) - // It could be that the ride has gone on too long, but the timer hasn't fired yet. - if (rideTooLong(ride)) { + // perhaps the ride has gone on too long, but the timer didn't fire yet + if (rideTooLong(firstRideEvent, ride)) { out.collect(ride.rideId) } } - // Both events have now been seen, we can clear the state. + + // both events have now been seen, we can clear the state + // this solution can leak state if an event is missing + // see DISCUSSION.md for more information rideState.clear() } } @@ -127,17 +134,19 @@ object LongRidesSolution { out: Collector[Long] ): Unit = { - // The timer only fires if the ride was too long. 
+ // the timer only fires if the ride was too long out.collect(rideState.value().rideId) + + // clearing now prevents duplicate alerts, but will leak state if the END arrives rideState.clear() } - private def rideTooLong(rideEndEvent: TaxiRide) = + private def rideTooLong(startEvent: TaxiRide, endEvent: TaxiRide) = Duration - .between(rideEndEvent.startTime, rideEndEvent.endTime) + .between(startEvent.eventTime, endEvent.eventTime) .compareTo(Duration.ofHours(2)) > 0 - private def getTimerTime(ride: TaxiRide) = ride.startTime.toEpochMilli + 2.hours.toMillis + private def getTimerTime(ride: TaxiRide) = ride.eventTime.toEpochMilli + 2.hours.toMillis } } diff --git a/long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTestBase.java b/long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTestBase.java index 955702d..f05cd93 100644 --- a/long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTestBase.java +++ b/long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTestBase.java @@ -12,21 +12,19 @@ public class LongRidesTestBase { public static final Instant THREE_HOURS_LATER = BEGINNING.plusSeconds(180 * 60); public static TaxiRide startRide(long rideId, Instant startTime) { - return testRide(rideId, true, startTime, Instant.EPOCH); + return testRide(rideId, true, startTime); } public static TaxiRide endRide(TaxiRide started, Instant endTime) { - return testRide(started.rideId, false, started.startTime, endTime); + return testRide(started.rideId, false, endTime); } - private static TaxiRide testRide( - long rideId, Boolean isStart, Instant startTime, Instant endTime) { + private static TaxiRide testRide(long rideId, Boolean isStart, Instant eventTime) { return new TaxiRide( rideId, isStart, - startTime, - endTime, + eventTime, -73.9947F, 40.750626F, -73.9947F, diff --git 
a/long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesUnitTest.java b/long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesUnitTest.java index bc76746..4d22b22 100644 --- a/long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesUnitTest.java +++ b/long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesUnitTest.java @@ -39,26 +39,72 @@ public class LongRidesUnitTest extends LongRidesTestBase { } @Test - public void shouldAlertOnLongEndEvent() throws Exception { + public void shouldUseTimersAndState() throws Exception { - TaxiRide rideStartedButNotSent = startRide(1, BEGINNING); - TaxiRide endedThreeHoursLater = endRide(rideStartedButNotSent, THREE_HOURS_LATER); + TaxiRide rideStarted = startRide(1, BEGINNING); + harness.processElement(rideStarted.asStreamRecord()); - harness.processElement(endedThreeHoursLater.asStreamRecord()); - assertThat(harness.getOutput()).containsExactly(endedThreeHoursLater.idAsStreamRecord()); + // check for state and timers + assertThat(harness.numEventTimeTimers()).isGreaterThan(0); + assertThat(harness.numKeyedStateEntries()).isGreaterThan(0); + + TaxiRide endedOneMinuteLater = endRide(rideStarted, ONE_MINUTE_LATER); + harness.processElement(endedOneMinuteLater.asStreamRecord()); + + // in this case, state and timers should be gone now + assertThat(harness.numEventTimeTimers()).isZero(); + assertThat(harness.numKeyedStateEntries()).isZero(); + } + + @Test + public void shouldNotAlertWithStartFirst() throws Exception { + + TaxiRide rideStarted = startRide(1, BEGINNING); + TaxiRide endedOneMinuteLater = endRide(rideStarted, ONE_MINUTE_LATER); + + harness.processElement(rideStarted.asStreamRecord()); + harness.processElement(endedOneMinuteLater.asStreamRecord()); + + assertThat(harness.getOutput()).isEmpty(); } @Test - public void shouldNotAlertOnShortEndEvent() throws Exception { + public void 
shouldNotAlertWithEndFirst() throws Exception { - TaxiRide rideStartedButNotSent = startRide(1, BEGINNING); - TaxiRide endedOneMinuteLater = endRide(rideStartedButNotSent, ONE_MINUTE_LATER); + TaxiRide rideStarted = startRide(1, BEGINNING); + TaxiRide endedOneMinuteLater = endRide(rideStarted, ONE_MINUTE_LATER); harness.processElement(endedOneMinuteLater.asStreamRecord()); + harness.processElement(rideStarted.asStreamRecord()); + assertThat(harness.getOutput()).isEmpty(); } @Test + public void shouldAlertWithStartFirst() throws Exception { + + TaxiRide rideStarted = startRide(1, BEGINNING); + TaxiRide endedThreeHoursLater = endRide(rideStarted, THREE_HOURS_LATER); + + harness.processElement(rideStarted.asStreamRecord()); + harness.processElement(endedThreeHoursLater.asStreamRecord()); + + assertThat(resultingRideId()).isEqualTo(rideStarted.rideId); + } + + @Test + public void shouldAlertWithEndFirst() throws Exception { + + TaxiRide rideStarted = startRide(1, BEGINNING); + TaxiRide endedThreeHoursLater = endRide(rideStarted, THREE_HOURS_LATER); + + harness.processElement(endedThreeHoursLater.asStreamRecord()); + harness.processElement(rideStarted.asStreamRecord()); + + assertThat(resultingRideId()).isEqualTo(rideStarted.rideId); + } + + @Test public void shouldNotAlertWithoutWatermarkOrEndEvent() throws Exception { TaxiRide rideStarted = startRide(1, BEGINNING); @@ -98,10 +144,15 @@ public class LongRidesUnitTest extends LongRidesTestBase { StreamRecord<Long> rideIdAtTimeOfWatermark = new StreamRecord<>(startOfLongRide.rideId, mark2HoursLater.getTimestamp()); assertThat(harness.getOutput()).containsExactly(rideIdAtTimeOfWatermark, mark2HoursLater); + } - // Check that no state or timers are left behind - assertThat(harness.numKeyedStateEntries()).isZero(); - assertThat(harness.numEventTimeTimers()).isZero(); + private Long resultingRideId() { + ConcurrentLinkedQueue<Object> results = harness.getOutput(); + assertThat(results.size()) + .withFailMessage("Expecting 
test to have exactly one result") + .isEqualTo(1); + StreamRecord<Long> resultingRecord = (StreamRecord<Long>) results.element(); + return resultingRecord.getValue(); } private KeyedOneInputStreamOperatorTestHarness<Long, TaxiRide, Long> setupHarness( diff --git a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingTestBase.java b/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingTestBase.java index 62ff719..cbfeeda 100644 --- a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingTestBase.java +++ b/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingTestBase.java @@ -8,16 +8,6 @@ public class RideCleansingTestBase { public static TaxiRide testRide(float startLon, float startLat, float endLon, float endLat) { return new TaxiRide( - 1L, - true, - Instant.EPOCH, - Instant.EPOCH, - startLon, - startLat, - endLon, - endLat, - (short) 1, - 0, - 0); + 1L, true, Instant.EPOCH, startLon, startLat, endLon, endLat, (short) 1, 0, 0); } } diff --git a/rides-and-fares/src/test/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresTestBase.java b/rides-and-fares/src/test/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresTestBase.java index c634bd4..e84c265 100644 --- a/rides-and-fares/src/test/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresTestBase.java +++ b/rides-and-fares/src/test/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresTestBase.java @@ -8,8 +8,7 @@ import java.time.Instant; public class RidesAndFaresTestBase { public static TaxiRide testRide(long rideId) { - return new TaxiRide( - rideId, true, Instant.EPOCH, Instant.EPOCH, 0F, 0F, 0F, 0F, (short) 1, 0, rideId); + return new TaxiRide(rideId, true, Instant.EPOCH, 0F, 0F, 0F, 0F, (short) 1, 0, rideId); } public static TaxiFare testFare(long rideId) { diff --git 
a/rides-and-fares/src/test/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresUnitTest.java b/rides-and-fares/src/test/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresUnitTest.java index 93b0167..2dca786 100644 --- a/rides-and-fares/src/test/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresUnitTest.java +++ b/rides-and-fares/src/test/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresUnitTest.java @@ -53,7 +53,7 @@ public class RidesAndFaresUnitTest extends RidesAndFaresTestBase { // Verify the result StreamRecord<RideAndFare> expected = - new StreamRecord<>(new RideAndFare(ride1, fare1), ride1.getEventTime()); + new StreamRecord<>(new RideAndFare(ride1, fare1), ride1.getEventTimeMillis()); assertThat(harness.getOutput()).containsExactly(expected); } @@ -70,7 +70,7 @@ public class RidesAndFaresUnitTest extends RidesAndFaresTestBase { // Verify the result StreamRecord<RideAndFare> expected = - new StreamRecord<>(new RideAndFare(ride1, fare1), ride1.getEventTime()); + new StreamRecord<>(new RideAndFare(ride1, fare1), ride1.getEventTimeMillis()); assertThat(harness.getOutput()).containsExactly(expected); }
