This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-training.git

commit fd65b110714cbf487435b5dfdf0374550bd0c820
Author: David Anderson <[email protected]>
AuthorDate: Thu Sep 2 10:16:33 2021 -0600

    [FLINK-23926] replace startTime and endTime with a single eventTime field
---
 README.md                                          |  6 +-
 .../exercises/common/datatypes/TaxiFare.java       |  4 +-
 .../exercises/common/datatypes/TaxiRide.java       | 41 +++++-------
 .../common/sources/TaxiRideGenerator.java          |  6 +-
 .../exercises/common/utils/DataGenerator.java      |  4 +-
 .../solutions/hourlytips/HourlyTipsSolution.java   |  3 +-
 .../hourlytips/scala/HourlyTipsSolution.scala      |  2 +-
 long-ride-alerts/DISCUSSION.md                     | 33 ++++++----
 long-ride-alerts/README.md                         |  7 ++-
 .../exercises/longrides/LongRidesExercise.java     |  2 +-
 .../longrides/scala/LongRidesExercise.scala        |  2 +-
 .../solutions/longrides/LongRidesSolution.java     | 45 +++++++------
 .../longrides/scala/LongRidesSolution.scala        | 35 +++++++----
 .../exercises/longrides/LongRidesTestBase.java     | 10 ++-
 .../exercises/longrides/LongRidesUnitTest.java     | 73 ++++++++++++++++++----
 .../ridecleansing/RideCleansingTestBase.java       | 12 +---
 .../ridesandfares/RidesAndFaresTestBase.java       |  3 +-
 .../ridesandfares/RidesAndFaresUnitTest.java       |  4 +-
 18 files changed, 173 insertions(+), 119 deletions(-)

diff --git a/README.md b/README.md
index 1229f5b..040815e 100644
--- a/README.md
+++ b/README.md
@@ -153,9 +153,7 @@ rideId         : Long      // a unique id for each ride
 taxiId         : Long      // a unique id for each taxi
 driverId       : Long      // a unique id for each driver
 isStart        : Boolean   // TRUE for ride start events, FALSE for ride end 
events
-startTime      : Instant   // the start time of a ride
-endTime        : Instant   // the end time of a ride,
-                           //   "1970-01-01 00:00:00" for start events
+eventTime      : Instant   // the timestamp for this event
 startLon       : Float     // the longitude of the ride start location
 startLat       : Float     // the latitude of the ride start location
 endLon         : Float     // the longitude of the ride end location
@@ -171,7 +169,7 @@ There is also a related data set containing fare data about 
those same rides, wi
 rideId         : Long      // a unique id for each ride
 taxiId         : Long      // a unique id for each taxi
 driverId       : Long      // a unique id for each driver
-startTime      : Instant   // the start time of a ride
+startTime      : Instant   // the start time for this ride
 paymentType    : String    // CASH or CARD
 tip            : Float     // tip for this ride
 tolls          : Float     // tolls for this ride
diff --git 
a/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiFare.java
 
b/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiFare.java
index 367477c..2b1a7f9 100644
--- 
a/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiFare.java
+++ 
b/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiFare.java
@@ -130,13 +130,13 @@ public class TaxiFare implements Serializable {
     }
 
     /** Gets the fare's start time. */
-    public long getEventTime() {
+    public long getEventTimeMillis() {
         return startTime.toEpochMilli();
     }
 
     /** Creates a StreamRecord, using the fare and its timestamp. Used in 
tests. */
     @VisibleForTesting
     public StreamRecord<TaxiFare> asStreamRecord() {
-        return new StreamRecord<>(this, this.getEventTime());
+        return new StreamRecord<>(this, this.getEventTimeMillis());
     }
 }
diff --git 
a/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiRide.java
 
