alpinegizmo commented on a change in pull request #36:
URL: https://github.com/apache/flink-training/pull/36#discussion_r700385890
##########
File path:
long-ride-alerts/src/solution/scala/org/apache/flink/training/solutions/longrides/scala/LongRidesSolution.scala
##########
@@ -91,32 +91,38 @@ object LongRidesSolution {
new ValueStateDescriptor[TaxiRide]("ride event", classOf[TaxiRide]))
}
+ @throws[Exception]
override def processElement(ride: TaxiRide,
context: KeyedProcessFunction[Long, TaxiRide,
Long]#Context,
out: Collector[Long]): Unit = {
- val firstRideEvent = rideState.value()
+ val firstRideEvent: TaxiRide = rideState.value
if (firstRideEvent == null) {
+ // whatever event comes first, remember it
rideState.update(ride)
+
if (ride.isStart) {
+ // we will use this timer to check for rides that have gone on too
long and may
+ // not yet have an END event (or the END event could be missing)
context.timerService.registerEventTimeTimer(getTimerTime(ride))
- } else if (rideTooLong(ride)) {
- out.collect(ride.rideId)
}
} else {
if (ride.isStart) {
- // There's nothing to do but clear the state (which is done below).
+ if (rideTooLong(ride, firstRideEvent)) {
+ out.collect(ride.rideId)
+ }
} else {
- // There may be a timer that hasn't fired yet.
+ // there is probably a timer that hasn't fired yet
context.timerService.deleteEventTimeTimer(getTimerTime(firstRideEvent))
- // It could be that the ride has gone on too long, but the timer
hasn't fired yet.
- if (rideTooLong(ride)) {
+ // it could be that the ride has gone on too long (and the timer
didn't fire)
Review comment:
fixed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]