NicoK commented on a change in pull request #36:
URL: https://github.com/apache/flink-training/pull/36#discussion_r700013921
##########
File path:
common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiRideGenerator.java
##########
@@ -71,7 +71,7 @@ public void run(SourceContext<TaxiRide> ctx) throws Exception {
java.util.Collections.shuffle(startEvents, new Random(id));
startEvents
.iterator()
- .forEachRemaining(r -> ctx.collectWithTimestamp(r, r.getEventTime()));
+ .forEachRemaining(r -> ctx.collectWithTimestamp(r, r.getEventTimeMillis()));
Review comment:
Not for this PR, but didn't you want to remove the `collectWithTimestamp`
calls?
As far as I can see, only the LongRides exercises and solutions need
timestamps, and they already have appropriate watermark strategies.
##########
File path:
common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiRide.java
##########
@@ -139,7 +132,7 @@ public int compareTo(@Nullable TaxiRide other) {
if (other == null) {
return 1;
}
- int compareTimes = Long.compare(this.getEventTime(), other.getEventTime());
+ int compareTimes = Long.compare(this.getEventTimeMillis(), other.getEventTimeMillis());
Review comment:
Why not use `java.time.Instant#compareTo`?
```suggestion
int compareTimes = this.eventTime.compareTo(other.eventTime);
```
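A minimal sketch of how the two comparisons relate, assuming `eventTime` is a `java.time.Instant` (as the suggestion implies) and that `getEventTimeMillis()` is derived from it via `toEpochMilli()`. The class and method names below are made up for illustration. The results agree for millisecond-precision timestamps; `Instant#compareTo` additionally distinguishes instants that fall within the same millisecond.

```java
import java.time.Instant;

final class InstantCompareSketch {
    // Comparison as written in the PR: via epoch milliseconds.
    static int compareByMillis(Instant a, Instant b) {
        return Long.compare(a.toEpochMilli(), b.toEpochMilli());
    }

    // Comparison as suggested: Instant implements Comparable<Instant>.
    // Only the sign of the result is meaningful, not its magnitude.
    static int compareByInstant(Instant a, Instant b) {
        return a.compareTo(b);
    }

    public static void main(String[] args) {
        Instant start = Instant.ofEpochMilli(1_000L);
        Instant end = Instant.ofEpochMilli(2_000L);
        System.out.println(Integer.signum(compareByMillis(start, end)));
        System.out.println(Integer.signum(compareByInstant(start, end)));
    }
}
```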
##########
File path:
long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java
##########
@@ -115,31 +115,31 @@ public void processElement(TaxiRide ride, Context context, Collector<Long> out)
throws Exception {
TaxiRide firstRideEvent = rideState.value();
-
if (firstRideEvent == null) {
+ // whatever event comes first, remember it
rideState.update(ride);
if (ride.isStart) {
+ // we will use this timer to check for rides that have gone on too long and may
+ // not yet have an END event (or the END event could be missing)
context.timerService().registerEventTimeTimer(getTimerTime(ride));
- } else {
- if (rideTooLong(ride)) {
- out.collect(ride.rideId);
- }
}
} else {
if (ride.isStart) {
- // There's nothing to do but clear the state (which is done below).
+ if (rideTooLong(ride, firstRideEvent)) {
+ out.collect(ride.rideId);
+ }
} else {
- // There may be a timer that hasn't fired yet.
+ // there is probably a timer that hasn't fired yet
Review comment:
There **is** a timer if `firstRideEvent` was a START event...
##########
File path:
long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java
##########
@@ -115,31 +115,31 @@ public void processElement(TaxiRide ride, Context context, Collector<Long> out)
throws Exception {
TaxiRide firstRideEvent = rideState.value();
-
if (firstRideEvent == null) {
+ // whatever event comes first, remember it
rideState.update(ride);
if (ride.isStart) {
+ // we will use this timer to check for rides that have gone on too long and may
+ // not yet have an END event (or the END event could be missing)
context.timerService().registerEventTimeTimer(getTimerTime(ride));
- } else {
- if (rideTooLong(ride)) {
- out.collect(ride.rideId);
- }
}
} else {
if (ride.isStart) {
- // There's nothing to do but clear the state (which is done below).
+ if (rideTooLong(ride, firstRideEvent)) {
+ out.collect(ride.rideId);
+ }
} else {
- // There may be a timer that hasn't fired yet.
+ // there is probably a timer that hasn't fired yet
context.timerService().deleteEventTimeTimer(getTimerTime(firstRideEvent));
- // It could be that the ride has gone on too long, but the timer hasn't fired
- // yet.
- if (rideTooLong(ride)) {
+ // it could be that the ride has gone on too long (and the timer didn't fire)
Review comment:
You keep removing "yet" here... Doesn't it make clear that the timer
would have fired if we hadn't deleted it above?
```suggestion
// it could be that the ride has gone on too long (and the timer didn't fire yet)
```
##########
File path:
long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java
##########
@@ -115,31 +115,31 @@ public void processElement(TaxiRide ride, Context context, Collector<Long> out)
throws Exception {
TaxiRide firstRideEvent = rideState.value();
-
if (firstRideEvent == null) {
+ // whatever event comes first, remember it
rideState.update(ride);
if (ride.isStart) {
+ // we will use this timer to check for rides that have gone on too long and may
+ // not yet have an END event (or the END event could be missing)
context.timerService().registerEventTimeTimer(getTimerTime(ride));
- } else {
- if (rideTooLong(ride)) {
- out.collect(ride.rideId);
- }
}
Review comment:
Can you add a section to the discussion on how to deal with missing
START events? We could use event-time timers for this as well, e.g. by
registering a timer that fires at the END event's timestamp.
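One way this idea might look, as a rough sketch rather than a proposal for the actual solution: when an END arrives first, buffer it and register a timer at its own timestamp; if the watermark passes that timestamp and the START still has not shown up, treat the START as missing and clear the state. The sketch below simulates keyed state and the timer service for a single key using plain Java collections. `Event`, `rideState`, `timers`, and `advanceWatermark` are hypothetical stand-ins, not Flink APIs, and it assumes the watermark strategy bounds out-of-orderness well enough that a START older than the watermark can be considered lost.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

final class MissingStartSketch {
    static final long TWO_HOURS_MS = 2 * 60 * 60 * 1000L;

    // Hypothetical, simplified stand-in for TaxiRide.
    static final class Event {
        final long rideId;
        final boolean isStart;
        final long timeMs;

        Event(long rideId, boolean isStart, long timeMs) {
            this.rideId = rideId;
            this.isStart = isStart;
            this.timeMs = timeMs;
        }
    }

    // Stand-ins for keyed ValueState, the timer service, and the collector (one key only).
    Event rideState;
    final TreeSet<Long> timers = new TreeSet<>();
    final List<Long> alerts = new ArrayList<>();

    // START: fire two hours later. END seen first: fire at the END's own timestamp.
    static long timerTime(Event e) {
        return e.isStart ? e.timeMs + TWO_HOURS_MS : e.timeMs;
    }

    void processElement(Event ride) {
        if (rideState == null) {
            // whatever event comes first, remember it and register its timer
            rideState = ride;
            timers.add(timerTime(ride));
            return;
        }
        // The matching event arrived: cancel the pending timer and check the duration.
        timers.remove(timerTime(rideState));
        Event start = ride.isStart ? ride : rideState;
        Event end = ride.isStart ? rideState : ride;
        if (end.timeMs - start.timeMs > TWO_HOURS_MS) {
            alerts.add(ride.rideId);
        }
        rideState = null;
    }

    // Simulates a watermark: fires every timer whose timestamp is <= the watermark.
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.first() <= watermark) {
            timers.pollFirst();
            if (rideState != null && rideState.isStart) {
                alerts.add(rideState.rideId); // no END within two hours
            }
            // A buffered END whose timer fires has lost its START; drop it either way.
            rideState = null;
        }
    }

    public static void main(String[] args) {
        MissingStartSketch sketch = new MissingStartSketch();
        sketch.processElement(new Event(1L, false, 10_000L)); // END without START
        sketch.advanceWatermark(10_000L);
        System.out.println("state cleared: " + (sketch.rideState == null));
    }
}
```

In this sketch the timer for a buffered END never produces an alert; it only bounds how long the state is kept.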
##########
File path:
long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesUnitTest.java
##########
@@ -104,6 +132,15 @@ public void shouldAlertOnWatermark() throws Exception {
assertThat(harness.numEventTimeTimers()).isZero();
}
+ private Long resultingRideId() {
+ ConcurrentLinkedQueue<Object> results = harness.getOutput();
+ assertThat(results.size())
+ .isEqualTo(1)
+ .withFailMessage("Expecting test to have exactly one result");
+ StreamRecord<Long> resultingRecord = (StreamRecord<Long>) results.toArray()[0];
Review comment:
Converting to an array is unnecessary:
```suggestion
StreamRecord<Long> resultingRecord = (StreamRecord<Long>) results.element();
```
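For reference, a small standalone sketch of what `element()` does on a `ConcurrentLinkedQueue`: it returns the head without removing it and without copying the queue, and throws `NoSuchElementException` if the queue is empty (unlike `peek()`, which returns `null`). The class and method names are made up for illustration; only `java.util` types are used, not the Flink test harness.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentLinkedQueue;

final class QueueHeadSketch {
    // Returns the head of the queue; the queue itself is left unchanged.
    static Object head(ConcurrentLinkedQueue<Object> results) {
        return results.element();
    }

    // Reports whether element() rejects an empty queue with NoSuchElementException.
    static boolean throwsOnEmpty() {
        try {
            new ConcurrentLinkedQueue<Object>().element();
            return false;
        } catch (NoSuchElementException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        ConcurrentLinkedQueue<Object> results = new ConcurrentLinkedQueue<>();
        results.add(42L);
        System.out.println(head(results));
        System.out.println(results.size());
    }
}
```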
##########
File path:
long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesUnitTest.java
##########
@@ -104,6 +132,15 @@ public void shouldAlertOnWatermark() throws Exception {
assertThat(harness.numEventTimeTimers()).isZero();
}
+ private Long resultingRideId() {
+ ConcurrentLinkedQueue<Object> results = harness.getOutput();
+ assertThat(results.size())
+ .isEqualTo(1)
+ .withFailMessage("Expecting test to have exactly one result");
Review comment:
From the Javadoc:
> You must set it before calling the assertion otherwise it is ignored as the failing assertion breaks the chained call by throwing an AssertionError.
```suggestion
assertThat(results.size())
.withFailMessage("Expecting test to have exactly one result")
.isEqualTo(1);
```
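To illustrate why the order matters, here is a tiny standalone mimic of a fluent assertion chain. `MiniAssert` is a hypothetical stand-in written for this example and is not the AssertJ API; it only reproduces the behaviour the Javadoc describes, namely that a failing check throws before any later call in the chain can run.

```java
final class FailMessageOrderSketch {
    // Hypothetical stand-in for a fluent assertion object; NOT the AssertJ API.
    static final class MiniAssert {
        private final int actual;
        private String failMessage = "default failure message";

        MiniAssert(int actual) {
            this.actual = actual;
        }

        MiniAssert withFailMessage(String message) {
            this.failMessage = message;
            return this;
        }

        MiniAssert isEqualTo(int expected) {
            if (actual != expected) {
                // Throwing here ends the chain; later calls never execute.
                throw new AssertionError(failMessage);
            }
            return this;
        }
    }

    // Message set AFTER the check: the custom text is never applied.
    static String messageWhenSetAfter() {
        try {
            new MiniAssert(2).isEqualTo(1).withFailMessage("Expecting exactly one result");
            return null;
        } catch (AssertionError e) {
            return e.getMessage();
        }
    }

    // Message set BEFORE the check: the custom text is reported.
    static String messageWhenSetBefore() {
        try {
            new MiniAssert(2).withFailMessage("Expecting exactly one result").isEqualTo(1);
            return null;
        } catch (AssertionError e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(messageWhenSetAfter());
        System.out.println(messageWhenSetBefore());
    }
}
```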
##########
File path:
long-ride-alerts/src/solution/scala/org/apache/flink/training/solutions/longrides/scala/LongRidesSolution.scala
##########
@@ -91,32 +91,38 @@ object LongRidesSolution {
new ValueStateDescriptor[TaxiRide]("ride event", classOf[TaxiRide]))
}
+ @throws[Exception]
override def processElement(ride: TaxiRide,
context: KeyedProcessFunction[Long, TaxiRide, Long]#Context,
out: Collector[Long]): Unit = {
- val firstRideEvent = rideState.value()
+ val firstRideEvent: TaxiRide = rideState.value
if (firstRideEvent == null) {
+ // whatever event comes first, remember it
rideState.update(ride)
+
if (ride.isStart) {
+ // we will use this timer to check for rides that have gone on too long and may
+ // not yet have an END event (or the END event could be missing)
context.timerService.registerEventTimeTimer(getTimerTime(ride))
- } else if (rideTooLong(ride)) {
- out.collect(ride.rideId)
}
} else {
if (ride.isStart) {
- // There's nothing to do but clear the state (which is done below).
+ if (rideTooLong(ride, firstRideEvent)) {
+ out.collect(ride.rideId)
+ }
} else {
- // There may be a timer that hasn't fired yet.
+ // there is probably a timer that hasn't fired yet
context.timerService.deleteEventTimeTimer(getTimerTime(firstRideEvent))
- // It could be that the ride has gone on too long, but the timer hasn't fired yet.
- if (rideTooLong(ride)) {
+ // it could be that the ride has gone on too long (and the timer didn't fire)
Review comment:
```suggestion
// it could be that the ride has gone on too long (and the timer didn't fire yet)
```