b/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiRide.java
index 3613762..c99d367 100644
--- 
a/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiRide.java
+++ 
b/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiRide.java
@@ -42,8 +42,7 @@ public class TaxiRide implements Comparable<TaxiRide>, 
Serializable {
 
     /** Creates a new TaxiRide with now as start and end time. */
     public TaxiRide() {
-        this.startTime = Instant.now();
-        this.endTime = Instant.now();
+        this.eventTime = Instant.now();
     }
 
     /** Invents a TaxiRide. */
@@ -52,8 +51,7 @@ public class TaxiRide implements Comparable<TaxiRide>, 
Serializable {
 
         this.rideId = rideId;
         this.isStart = isStart;
-        this.startTime = g.startTime();
-        this.endTime = isStart ? Instant.ofEpochMilli(0) : g.endTime();
+        this.eventTime = isStart ? g.startTime() : g.endTime();
         this.startLon = g.startLon();
         this.startLat = g.startLat();
         this.endLon = g.endLon();
@@ -67,8 +65,7 @@ public class TaxiRide implements Comparable<TaxiRide>, 
Serializable {
     public TaxiRide(
             long rideId,
             boolean isStart,
-            Instant startTime,
-            Instant endTime,
+            Instant eventTime,
             float startLon,
             float startLat,
             float endLon,
@@ -78,8 +75,7 @@ public class TaxiRide implements Comparable<TaxiRide>, 
Serializable {
             long driverId) {
         this.rideId = rideId;
         this.isStart = isStart;
-        this.startTime = startTime;
-        this.endTime = endTime;
+        this.eventTime = eventTime;
         this.startLon = startLon;
         this.startLat = startLat;
         this.endLon = endLon;
@@ -91,8 +87,7 @@ public class TaxiRide implements Comparable<TaxiRide>, 
Serializable {
 
     public long rideId;
     public boolean isStart;
-    public Instant startTime;
-    public Instant endTime;
+    public Instant eventTime;
     public float startLon;
     public float startLat;
     public float endLon;
@@ -108,9 +103,7 @@ public class TaxiRide implements Comparable<TaxiRide>, 
Serializable {
                 + ","
                 + (isStart ? "START" : "END")
                 + ","
-                + startTime.toString()
-                + ","
-                + endTime.toString()
+                + eventTime.toString()
                 + ","
                 + startLon
                 + ","
@@ -139,7 +132,7 @@ public class TaxiRide implements Comparable<TaxiRide>, 
Serializable {
         if (other == null) {
             return 1;
         }
-        int compareTimes = Long.compare(this.getEventTime(), 
other.getEventTime());
+        int compareTimes = this.eventTime.compareTo(other.eventTime);
         if (compareTimes == 0) {
             if (this.isStart == other.isStart) {
                 return 0;
@@ -173,8 +166,7 @@ public class TaxiRide implements Comparable<TaxiRide>, 
Serializable {
                 && passengerCnt == taxiRide.passengerCnt
                 && taxiId == taxiRide.taxiId
                 && driverId == taxiRide.driverId
-                && Objects.equals(startTime, taxiRide.startTime)
-                && Objects.equals(endTime, taxiRide.endTime);
+                && Objects.equals(eventTime, taxiRide.eventTime);
     }
 
     @Override
@@ -182,8 +174,7 @@ public class TaxiRide implements Comparable<TaxiRide>, 
Serializable {
         return Objects.hash(
                 rideId,
                 isStart,
-                startTime,
-                endTime,
+                eventTime,
                 startLon,
                 startLat,
                 endLon,
@@ -193,13 +184,9 @@ public class TaxiRide implements Comparable<TaxiRide>, 
Serializable {
                 driverId);
     }
 
-    /** Gets the ride's time stamp (start or end time depending on {@link 
#isStart}). */
-    public long getEventTime() {
-        if (isStart) {
-            return startTime.toEpochMilli();
-        } else {
-            return endTime.toEpochMilli();
-        }
+    /** Gets the ride's event timestamp, in milliseconds since the epoch. */
+    public long getEventTimeMillis() {
+        return eventTime.toEpochMilli();
     }
 
     /** Gets the distance from the ride location to the given one. */
@@ -216,12 +203,12 @@ public class TaxiRide implements Comparable<TaxiRide>, 
Serializable {
     /** Creates a StreamRecord, using the ride and its timestamp. Used in 
tests. */
     @VisibleForTesting
     public StreamRecord<TaxiRide> asStreamRecord() {
-        return new StreamRecord<>(this, this.getEventTime());
+        return new StreamRecord<>(this, this.getEventTimeMillis());
     }
 
     /** Creates a StreamRecord from this taxi ride, using its id and 
timestamp. Used in tests. */
     @VisibleForTesting
     public StreamRecord<Long> idAsStreamRecord() {
-        return new StreamRecord<>(this.rideId, this.getEventTime());
+        return new StreamRecord<>(this.rideId, this.getEventTimeMillis());
     }
 }
diff --git 
a/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiRideGenerator.java
 
b/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiRideGenerator.java
index c5c2108..b3776d9 100644
--- 
a/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiRideGenerator.java
+++ 
b/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiRideGenerator.java
@@ -52,7 +52,7 @@ public class TaxiRideGenerator implements 
SourceFunction<TaxiRide> {
                 TaxiRide ride = new TaxiRide(id + i, true);
                 startEvents.add(ride);
                 // the start times may be in order, but let's not assume that
-                maxStartTime = Math.max(maxStartTime, 
ride.startTime.toEpochMilli());
+                maxStartTime = Math.max(maxStartTime, 
ride.getEventTimeMillis());
             }
 
             // enqueue the corresponding END events
@@ -62,7 +62,7 @@ public class TaxiRideGenerator implements 
SourceFunction<TaxiRide> {
 
             // release the END events coming before the end of this new batch
             // (this allows a few END events to precede their matching START 
event)
-            while (endEventQ.peek().getEventTime() <= maxStartTime) {
+            while (endEventQ.peek().getEventTimeMillis() <= maxStartTime) {
                 TaxiRide ride = endEventQ.poll();
                 ctx.collect(ride);
             }
@@ -71,7 +71,7 @@ public class TaxiRideGenerator implements 
SourceFunction<TaxiRide> {
             java.util.Collections.shuffle(startEvents, new Random(id));
             startEvents
                     .iterator()
-                    .forEachRemaining(r -> ctx.collectWithTimestamp(r, 
r.getEventTime()));
+                    .forEachRemaining(r -> ctx.collectWithTimestamp(r, 
r.getEventTimeMillis()));
 
             // prepare for the next batch
             id += BATCH_SIZE;
diff --git 
a/common/src/main/java/org/apache/flink/training/exercises/common/utils/DataGenerator.java
 
b/common/src/main/java/org/apache/flink/training/exercises/common/utils/DataGenerator.java
index 24439bd..10c4c2b 100644
--- 
a/common/src/main/java/org/apache/flink/training/exercises/common/utils/DataGenerator.java
+++ 
b/common/src/main/java/org/apache/flink/training/exercises/common/utils/DataGenerator.java
@@ -25,8 +25,8 @@ import java.util.Random;
  * Data generator for the fields in TaxiRide and TaxiFare objects.
  *
  * <p>Results are deterministically determined by the rideId. This guarantees 
(among other things)
- * that the startTime for a TaxiRide START event matches the startTime for the 
TaxiRide END and
- * TaxiFare events for that same rideId.
+ * that the eventTime for a TaxiRide START event matches the startTime for the 
TaxiFare event for
+ * that same rideId.
  */
 public class DataGenerator {
 
diff --git 
a/hourly-tips/src/solution/java/org/apache/flink/training/solutions/hourlytips/HourlyTipsSolution.java
 
b/hourly-tips/src/solution/java/org/apache/flink/training/solutions/hourlytips/HourlyTipsSolution.java
index f61360d..6ae443f 100644
--- 
a/hourly-tips/src/solution/java/org/apache/flink/training/solutions/hourlytips/HourlyTipsSolution.java
+++ 
b/hourly-tips/src/solution/java/org/apache/flink/training/solutions/hourlytips/HourlyTipsSolution.java
@@ -83,7 +83,8 @@ public class HourlyTipsSolution {
                         .assignTimestampsAndWatermarks(
                                 // taxi fares are in order
                                 
WatermarkStrategy.<TaxiFare>forMonotonousTimestamps()
-                                        .withTimestampAssigner((fare, t) -> 
fare.getEventTime()));
+                                        .withTimestampAssigner(
+                                                (fare, t) -> 
fare.getEventTimeMillis()));
 
         // compute tips per hour for each driver
         DataStream<Tuple3<Long, Long, Float>> hourlyTips =
diff --git 
a/hourly-tips/src/solution/scala/org/apache/flink/training/solutions/hourlytips/scala/HourlyTipsSolution.scala
 
b/hourly-tips/src/solution/scala/org/apache/flink/training/solutions/hourlytips/scala/HourlyTipsSolution.scala
index bf3d159..44dd3b1 100644
--- 
a/hourly-tips/src/solution/scala/org/apache/flink/training/solutions/hourlytips/scala/HourlyTipsSolution.scala
+++ 
b/hourly-tips/src/solution/scala/org/apache/flink/training/solutions/hourlytips/scala/HourlyTipsSolution.scala
@@ -58,7 +58,7 @@ object HourlyTipsSolution {
         .forMonotonousTimestamps[TaxiFare]()
         .withTimestampAssigner(new SerializableTimestampAssigner[TaxiFare] {
           override def extractTimestamp(fare: TaxiFare, streamRecordTimestamp: 
Long): Long =
-            fare.getEventTime
+            fare.getEventTimeMillis
         })
 
       // setup the pipeline
diff --git a/long-ride-alerts/DISCUSSION.md b/long-ride-alerts/DISCUSSION.md
index d50e390..a91e7fc 100644
--- a/long-ride-alerts/DISCUSSION.md
+++ b/long-ride-alerts/DISCUSSION.md
@@ -21,24 +21,35 @@ under the License.
 
 (Discussion of [Lab: `KeyedProcessFunction` and Timers (Long Ride Alerts)](./))
 
-Flaws in the reference solutions:
+### Analysis
 
-* The reference solutions leak state in the case where a START event is 
missing.
-* In the case where the END event eventually arrives, but after the timer
-has fired and has cleared the matching START event, then a duplicate alert is 
generated.
+These cases are worth noting:
 
-A good way to write unit tests for a `KeyedProcessFunction` to check for state 
retention, etc., is to
-use the test harnesses described in the
-[documentation on 
testing](https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#unit-testing-stateful-or-timely-udfs--custom-operators).
+* _The START event is missing_. Then the END event will sit in state indefinitely (this is a leak!).
+* _The END event is missing_. The timer will fire and the state will be 
cleared (this is ok).
+* _The END event arrives after the timer has fired and cleared the state._ In this case the END
+event will be stored in state indefinitely (this is another leak!).
 
-These issues could be addressed by keeping some state longer, and then either
+These leaks could be addressed by either
 using [state 
TTL](https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl),
 or another timer, to eventually clear any lingering state.
 
-But regardless of how long we retain the state, we must eventually clear it, 
and thereafter we would
-still run the risk of extremely late events causing incorrect or duplicated 
results.
+### Bottom line
+
+Regardless of how clever we are with what state we keep, and how long we choose to keep it,
+we must eventually clear it -- otherwise our state will grow without bound.
+And once that information is gone, we run the risk of late events causing incorrect or duplicated results.
+
 This tradeoff between keeping state indefinitely versus occasionally getting 
things wrong when events are
-exceptionally late is a challenge that is inherent to stateful stream processing.
+late is a challenge that is inherent to stateful stream processing.
+
+### If you want to go further
+
+For each of these, add tests to check for the desired behavior.
+
+* Extend the solution so that it never leaks state.
+* Define what it means for an event to be missing, detect missing START and 
END events,
+and send some notification of this to a side output.
 
 -----
 
diff --git a/long-ride-alerts/README.md b/long-ride-alerts/README.md
index 4613147..b95e261 100644
--- a/long-ride-alerts/README.md
+++ b/long-ride-alerts/README.md
@@ -27,7 +27,10 @@ This should be done using the event time timestamps and 
watermarks that are prov
 The stream is out-of-order, and it is possible that the END event for a ride 
will be processed before
 its START event.
 
-A START or END event may be missing, but you may assume there are no 
duplicated events.
+An END event may be missing, but you may assume there are no duplicated 
events, and no missing START events.
+
+It is not enough to simply wait for the END event and calculate the duration, 
as we want to be alerted
+about the long ride as soon as possible.
 
 You should eventually clear any state you create.
 
@@ -69,8 +72,6 @@ The resulting stream should be printed to standard out.
 This exercise revolves around using a `KeyedProcessFunction` to manage some 
state and event time timers,
 and doing so in a way that works even when the END event for a given `rideId` 
arrives before the START.
 The challenge is figuring out what state and timers to use, and when to set 
and clear the state (and timers).
-It is not enough to simply wait for the END event and calculate the duration, 
as the END event
-may be missing.
 </details>
 
 ## Documentation
diff --git 
a/long-ride-alerts/src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java
 
b/long-ride-alerts/src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java
index a492e10..f555e5d 100644
--- 
a/long-ride-alerts/src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java
+++ 
b/long-ride-alerts/src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java
@@ -71,7 +71,7 @@ public class LongRidesExercise {
         WatermarkStrategy<TaxiRide> watermarkStrategy =
                 
WatermarkStrategy.<TaxiRide>forBoundedOutOfOrderness(Duration.ofSeconds(60))
                         .withTimestampAssigner(
-                                (ride, streamRecordTimestamp) -> 
ride.getEventTime());
+                                (ride, streamRecordTimestamp) -> 
ride.getEventTimeMillis());
 
         // create the pipeline
         rides.assignTimestampsAndWatermarks(watermarkStrategy)
diff --git 
a/long-ride-alerts/src/main/scala/org/apache/flink/training/exercises/longrides/scala/LongRidesExercise.scala
 
b/long-ride-alerts/src/main/scala/org/apache/flink/training/exercises/longrides/scala/LongRidesExercise.scala
index 4c9dd27..e8d7979 100644
--- 
a/long-ride-alerts/src/main/scala/org/apache/flink/training/exercises/longrides/scala/LongRidesExercise.scala
+++ 
b/long-ride-alerts/src/main/scala/org/apache/flink/training/exercises/longrides/scala/LongRidesExercise.scala
@@ -56,7 +56,7 @@ object LongRidesExercise {
         .forBoundedOutOfOrderness[TaxiRide](Duration.ofSeconds(60))
         .withTimestampAssigner(new SerializableTimestampAssigner[TaxiRide] {
           override def extractTimestamp(ride: TaxiRide, streamRecordTimestamp: 
Long): Long =
-            ride.getEventTime
+            ride.getEventTimeMillis
         })
 
       // create the pipeline
diff --git 
a/long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java
 
b/long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java
index 4348535..e542817 100644
--- 
a/long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java
+++ 
b/long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java
@@ -74,7 +74,7 @@ public class LongRidesSolution {
         WatermarkStrategy<TaxiRide> watermarkStrategy =
                 
WatermarkStrategy.<TaxiRide>forBoundedOutOfOrderness(Duration.ofSeconds(60))
                         .withTimestampAssigner(
-                                (ride, streamRecordTimestamp) -> 
ride.getEventTime());
+                                (ride, streamRecordTimestamp) -> 
ride.getEventTimeMillis());
 
         // create the pipeline
         rides.assignTimestampsAndWatermarks(watermarkStrategy)
@@ -105,9 +105,9 @@ public class LongRidesSolution {
 
         @Override
         public void open(Configuration config) {
-            ValueStateDescriptor<TaxiRide> stateDescriptor =
+            ValueStateDescriptor<TaxiRide> rideStateDescriptor =
                     new ValueStateDescriptor<>("ride event", TaxiRide.class);
-            rideState = getRuntimeContext().getState(stateDescriptor);
+            rideState = getRuntimeContext().getState(rideStateDescriptor);
         }
 
         @Override
@@ -117,29 +117,32 @@ public class LongRidesSolution {
             TaxiRide firstRideEvent = rideState.value();
 
             if (firstRideEvent == null) {
+                // whatever event comes first, remember it
                 rideState.update(ride);
 
                 if (ride.isStart) {
+                    // we will use this timer to check for rides that have 
gone on too long and may
+                    // not yet have an END event (or the END event could be 
missing)
                     
context.timerService().registerEventTimeTimer(getTimerTime(ride));
-                } else {
-                    if (rideTooLong(ride)) {
-                        out.collect(ride.rideId);
-                    }
                 }
             } else {
                 if (ride.isStart) {
-                    // There's nothing to do but clear the state (which is 
done below).
+                    if (rideTooLong(ride, firstRideEvent)) {
+                        out.collect(ride.rideId);
+                    }
                 } else {
-                    // There may be a timer that hasn't fired yet.
+                    // the first ride was a START event, so there is a timer 
unless it has fired
                     
context.timerService().deleteEventTimeTimer(getTimerTime(firstRideEvent));
 
-                    // It could be that the ride has gone on too long, but the 
timer hasn't fired
-                    // yet.
-                    if (rideTooLong(ride)) {
+                    // perhaps the ride has gone on too long, but the timer 
didn't fire yet
+                    if (rideTooLong(firstRideEvent, ride)) {
                         out.collect(ride.rideId);
                     }
                 }
-                // Both events have now been seen, we can clear the state.
+
+                // both events have now been seen, we can clear the state
+                // this solution can leak state if an event is missing
+                // see DISCUSSION.md for more information
                 rideState.clear();
             }
         }
@@ -148,19 +151,25 @@ public class LongRidesSolution {
         public void onTimer(long timestamp, OnTimerContext context, 
Collector<Long> out)
                 throws Exception {
 
-            // The timer only fires if the ride was too long.
+            // the timer only fires if the ride was too long
             out.collect(rideState.value().rideId);
+
+            // clearing now prevents duplicate alerts, but will leak state if the END arrives later
             rideState.clear();
         }
 
-        private boolean rideTooLong(TaxiRide rideEndEvent) {
-            return Duration.between(rideEndEvent.startTime, 
rideEndEvent.endTime)
+        private boolean rideTooLong(TaxiRide startEvent, TaxiRide endEvent) {
+            return Duration.between(startEvent.eventTime, endEvent.eventTime)
                             .compareTo(Duration.ofHours(2))
                     > 0;
         }
 
-        private long getTimerTime(TaxiRide ride) {
-            return ride.startTime.plusSeconds(120 * 60).toEpochMilli();
+        private long getTimerTime(TaxiRide ride) throws RuntimeException {
+            if (ride.isStart) {
+                return ride.eventTime.plusSeconds(120 * 60).toEpochMilli();
+            } else {
+                throw new RuntimeException("Can not get start time from END 
event.");
+            }
         }
     }
 }
diff --git 
a/long-ride-alerts/src/solution/scala/org/apache/flink/training/solutions/longrides/scala/LongRidesSolution.scala
 
b/long-ride-alerts/src/solution/scala/org/apache/flink/training/solutions/longrides/scala/LongRidesSolution.scala
index 3e17acc..6bdd345 100644
--- 
a/long-ride-alerts/src/solution/scala/org/apache/flink/training/solutions/longrides/scala/LongRidesSolution.scala
+++ 
b/long-ride-alerts/src/solution/scala/org/apache/flink/training/solutions/longrides/scala/LongRidesSolution.scala
@@ -57,7 +57,7 @@ object LongRidesSolution {
         .forBoundedOutOfOrderness[TaxiRide](Duration.ofSeconds(60))
         .withTimestampAssigner(new SerializableTimestampAssigner[TaxiRide] {
           override def extractTimestamp(ride: TaxiRide, streamRecordTimestamp: 
Long): Long =
-            ride.getEventTime
+            ride.getEventTimeMillis
         })
 
       // create the pipeline
@@ -95,28 +95,35 @@ object LongRidesSolution {
         out: Collector[Long]
     ): Unit = {
 
-      val firstRideEvent = rideState.value()
+      val firstRideEvent: TaxiRide = rideState.value
 
       if (firstRideEvent == null) {
+        // whatever event comes first, remember it
         rideState.update(ride)
+
         if (ride.isStart) {
+          // we will use this timer to check for rides that have gone on too 
long and may
+          // not yet have an END event (or the END event could be missing)
           context.timerService.registerEventTimeTimer(getTimerTime(ride))
-        } else if (rideTooLong(ride)) {
-          out.collect(ride.rideId)
         }
       } else {
         if (ride.isStart) {
-          // There's nothing to do but clear the state (which is done below).
+          if (rideTooLong(ride, firstRideEvent)) {
+            out.collect(ride.rideId)
+          }
         } else {
-          // There may be a timer that hasn't fired yet.
+          // the first ride was a START event, so there is a timer unless it 
has fired
           
context.timerService.deleteEventTimeTimer(getTimerTime(firstRideEvent))
 
-          // It could be that the ride has gone on too long, but the timer 
hasn't fired yet.
-          if (rideTooLong(ride)) {
+          // perhaps the ride has gone on too long, but the timer didn't fire 
yet
+          if (rideTooLong(firstRideEvent, ride)) {
             out.collect(ride.rideId)
           }
         }
-        // Both events have now been seen, we can clear the state.
+
+        // both events have now been seen, we can clear the state
+        // this solution can leak state if an event is missing
+        // see DISCUSSION.md for more information
         rideState.clear()
       }
     }
@@ -127,17 +134,19 @@ object LongRidesSolution {
         out: Collector[Long]
     ): Unit = {
 
-      // The timer only fires if the ride was too long.
+      // the timer only fires if the ride was too long
       out.collect(rideState.value().rideId)
+
+      // clearing now prevents duplicate alerts, but will leak state if the END arrives later
       rideState.clear()
     }
 
-    private def rideTooLong(rideEndEvent: TaxiRide) =
+    private def rideTooLong(startEvent: TaxiRide, endEvent: TaxiRide) =
       Duration
-        .between(rideEndEvent.startTime, rideEndEvent.endTime)
+        .between(startEvent.eventTime, endEvent.eventTime)
         .compareTo(Duration.ofHours(2)) > 0
 
-    private def getTimerTime(ride: TaxiRide) = ride.startTime.toEpochMilli + 
2.hours.toMillis
+    private def getTimerTime(ride: TaxiRide) = ride.eventTime.toEpochMilli + 
2.hours.toMillis
   }
 
 }
diff --git 
a/long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTestBase.java
 
b/long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTestBase.java
index 955702d..f05cd93 100644
--- 
a/long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTestBase.java
+++ 
b/long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTestBase.java
@@ -12,21 +12,19 @@ public class LongRidesTestBase {
     public static final Instant THREE_HOURS_LATER = BEGINNING.plusSeconds(180 
* 60);
 
     public static TaxiRide startRide(long rideId, Instant startTime) {
-        return testRide(rideId, true, startTime, Instant.EPOCH);
+        return testRide(rideId, true, startTime);
     }
 
     public static TaxiRide endRide(TaxiRide started, Instant endTime) {
-        return testRide(started.rideId, false, started.startTime, endTime);
+        return testRide(started.rideId, false, endTime);
     }
 
-    private static TaxiRide testRide(
-            long rideId, Boolean isStart, Instant startTime, Instant endTime) {
+    private static TaxiRide testRide(long rideId, Boolean isStart, Instant 
eventTime) {
 
         return new TaxiRide(
                 rideId,
                 isStart,
-                startTime,
-                endTime,
+                eventTime,
                 -73.9947F,
                 40.750626F,
                 -73.9947F,
diff --git 
a/long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesUnitTest.java
 
b/long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesUnitTest.java
index bc76746..4d22b22 100644
--- 
a/long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesUnitTest.java
+++ 
b/long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesUnitTest.java
@@ -39,26 +39,72 @@ public class LongRidesUnitTest extends LongRidesTestBase {
     }
 
     @Test
-    public void shouldAlertOnLongEndEvent() throws Exception {
+    public void shouldUseTimersAndState() throws Exception {
 
-        TaxiRide rideStartedButNotSent = startRide(1, BEGINNING);
-        TaxiRide endedThreeHoursLater = endRide(rideStartedButNotSent, 
THREE_HOURS_LATER);
+        TaxiRide rideStarted = startRide(1, BEGINNING);
+        harness.processElement(rideStarted.asStreamRecord());
 
-        harness.processElement(endedThreeHoursLater.asStreamRecord());
-        
assertThat(harness.getOutput()).containsExactly(endedThreeHoursLater.idAsStreamRecord());
+        // check for state and timers
+        assertThat(harness.numEventTimeTimers()).isGreaterThan(0);
+        assertThat(harness.numKeyedStateEntries()).isGreaterThan(0);
+
+        TaxiRide endedOneMinuteLater = endRide(rideStarted, ONE_MINUTE_LATER);
+        harness.processElement(endedOneMinuteLater.asStreamRecord());
+
+        // in this case, state and timers should be gone now
+        assertThat(harness.numEventTimeTimers()).isZero();
+        assertThat(harness.numKeyedStateEntries()).isZero();
+    }
+
+    @Test
+    public void shouldNotAlertWithStartFirst() throws Exception {
+
+        TaxiRide rideStarted = startRide(1, BEGINNING);
+        TaxiRide endedOneMinuteLater = endRide(rideStarted, ONE_MINUTE_LATER);
+
+        harness.processElement(rideStarted.asStreamRecord());
+        harness.processElement(endedOneMinuteLater.asStreamRecord());
+
+        assertThat(harness.getOutput()).isEmpty();
     }
 
     @Test
-    public void shouldNotAlertOnShortEndEvent() throws Exception {
+    public void shouldNotAlertWithEndFirst() throws Exception {
 
-        TaxiRide rideStartedButNotSent = startRide(1, BEGINNING);
-        TaxiRide endedOneMinuteLater = endRide(rideStartedButNotSent, 
ONE_MINUTE_LATER);
+        TaxiRide rideStarted = startRide(1, BEGINNING);
+        TaxiRide endedOneMinuteLater = endRide(rideStarted, ONE_MINUTE_LATER);
 
         harness.processElement(endedOneMinuteLater.asStreamRecord());
+        harness.processElement(rideStarted.asStreamRecord());
+
         assertThat(harness.getOutput()).isEmpty();
     }
 
     @Test
+    public void shouldAlertWithStartFirst() throws Exception {
+
+        TaxiRide rideStarted = startRide(1, BEGINNING);
+        TaxiRide endedThreeHoursLater = endRide(rideStarted, 
THREE_HOURS_LATER);
+
+        harness.processElement(rideStarted.asStreamRecord());
+        harness.processElement(endedThreeHoursLater.asStreamRecord());
+
+        assertThat(resultingRideId()).isEqualTo(rideStarted.rideId);
+    }
+
+    @Test
+    public void shouldAlertWithEndFirst() throws Exception {
+
+        TaxiRide rideStarted = startRide(1, BEGINNING);
+        TaxiRide endedThreeHoursLater = endRide(rideStarted, 
THREE_HOURS_LATER);
+
+        harness.processElement(endedThreeHoursLater.asStreamRecord());
+        harness.processElement(rideStarted.asStreamRecord());
+
+        assertThat(resultingRideId()).isEqualTo(rideStarted.rideId);
+    }
+
+    @Test
     public void shouldNotAlertWithoutWatermarkOrEndEvent() throws Exception {
 
         TaxiRide rideStarted = startRide(1, BEGINNING);
@@ -98,10 +144,15 @@ public class LongRidesUnitTest extends LongRidesTestBase {
         StreamRecord<Long> rideIdAtTimeOfWatermark =
                 new StreamRecord<>(startOfLongRide.rideId, 
mark2HoursLater.getTimestamp());
         
assertThat(harness.getOutput()).containsExactly(rideIdAtTimeOfWatermark, 
mark2HoursLater);
+    }
 
-        // Check that no state or timers are left behind
-        assertThat(harness.numKeyedStateEntries()).isZero();
-        assertThat(harness.numEventTimeTimers()).isZero();
+    private Long resultingRideId() {
+        ConcurrentLinkedQueue<Object> results = harness.getOutput();
+        assertThat(results.size())
+                .withFailMessage("Expecting test to have exactly one result")
+                .isEqualTo(1);
+        StreamRecord<Long> resultingRecord = (StreamRecord<Long>) 
results.element();
+        return resultingRecord.getValue();
     }
 
     private KeyedOneInputStreamOperatorTestHarness<Long, TaxiRide, Long> 
setupHarness(
diff --git 
a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingTestBase.java
 
b/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingTestBase.java
index 62ff719..cbfeeda 100644
--- 
a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingTestBase.java
+++ 
b/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingTestBase.java
@@ -8,16 +8,6 @@ public class RideCleansingTestBase {
 
     public static TaxiRide testRide(float startLon, float startLat, float 
endLon, float endLat) {
         return new TaxiRide(
-                1L,
-                true,
-                Instant.EPOCH,
-                Instant.EPOCH,
-                startLon,
-                startLat,
-                endLon,
-                endLat,
-                (short) 1,
-                0,
-                0);
+                1L, true, Instant.EPOCH, startLon, startLat, endLon, endLat, 
(short) 1, 0, 0);
     }
 }
diff --git 
a/rides-and-fares/src/test/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresTestBase.java
 
b/rides-and-fares/src/test/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresTestBase.java
index c634bd4..e84c265 100644
--- 
a/rides-and-fares/src/test/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresTestBase.java
+++ 
b/rides-and-fares/src/test/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresTestBase.java
@@ -8,8 +8,7 @@ import java.time.Instant;
 public class RidesAndFaresTestBase {
 
     public static TaxiRide testRide(long rideId) {
-        return new TaxiRide(
-                rideId, true, Instant.EPOCH, Instant.EPOCH, 0F, 0F, 0F, 0F, 
(short) 1, 0, rideId);
+        return new TaxiRide(rideId, true, Instant.EPOCH, 0F, 0F, 0F, 0F, 
(short) 1, 0, rideId);
     }
 
     public static TaxiFare testFare(long rideId) {
diff --git 
a/rides-and-fares/src/test/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresUnitTest.java
 
b/rides-and-fares/src/test/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresUnitTest.java
index 93b0167..2dca786 100644
--- 
a/rides-and-fares/src/test/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresUnitTest.java
+++ 
b/rides-and-fares/src/test/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresUnitTest.java
@@ -53,7 +53,7 @@ public class RidesAndFaresUnitTest extends 
RidesAndFaresTestBase {
 
         // Verify the result
         StreamRecord<RideAndFare> expected =
-                new StreamRecord<>(new RideAndFare(ride1, fare1), 
ride1.getEventTime());
+                new StreamRecord<>(new RideAndFare(ride1, fare1), 
ride1.getEventTimeMillis());
         assertThat(harness.getOutput()).containsExactly(expected);
     }
 
@@ -70,7 +70,7 @@ public class RidesAndFaresUnitTest extends 
RidesAndFaresTestBase {
 
         // Verify the result
         StreamRecord<RideAndFare> expected =
-                new StreamRecord<>(new RideAndFare(ride1, fare1), 
ride1.getEventTime());
+                new StreamRecord<>(new RideAndFare(ride1, fare1), 
ride1.getEventTimeMillis());
         assertThat(harness.getOutput()).containsExactly(expected);
     }
 

Reply via email to