NicoK commented on a change in pull request #36:
URL: https://github.com/apache/flink-training/pull/36#discussion_r700013921



##########
File path: common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiRideGenerator.java
##########
@@ -71,7 +71,7 @@ public void run(SourceContext<TaxiRide> ctx) throws Exception {
             java.util.Collections.shuffle(startEvents, new Random(id));
             startEvents
                     .iterator()
-                    .forEachRemaining(r -> ctx.collectWithTimestamp(r, r.getEventTime()));
+                    .forEachRemaining(r -> ctx.collectWithTimestamp(r, r.getEventTimeMillis()));

Review comment:
       Not for this PR, but didn't you want to remove the `collectWithTimestamp` calls?
   
   If I'm reading this correctly, only the LongRides exercises and solutions need timestamps, and they already have appropriate watermark strategies...
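For reference, this is roughly what a bounded-out-of-orderness watermark strategy does once the timestamp is read from the event itself, which is why the source would not need to attach it via `collectWithTimestamp`. This is a plain-Java model, not Flink code: `WatermarkModel` and its `Ride` record are hypothetical stand-ins, and the `max - bound - 1` formula is meant to mirror what Flink's `BoundedOutOfOrdernessWatermarks` emits.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (NOT Flink code) of timestamp extraction plus a
// bounded-out-of-orderness watermark. Ride is a hypothetical stand-in for TaxiRide.
class WatermarkModel {

    record Ride(long rideId, long eventTimeMillis) {}

    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    WatermarkModel(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        // Chosen so that the initial watermark is exactly Long.MIN_VALUE (no underflow).
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    // The "timestamp assigner": the timestamp comes from the event, not from the source.
    long extractTimestamp(Ride ride) {
        return ride.eventTimeMillis();
    }

    // Out-of-order (older) events never move the watermark backwards.
    void onEvent(Ride ride) {
        maxTimestamp = Math.max(maxTimestamp, extractTimestamp(ride));
    }

    // Highest timestamp seen so far, minus the allowed lateness, minus one.
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    public static void main(String[] args) {
        WatermarkModel model = new WatermarkModel(60_000);
        List<Long> watermarks = new ArrayList<>();
        for (Ride r : List.of(new Ride(1, 100_000), new Ride(2, 90_000), new Ride(3, 200_000))) {
            model.onEvent(r);
            watermarks.add(model.currentWatermark());
        }
        System.out.println(watermarks); // [39999, 39999, 139999]
    }
}
```

With timestamps assigned this way in the LongRides job, the timestamps attached by the generator would simply be overwritten, so dropping `collectWithTimestamp` there should not change behaviour.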

##########
File path: common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiRide.java
##########
@@ -139,7 +132,7 @@ public int compareTo(@Nullable TaxiRide other) {
         if (other == null) {
             return 1;
         }
-        int compareTimes = Long.compare(this.getEventTime(), other.getEventTime());
+        int compareTimes = Long.compare(this.getEventTimeMillis(), other.getEventTimeMillis());

Review comment:
       Why not use `java.time.Instant#compareTo`?
   ```suggestion
           int compareTimes = this.eventTime.compareTo(other.eventTime);
   ```
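A minimal sketch of the suggested comparison, using a hypothetical `RideByTime` class with an `Instant eventTime` field in place of `TaxiRide`; the tie-breaker on `rideId` is an assumption for illustration only. One thing to keep in mind: `Instant#compareTo` compares at nanosecond precision, so two instants within the same millisecond compare as unequal, whereas `Long.compare` on `toEpochMilli()` treats them as equal.

```java
import java.time.Instant;

// Hypothetical stand-in for TaxiRide: only a ride id and an Instant event time.
final class RideByTime implements Comparable<RideByTime> {
    final long rideId;
    final Instant eventTime;

    RideByTime(long rideId, Instant eventTime) {
        this.rideId = rideId;
        this.eventTime = eventTime;
    }

    long getEventTimeMillis() {
        return eventTime.toEpochMilli();
    }

    @Override
    public int compareTo(RideByTime other) {
        // Mirrors the original: a non-null ride compares greater than null.
        if (other == null) {
            return 1;
        }
        // Instant already implements Comparable<Instant>; no conversion to millis needed.
        int compareTimes = this.eventTime.compareTo(other.eventTime);
        if (compareTimes == 0) {
            // Assumed tie-breaker, for illustration only.
            return Long.compare(this.rideId, other.rideId);
        }
        return compareTimes;
    }
}
```

As long as all event times are whole milliseconds (e.g. created via `Instant.ofEpochMilli`), both variants produce the same ordering.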

##########
File path: long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java
##########
@@ -115,31 +115,31 @@ public void processElement(TaxiRide ride, Context context, Collector<Long> out)
                 throws Exception {
 
             TaxiRide firstRideEvent = rideState.value();
-
             if (firstRideEvent == null) {
+                // whatever event comes first, remember it
                 rideState.update(ride);
 
                 if (ride.isStart) {
+                    // we will use this timer to check for rides that have gone on too long and may
+                    // not yet have an END event (or the END event could be missing)
                     context.timerService().registerEventTimeTimer(getTimerTime(ride));
-                } else {
-                    if (rideTooLong(ride)) {
-                        out.collect(ride.rideId);
-                    }
                 }
             } else {
                 if (ride.isStart) {
-                    // There's nothing to do but clear the state (which is done below).
+                    if (rideTooLong(ride, firstRideEvent)) {
+                        out.collect(ride.rideId);
+                    }
                 } else {
-                    // There may be a timer that hasn't fired yet.
+                    // there is probably a timer that hasn't fired yet

Review comment:
       There **is** a timer if `firstRideEvent` was a START event...

##########
File path: long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java
##########
@@ -115,31 +115,31 @@ public void processElement(TaxiRide ride, Context context, Collector<Long> out)
                 throws Exception {
 
             TaxiRide firstRideEvent = rideState.value();
-
             if (firstRideEvent == null) {
+                // whatever event comes first, remember it
                 rideState.update(ride);
 
                 if (ride.isStart) {
+                    // we will use this timer to check for rides that have gone on too long and may
+                    // not yet have an END event (or the END event could be missing)
                     context.timerService().registerEventTimeTimer(getTimerTime(ride));
-                } else {
-                    if (rideTooLong(ride)) {
-                        out.collect(ride.rideId);
-                    }
                 }
             } else {
                 if (ride.isStart) {
-                    // There's nothing to do but clear the state (which is done below).
+                    if (rideTooLong(ride, firstRideEvent)) {
+                        out.collect(ride.rideId);
+                    }
                 } else {
-                    // There may be a timer that hasn't fired yet.
+                    // there is probably a timer that hasn't fired yet
                     context.timerService().deleteEventTimeTimer(getTimerTime(firstRideEvent));
 
-                    // It could be that the ride has gone on too long, but the timer hasn't fired
-                    // yet.
-                    if (rideTooLong(ride)) {
+                    // it could be that the ride has gone on too long (and the timer didn't fire)

Review comment:
       You keep removing "yet" here... but isn't that word what makes it clear that the timer would have fired had we not removed it above?
   ```suggestion
                       // it could be that the ride has gone on too long (and the timer didn't fire yet)
   ```
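To make both points concrete (there is a timer exactly when `firstRideEvent` is a START, and the END branch removes that timer before it had a chance to fire), here is a plain-Java model of the logic. It is not Flink code: `HashMap`s stand in for the keyed `ValueState` and the event-time timer service, `advanceWatermark` plays the role of `onTimer`, and the two-hour limit and the `rideTooLong(start, end)` semantics are assumptions.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Plain-Java model (NOT Flink code) of the processElement/onTimer logic under review.
class LongRidesModel {
    static final Duration MAX_DURATION = Duration.ofHours(2); // assumed limit

    // Hypothetical stand-in for TaxiRide.
    record Ride(long rideId, boolean isStart, long eventTimeMillis) {}

    final Map<Long, Ride> rideState = new HashMap<>(); // stands in for ValueState, per rideId
    final Map<Long, Long> timers = new HashMap<>();    // rideId -> event-time timer timestamp
    final List<Long> alerts = new ArrayList<>();       // stands in for the Collector

    static long getTimerTime(Ride start) {
        return start.eventTimeMillis() + MAX_DURATION.toMillis();
    }

    static boolean rideTooLong(Ride start, Ride end) {
        return end.eventTimeMillis() - start.eventTimeMillis() > MAX_DURATION.toMillis();
    }

    void processElement(Ride ride) {
        Ride firstRideEvent = rideState.get(ride.rideId());
        if (firstRideEvent == null) {
            // whatever event comes first, remember it
            rideState.put(ride.rideId(), ride);
            if (ride.isStart()) {
                // only a START registers a timer
                timers.put(ride.rideId(), getTimerTime(ride));
            }
            return;
        }
        if (ride.isStart()) {
            // the END came first, so no timer was ever registered for this ride
            if (rideTooLong(ride, firstRideEvent)) {
                alerts.add(ride.rideId());
            }
        } else {
            // firstRideEvent is a START, so there IS a timer, and it has not fired yet
            // (had it fired, the state would already have been cleared below)
            timers.remove(ride.rideId());
            // the ride may still be too long: the watermark just hasn't reached the timer yet
            if (rideTooLong(firstRideEvent, ride)) {
                alerts.add(ride.rideId());
            }
        }
        rideState.remove(ride.rideId());
    }

    // Fires every timer whose timestamp is <= the new watermark, like onTimer would.
    void advanceWatermark(long watermark) {
        Iterator<Map.Entry<Long, Long>> it = timers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> timer = it.next();
            if (timer.getValue() <= watermark) {
                alerts.add(timer.getKey());
                rideState.remove(timer.getKey());
                it.remove();
            }
        }
    }
}
```

For example, a START at 0h, a watermark at 1h, and an END at 3h produce an alert from the END branch even though the timer (due at 2h) never fired: it didn't fire *yet*, and then we removed it.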

##########
File path: long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java
##########
@@ -115,31 +115,31 @@ public void processElement(TaxiRide ride, Context context, Collector<Long> out)
                 throws Exception {
 
             TaxiRide firstRideEvent = rideState.value();
-
             if (firstRideEvent == null) {
+                // whatever event comes first, remember it
                 rideState.update(ride);
 
                 if (ride.isStart) {
+                    // we will use this timer to check for rides that have gone on too long and may
+                    // not yet have an END event (or the END event could be missing)
                     context.timerService().registerEventTimeTimer(getTimerTime(ride));
-                } else {
-                    if (rideTooLong(ride)) {
-                        out.collect(ride.rideId);
-                    }
                 }

Review comment:
       Can you add a section to the discussion on how to deal with missing START events? We could use event-time timers for this as well, e.g. with a timer that fires at the END event's timestamp.
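A rough sketch of that idea, as a plain-Java model rather than Flink code (`MissingStartModel` and its maps are hypothetical stand-ins for keyed state and the timer service). It assumes that a START always carries an earlier timestamp than its END, so once the watermark passes the END's timestamp, a START that still hasn't shown up is either missing or later than the watermark bound allows. Only the END-first path is modelled; the long-ride timer for START-first rides is left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical sketch (NOT Flink code): detect missing START events with a timer
// registered at the END event's own timestamp.
class MissingStartModel {
    record Ride(long rideId, boolean isStart, long eventTimeMillis) {}

    final Map<Long, Ride> rideState = new HashMap<>(); // stands in for ValueState, per rideId
    final Map<Long, Long> timers = new HashMap<>();    // rideId -> timer timestamp
    final List<Long> missingStarts = new ArrayList<>();

    void processElement(Ride ride) {
        Ride first = rideState.get(ride.rideId());
        if (first == null) {
            rideState.put(ride.rideId(), ride);
            if (!ride.isStart()) {
                // END without START: wait until the watermark reaches the END's timestamp
                timers.put(ride.rideId(), ride.eventTimeMillis());
            }
        } else if (ride.isStart() && !first.isStart()) {
            // the START showed up in time: cancel the timer and clear the state
            timers.remove(ride.rideId());
            rideState.remove(ride.rideId());
        }
    }

    // Fires every timer whose timestamp is <= the new watermark, like onTimer would.
    void advanceWatermark(long watermark) {
        Iterator<Map.Entry<Long, Long>> it = timers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> timer = it.next();
            if (timer.getValue() <= watermark) {
                Ride stored = rideState.remove(timer.getKey());
                if (stored != null && !stored.isStart()) {
                    missingStarts.add(timer.getKey());
                }
                it.remove();
            }
        }
    }
}
```

A side benefit of this approach is that the state held for END-only rides gets cleaned up instead of lingering forever.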

##########
File path: long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesUnitTest.java
##########
@@ -104,6 +132,15 @@ public void shouldAlertOnWatermark() throws Exception {
         assertThat(harness.numEventTimeTimers()).isZero();
     }
 
+    private Long resultingRideId() {
+        ConcurrentLinkedQueue<Object> results = harness.getOutput();
+        assertThat(results.size())
+                .isEqualTo(1)
+                .withFailMessage("Expecting test to have exactly one result");
+        StreamRecord<Long> resultingRecord = (StreamRecord<Long>) results.toArray()[0];

Review comment:
       Converting to array is unnecessary:
   ```suggestion
           StreamRecord<Long> resultingRecord = (StreamRecord<Long>) results.element();
   ```
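`element()` is inherited from `AbstractQueue`: it returns the head without removing it and throws `NoSuchElementException` on an empty queue, whereas `toArray()[0]` first copies the whole queue and fails with `ArrayIndexOutOfBoundsException`. A small stand-alone check (`QueueHeadDemo` is just an illustrative name):

```java
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentLinkedQueue;

class QueueHeadDemo {
    // Returns the head of the queue without removing it (no array copy involved).
    static <T> T head(ConcurrentLinkedQueue<T> queue) {
        return queue.element();
    }

    public static void main(String[] args) {
        ConcurrentLinkedQueue<Object> results = new ConcurrentLinkedQueue<>();
        results.add(42L);
        System.out.println(head(results));  // 42
        System.out.println(results.size()); // 1 (element() does not remove the head)
        try {
            head(new ConcurrentLinkedQueue<>());
        } catch (NoSuchElementException e) {
            System.out.println("empty queue -> NoSuchElementException");
        }
    }
}
```

Since the size has already been asserted to be 1 at that point, `element()` (or `peek()`) is enough.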

##########
File path: long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesUnitTest.java
##########
@@ -104,6 +132,15 @@ public void shouldAlertOnWatermark() throws Exception {
         assertThat(harness.numEventTimeTimers()).isZero();
     }
 
+    private Long resultingRideId() {
+        ConcurrentLinkedQueue<Object> results = harness.getOutput();
+        assertThat(results.size())
+                .isEqualTo(1)
+                .withFailMessage("Expecting test to have exactly one result");

Review comment:
       From the Javadoc:
   > You must set it before calling the assertion otherwise it is ignored as the failing assertion breaks the chained call by throwing an AssertionError.
   
   ```suggestion
           assertThat(results.size())
                   .withFailMessage("Expecting test to have exactly one result")
                   .isEqualTo(1);
   ```
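Why the order matters, shown with a hypothetical `MiniAssert` class that is NOT AssertJ and only imitates the relevant behaviour: the comparison (and the throw) happens inside `isEqualTo`, so a message set afterwards is never seen.

```java
// Hypothetical mini fluent assertion, NOT AssertJ: it only imitates the fact that
// the check (and the throw) happens inside isEqualTo().
final class MiniAssert {
    private final int actual;
    private String failMessage; // null until withFailMessage() is called

    private MiniAssert(int actual) {
        this.actual = actual;
    }

    static MiniAssert assertThat(int actual) {
        return new MiniAssert(actual);
    }

    MiniAssert withFailMessage(String message) {
        this.failMessage = message;
        return this;
    }

    MiniAssert isEqualTo(int expected) {
        if (actual != expected) {
            // Only a message that was set *before* this call can be used here.
            throw new AssertionError(
                    failMessage != null ? failMessage : "expected " + expected + " but was " + actual);
        }
        return this;
    }

    public static void main(String[] args) {
        try {
            assertThat(2).isEqualTo(1).withFailMessage("Expecting exactly one result");
        } catch (AssertionError e) {
            System.out.println(e.getMessage()); // expected 1 but was 2
        }
        try {
            assertThat(2).withFailMessage("Expecting exactly one result").isEqualTo(1);
        } catch (AssertionError e) {
            System.out.println(e.getMessage()); // Expecting exactly one result
        }
    }
}
```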

##########
File path: long-ride-alerts/src/solution/scala/org/apache/flink/training/solutions/longrides/scala/LongRidesSolution.scala
##########
@@ -91,32 +91,38 @@ object LongRidesSolution {
         new ValueStateDescriptor[TaxiRide]("ride event", classOf[TaxiRide]))
     }
 
+    @throws[Exception]
     override def processElement(ride: TaxiRide,
                                 context: KeyedProcessFunction[Long, TaxiRide, Long]#Context,
                                 out: Collector[Long]): Unit = {
 
-      val firstRideEvent = rideState.value()
+      val firstRideEvent: TaxiRide = rideState.value
 
       if (firstRideEvent == null) {
+        // whatever event comes first, remember it
         rideState.update(ride)
+
         if (ride.isStart) {
+          // we will use this timer to check for rides that have gone on too long and may
+          // not yet have an END event (or the END event could be missing)
           context.timerService.registerEventTimeTimer(getTimerTime(ride))
-        } else if (rideTooLong(ride)) {
-          out.collect(ride.rideId)
         }
       } else {
         if (ride.isStart) {
-          // There's nothing to do but clear the state (which is done below).
+          if (rideTooLong(ride, firstRideEvent)) {
+            out.collect(ride.rideId)
+          }
         } else {
-          // There may be a timer that hasn't fired yet.
+          // there is probably a timer that hasn't fired yet
           context.timerService.deleteEventTimeTimer(getTimerTime(firstRideEvent))
 
-          // It could be that the ride has gone on too long, but the timer hasn't fired yet.
-          if (rideTooLong(ride)) {
+          // it could be that the ride has gone on too long (and the timer didn't fire)

Review comment:
       ```suggestion
             // it could be that the ride has gone on too long (and the timer didn't fire yet)
   ```




-- 
This is an automated message from the Apache Git Service.