NicoK commented on a change in pull request #31:
URL: https://github.com/apache/flink-training/pull/31#discussion_r692839241
##########
File path:
common/src/test/java/org/apache/flink/training/exercises/testing/TestSink.java
##########
@@ -0,0 +1,26 @@
+package org.apache.flink.training.exercises.testing;
+
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class TestSink<OUT> implements SinkFunction<OUT> {
+
+    // must be static
+    public static final List VALUES = Collections.synchronizedList(new ArrayList<>());
+
+    @Override
+    public void invoke(OUT value, Context context) {
+        VALUES.add(value);
+    }
+
+    public Iterable<OUT> results() {
+        return VALUES;
+    }
+
+    public void reset() {
+        VALUES.clear();
+    }
+}
Review comment:
In the Flink sources, I also found this pattern for test sinks:
```
stream.addSink(
        new RichSinkFunction<Integer>() {
            @Override
            public void open(Configuration parameters) throws Exception {
                getRuntimeContext()
                        .addAccumulator("result", new ListAccumulator<Integer>());
            }

            @Override
            public void invoke(Integer value, Context context) throws Exception {
                getRuntimeContext().getAccumulator("result").add(value);
            }
        });
List<Integer> result = env.execute().getAccumulatorResult("result");
```
-> maybe that's a cleaner approach to fetching results?
Another, probably even better, option: using
`org.apache.flink.streaming.util.StreamCollector#collect`. In that case, we
wouldn't even need our own `SinkFunction`.
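Roughly, that could look like the following. This is an untested sketch that assumes `StreamCollector` is registered as a JUnit `@Rule` and that its `collect(DataStream)` returns a `CompletableFuture` which completes with the collected elements once the job finishes:
```
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.StreamCollector;

import org.junit.Rule;
import org.junit.Test;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;

import static org.assertj.core.api.Assertions.assertThat;

public class CollectorBasedTest {

    // the collector registers itself as a sink for the streams passed to collect()
    @Rule public final StreamCollector collector = new StreamCollector();

    @Test
    public void collectsResultsWithoutACustomSink() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Long> results = env.fromElements(1L, 2L, 3L);

        // register the stream before executing; the future completes when the job finishes
        CompletableFuture<Collection<Long>> future = collector.collect(results);
        env.execute();

        assertThat(future.get()).containsExactlyInAnyOrder(1L, 2L, 3L);
    }
}
```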
##########
File path:
common/src/test/java/org/apache/flink/training/exercises/testing/ParallelTestSource.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.training.exercises.testing;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ParallelTestSource<T> extends RichParallelSourceFunction<T> {
+    protected T[] testStream;
+    TestSourcePartitioner<T> partitioner;
+    private List<T> substream;
+
+    public ParallelTestSource(TestSourcePartitioner<T> partitioner, T... events) {
+        this.partitioner = partitioner;
+        this.testStream = events;
+    }
+
+    public ParallelTestSource(T... events) {
+        this.partitioner = (e -> 0);
+        this.testStream = events;
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+
+        int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+        int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+        substream = new ArrayList<>();
+
+        for (int i = 0; i < testStream.length; i++) {
+            T element = testStream[i];
+            long subtaskToUse = partitioner.partition(element) % numberOfParallelSubtasks;
+
+            if (subtaskToUse == indexOfThisSubtask) {
+                substream.add(element);
+            } else if (subtaskToUse < 0 || subtaskToUse > numberOfParallelSubtasks - 1) {
+                throw new RuntimeException("Requested subtask is out-of-bounds: " + subtaskToUse);
+            }
+        }
Review comment:
Did you create this `substream` separately for debugging purposes, to see
which elements are actually contained in a source? Otherwise, we can skip this
variable and integrate the logic directly into `run()` (but maybe it's relevant
for the source interface change).
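For illustration, integrating that logic into `run()` could look roughly like this (a sketch only; the PR's actual `run()` is not part of this hunk, so the emission and cancellation handling is assumed):
```
@Override
public void run(SourceContext<T> ctx) {
    int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
    int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();

    for (T element : testStream) {
        long subtaskToUse = partitioner.partition(element) % numberOfParallelSubtasks;
        if (subtaskToUse < 0) {
            throw new RuntimeException("Requested subtask is out-of-bounds: " + subtaskToUse);
        }
        // emit only the elements assigned to this subtask
        if (subtaskToUse == indexOfThisSubtask) {
            ctx.collect(element);
        }
    }
    // how the source then idles or finishes is left to the existing implementation
}
```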
##########
File path:
long-ride-alerts/src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java
##########
@@ -18,62 +18,96 @@
package org.apache.flink.training.exercises.longrides;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator;
-import org.apache.flink.training.exercises.common.utils.ExerciseBase;
import org.apache.flink.training.exercises.common.utils.MissingSolutionException;
import org.apache.flink.util.Collector;
+import java.time.Duration;
+
/**
- * The "Long Ride Alerts" exercise of the Flink training in the docs.
+ * The "Long Ride Alerts" exercise.
+ *
+ * <p>The goal for this exercise is to emit the rideIds for taxi rides with a
duration of more than
+ * two hours. You should assume that TaxiRide events can be lost, but there
are no duplicates.
*
- * <p>The goal for this exercise is to emit START events for taxi rides that
have not been matched
- * by an END event during the first 2 hours of the ride.
+ * <p>You should eventually clear any state you create.
*/
-public class LongRidesExercise extends ExerciseBase {
+public class LongRidesExercise {
+ private SourceFunction<TaxiRide> source;
Review comment:
This design with an instance of a `SourceFunction` probably needs to be
changed when using the new source interface and putting the timestamp assigner
into the source.
Should we take that into account now?
##########
File path:
long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java
##########
@@ -71,36 +109,53 @@ public void open(Configuration config) {
}
@Override
-        public void processElement(TaxiRide ride, Context context, Collector<TaxiRide> out)
+        public void processElement(TaxiRide ride, Context context, Collector<Long> out)
                 throws Exception {
-            TaxiRide previousRideEvent = rideState.value();
-            if (previousRideEvent == null) {
+            TaxiRide firstRideEvent = rideState.value();
+
+            if (firstRideEvent == null) {
                 rideState.update(ride);
+
                 if (ride.isStart) {
                     context.timerService().registerEventTimeTimer(getTimerTime(ride));
+                } else {
+                    if (rideTooLong(ride)) {
+                        out.collect(ride.rideId);
+                    }
                 }
             } else {
-                if (!ride.isStart) {
-                    // it's an END event, so event saved was the START event and has a timer
-                    // the timer hasn't fired yet, and we can safely kill the timer
-                    context.timerService().deleteEventTimeTimer(getTimerTime(previousRideEvent));
+                if (ride.isStart) {
+                    // There's nothing to do but clear the state (which is done below).
+                } else {
+                    // There may be a timer that hasn't fired yet.
+                    context.timerService().deleteEventTimeTimer(getTimerTime(firstRideEvent));
+
+                    // It could be that the ride has gone on too long, but the timer hasn't fired.
Review comment:
```suggestion
                    // It could be that the ride has gone on too long, but the timer hasn't fired yet.
```
##########
File path:
long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTest.java
##########
@@ -18,89 +18,150 @@
package org.apache.flink.training.exercises.longrides;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
-import org.apache.flink.training.exercises.testing.TaxiRideTestBase;
+import org.apache.flink.training.exercises.testing.ComposedPipeline;
+import org.apache.flink.training.exercises.testing.ExecutablePipeline;
+import org.apache.flink.training.exercises.testing.ParallelTestSource;
+import org.apache.flink.training.exercises.testing.TestSink;
+import org.apache.flink.training.exercises.testing.TestSourcePartitioner;
import org.apache.flink.training.solutions.longrides.LongRidesSolution;
+import org.junit.ClassRule;
import org.junit.Test;
import java.time.Instant;
-import java.util.Collections;
-import java.util.List;
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
-public class LongRidesTest extends TaxiRideTestBase<TaxiRide> {
+public class LongRidesTest {
- static final Testable JAVA_EXERCISE = () -> LongRidesExercise.main(new
String[] {});
+ private static final int PARALLELISM = 2;
- private static final Instant BEGINNING =
Instant.parse("2020-01-01T12:00:00.00Z");
+ /** This isn't necessary, but speeds up the tests. */
+ @ClassRule
+ public static MiniClusterWithClientResource flinkCluster =
+ new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberSlotsPerTaskManager(PARALLELISM)
+ .setNumberTaskManagers(1)
+ .build());
+
+ public static final Instant BEGINNING =
Instant.parse("2020-01-01T12:00:00.00Z");
+ public static final Instant ONE_MINUTE_LATER = BEGINNING.plusSeconds(60);
+ public static final Instant THREE_HOURS_LATER = BEGINNING.plusSeconds(180
* 60);
@Test
public void shortRide() throws Exception {
- Instant oneMinLater = BEGINNING.plusSeconds(60);
+
TaxiRide rideStarted = startRide(1, BEGINNING);
- TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
- Long markOneMinLater = oneMinLater.toEpochMilli();
+ TaxiRide endedOneMinLater = endRide(rideStarted, ONE_MINUTE_LATER);
- TestRideSource source = new TestRideSource(rideStarted,
endedOneMinLater, markOneMinLater);
- assert (results(source).isEmpty());
+ ParallelTestSource<TaxiRide> source =
+ new ParallelTestSource<>(new PartitionByRideId(), rideStarted,
endedOneMinLater);
+ TestSink<Long> sink = new TestSink<Long>();
+
+ longRidesPipeline().execute(source, sink);
+ assertThat(sink.results()).isEmpty();
}
@Test
public void outOfOrder() throws Exception {
- Instant oneMinLater = BEGINNING.plusSeconds(60);
TaxiRide rideStarted = startRide(1, BEGINNING);
- TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
- Long markOneMinLater = oneMinLater.toEpochMilli();
+ TaxiRide endedOneMinLater = endRide(rideStarted, ONE_MINUTE_LATER);
+
+ ParallelTestSource<TaxiRide> source =
+ new ParallelTestSource<>(new PartitionByRideId(),
endedOneMinLater, rideStarted);
+ TestSink<Long> sink = new TestSink<Long>();
- TestRideSource source = new TestRideSource(endedOneMinLater,
rideStarted, markOneMinLater);
- assert (results(source).isEmpty());
+ longRidesPipeline().execute(source, sink);
+ assertThat(sink.results()).isEmpty();
}
@Test
public void noStartShort() throws Exception {
- Instant oneMinLater = BEGINNING.plusSeconds(60);
TaxiRide rideStarted = startRide(1, BEGINNING);
- TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
- Long markOneMinLater = oneMinLater.toEpochMilli();
+ TaxiRide endedOneMinLater = endRide(rideStarted, ONE_MINUTE_LATER);
- TestRideSource source = new TestRideSource(endedOneMinLater,
markOneMinLater);
- assert (results(source).isEmpty());
+ ParallelTestSource<TaxiRide> source =
+ new ParallelTestSource<>(new PartitionByRideId(),
endedOneMinLater);
+ TestSink<Long> sink = new TestSink<Long>();
+
+ longRidesPipeline().execute(source, sink);
+ assertThat(sink.results()).isEmpty();
}
@Test
- public void noEnd() throws Exception {
+ public void noStartLong() throws Exception {
TaxiRide rideStarted = startRide(1, BEGINNING);
- Long markThreeHoursLater = BEGINNING.plusSeconds(180 *
60).toEpochMilli();
+ TaxiRide endedThreeHoursLater = endRide(rideStarted,
THREE_HOURS_LATER);
+
+ ParallelTestSource<TaxiRide> source =
+ new ParallelTestSource<>(new PartitionByRideId(),
endedThreeHoursLater);
+ TestSink<Long> sink = new TestSink<Long>();
- TestRideSource source = new TestRideSource(rideStarted,
markThreeHoursLater);
- assertEquals(Collections.singletonList(rideStarted), results(source));
+ longRidesPipeline().execute(source, sink);
+ assertThat(sink.results()).containsExactly(rideStarted.rideId);
}
@Test
- public void longRide() throws Exception {
+ public void endIsMissing() throws Exception {
Review comment:
Unit test?
##########
File path:
long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTest.java
##########
@@ -18,89 +18,150 @@
package org.apache.flink.training.exercises.longrides;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
-import org.apache.flink.training.exercises.testing.TaxiRideTestBase;
+import org.apache.flink.training.exercises.testing.ComposedPipeline;
+import org.apache.flink.training.exercises.testing.ExecutablePipeline;
+import org.apache.flink.training.exercises.testing.ParallelTestSource;
+import org.apache.flink.training.exercises.testing.TestSink;
+import org.apache.flink.training.exercises.testing.TestSourcePartitioner;
import org.apache.flink.training.solutions.longrides.LongRidesSolution;
+import org.junit.ClassRule;
import org.junit.Test;
import java.time.Instant;
-import java.util.Collections;
-import java.util.List;
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
-public class LongRidesTest extends TaxiRideTestBase<TaxiRide> {
+public class LongRidesTest {
- static final Testable JAVA_EXERCISE = () -> LongRidesExercise.main(new
String[] {});
+ private static final int PARALLELISM = 2;
- private static final Instant BEGINNING =
Instant.parse("2020-01-01T12:00:00.00Z");
+ /** This isn't necessary, but speeds up the tests. */
+ @ClassRule
+ public static MiniClusterWithClientResource flinkCluster =
+ new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberSlotsPerTaskManager(PARALLELISM)
+ .setNumberTaskManagers(1)
+ .build());
+
+ public static final Instant BEGINNING =
Instant.parse("2020-01-01T12:00:00.00Z");
+ public static final Instant ONE_MINUTE_LATER = BEGINNING.plusSeconds(60);
+ public static final Instant THREE_HOURS_LATER = BEGINNING.plusSeconds(180
* 60);
@Test
public void shortRide() throws Exception {
- Instant oneMinLater = BEGINNING.plusSeconds(60);
+
TaxiRide rideStarted = startRide(1, BEGINNING);
- TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
- Long markOneMinLater = oneMinLater.toEpochMilli();
+ TaxiRide endedOneMinLater = endRide(rideStarted, ONE_MINUTE_LATER);
- TestRideSource source = new TestRideSource(rideStarted,
endedOneMinLater, markOneMinLater);
- assert (results(source).isEmpty());
+ ParallelTestSource<TaxiRide> source =
+ new ParallelTestSource<>(new PartitionByRideId(), rideStarted,
endedOneMinLater);
+ TestSink<Long> sink = new TestSink<Long>();
+
+ longRidesPipeline().execute(source, sink);
+ assertThat(sink.results()).isEmpty();
}
@Test
public void outOfOrder() throws Exception {
- Instant oneMinLater = BEGINNING.plusSeconds(60);
TaxiRide rideStarted = startRide(1, BEGINNING);
- TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
- Long markOneMinLater = oneMinLater.toEpochMilli();
+ TaxiRide endedOneMinLater = endRide(rideStarted, ONE_MINUTE_LATER);
+
+ ParallelTestSource<TaxiRide> source =
+ new ParallelTestSource<>(new PartitionByRideId(),
endedOneMinLater, rideStarted);
+ TestSink<Long> sink = new TestSink<Long>();
- TestRideSource source = new TestRideSource(endedOneMinLater,
rideStarted, markOneMinLater);
- assert (results(source).isEmpty());
+ longRidesPipeline().execute(source, sink);
+ assertThat(sink.results()).isEmpty();
}
@Test
public void noStartShort() throws Exception {
- Instant oneMinLater = BEGINNING.plusSeconds(60);
TaxiRide rideStarted = startRide(1, BEGINNING);
- TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
- Long markOneMinLater = oneMinLater.toEpochMilli();
+ TaxiRide endedOneMinLater = endRide(rideStarted, ONE_MINUTE_LATER);
- TestRideSource source = new TestRideSource(endedOneMinLater,
markOneMinLater);
- assert (results(source).isEmpty());
+ ParallelTestSource<TaxiRide> source =
+ new ParallelTestSource<>(new PartitionByRideId(),
endedOneMinLater);
+ TestSink<Long> sink = new TestSink<Long>();
+
+ longRidesPipeline().execute(source, sink);
+ assertThat(sink.results()).isEmpty();
}
@Test
- public void noEnd() throws Exception {
+ public void noStartLong() throws Exception {
TaxiRide rideStarted = startRide(1, BEGINNING);
- Long markThreeHoursLater = BEGINNING.plusSeconds(180 *
60).toEpochMilli();
+ TaxiRide endedThreeHoursLater = endRide(rideStarted,
THREE_HOURS_LATER);
+
+ ParallelTestSource<TaxiRide> source =
+ new ParallelTestSource<>(new PartitionByRideId(),
endedThreeHoursLater);
+ TestSink<Long> sink = new TestSink<Long>();
- TestRideSource source = new TestRideSource(rideStarted,
markThreeHoursLater);
- assertEquals(Collections.singletonList(rideStarted), results(source));
+ longRidesPipeline().execute(source, sink);
+ assertThat(sink.results()).containsExactly(rideStarted.rideId);
}
@Test
- public void longRide() throws Exception {
+ public void endIsMissing() throws Exception {
TaxiRide rideStarted = startRide(1, BEGINNING);
- Long mark2HoursLater = BEGINNING.plusSeconds(120 * 60).toEpochMilli();
- TaxiRide rideEnded3HoursLater = endRide(rideStarted,
BEGINNING.plusSeconds(180 * 60));
- TestRideSource source =
- new TestRideSource(rideStarted, mark2HoursLater,
rideEnded3HoursLater);
- assertEquals(Collections.singletonList(rideStarted), results(source));
+ ParallelTestSource<TaxiRide> source =
+ new ParallelTestSource<>(new PartitionByRideId(), rideStarted);
+ TestSink<Long> sink = new TestSink<Long>();
+
+ longRidesPipeline().execute(source, sink);
+ assertThat(sink.results()).containsExactly(rideStarted.rideId);
}
@Test
- public void startIsDelayedMoreThanTwoHours() throws Exception {
- TaxiRide rideStarted = startRide(1, BEGINNING);
- TaxiRide rideEndedAfter1Hour = endRide(rideStarted,
BEGINNING.plusSeconds(60 * 60));
- Long mark2HoursAfterEnd = BEGINNING.plusSeconds(180 *
60).toEpochMilli();
+ public void endComesAfter3Hours() throws Exception {
+ TaxiRide startOfLongRide = startRide(1, BEGINNING);
+ TaxiRide longRideEndedAfter3Hours = endRide(startOfLongRide,
THREE_HOURS_LATER);
- TestRideSource source =
- new TestRideSource(rideEndedAfter1Hour, mark2HoursAfterEnd,
rideStarted);
- assert (results(source).isEmpty());
+ ParallelTestSource<TaxiRide> source =
+ new ParallelTestSource<>(
+ new PartitionByRideId(), startOfLongRide,
longRideEndedAfter3Hours);
+ TestSink<Long> sink = new TestSink<Long>();
+
+ longRidesPipeline().execute(source, sink);
+ assertThat(sink.results()).containsExactly(startOfLongRide.rideId);
+ }
+
+ @Test
+ public void multipleRides() throws Exception {
+ TaxiRide startOfOneRide = startRide(1, BEGINNING);
+ TaxiRide otherRide = startRide(2, ONE_MINUTE_LATER);
+ TaxiRide oneRideEnded = endRide(startOfOneRide, THREE_HOURS_LATER);
+ TaxiRide otherRideEnded = endRide(otherRide, THREE_HOURS_LATER);
+
+ ParallelTestSource<TaxiRide> source =
+ new ParallelTestSource<>(
+ new PartitionByRideId(),
+ startOfOneRide,
+ otherRide,
+ oneRideEnded,
+ otherRideEnded);
+ TestSink<Long> sink = new TestSink<Long>();
+
+ longRidesPipeline().execute(source, sink);
+ assertThat(sink.results())
+ .containsExactlyInAnyOrder(startOfOneRide.rideId,
otherRide.rideId);
}
- private TaxiRide testRide(long rideId, Boolean isStart, Instant startTime,
Instant endTime) {
+ // Arranges for all events for a given rideId to be generated by the same
source subtask.
+ private static class PartitionByRideId implements
TestSourcePartitioner<TaxiRide> {
+ @Override
+ public long partition(TaxiRide ride) {
+ return ride.rideId;
+ }
+ }
Review comment:
Why is this important? Wouldn't it be better if things were arbitrarily
distributed between source "partitions"?
##########
File path: long-ride-alerts/README.md
##########
@@ -62,41 +61,29 @@ The resulting stream should be printed to standard out.
<details>
<summary><strong>Overall approach</strong></summary>
-This exercise revolves around using a `ProcessFunction` to manage some keyed
state and event time timers,
-and doing so in a way that works even when the END event for a given `rideId`
arrives before the START (which can happen).
-The challenge is figuring out what state to keep, and when to set and clear
that state.
-You will want to use event time timers that fire two hours after an incoming
START event, and in the `onTimer()` method,
-collect START events to the output only if a matching END event hasn't yet
arrived.
-</details>
-
-<details>
-<summary><strong>State and timers</strong></summary>
-
-There are many possible solutions for this exercise, but in general it is
enough to keep one
-`TaxiRide` in state (one `TaxiRide` for each key, or `rideId`). The approach
used in the reference solution is to
-store whichever event arrives first (the START or the END), and if it's a
START event,
-create a timer for two hours later. If and when the other event (for the same
`rideId`) arrives,
-carefully clean things up.
-
-It is possible to arrange this so that if `onTimer()` is called, you are
guaranteed that
-an alert (i.e., the ride kept in state) should be emitted. Writing the code
this way conveniently
-puts all of the complex business logic together in one place (in the
`processElement()` method).
+This exercise revolves around using a `KeyedProcessFunction` to manage some
state and event time timers,
+and doing so in a way that works even when the END event for a given `rideId`
arrives before the START.
+The challenge is figuring out what state and timers to use, and when to set
and clear the state (and timers).
+It is not enough to simply wait for the END event and calculate the duration,
as the END event
+may be missing.
</details>
## Documentation
-
[ProcessFunction](https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html)
- [Working with
State](https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/index.html)
+## After you've completed the exercise
+
+Read the [discussion of the reference solutions](DISCUSSION.md).
+
## Reference Solutions
-Reference solutions are available at GitHub:
+Reference solutions:
- Java API:
[`org.apache.flink.training.solutions.longrides.LongRidesSolution`](src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java)
- Scala API:
[`org.apache.flink.training.solutions.longrides.scala.LongRidesSolution`](src/solution/scala/org/apache/flink/training/solutions/longrides/scala/LongRidesSolution.scala)
-----
-[**Lab Discussion: `ProcessFunction` and Timers (Long Ride
Alerts)**](DISCUSSION.md)
-
Review comment:
Do you actually want to remove the link to the discussion page?
##########
File path:
common/src/test/java/org/apache/flink/training/exercises/testing/ParallelTestSource.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.training.exercises.testing;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ParallelTestSource<T> extends RichParallelSourceFunction<T> {
+ protected T[] testStream;
+ TestSourcePartitioner<T> partitioner;
+ private List<T> substream;
+
+ public ParallelTestSource(TestSourcePartitioner<T> partitioner, T...
events) {
+ this.partitioner = partitioner;
+ this.testStream = events;
+ }
+
+ public ParallelTestSource(T... events) {
+ this.partitioner = (e -> 0);
+ this.testStream = events;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+
+ int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+ int numberOfParallelSubtasks =
getRuntimeContext().getNumberOfParallelSubtasks();
+ substream = new ArrayList<>();
+
+ for (int i = 0; i < testStream.length; i++) {
+ T element = testStream[i];
Review comment:
```suggestion
for (T element : testStream) {
```
##########
File path: common/build.gradle
##########
@@ -17,6 +17,10 @@ dependencies {
     shadow "org.apache.flink:flink-runtime_${scalaBinaryVersion}:${flinkVersion}"
     testApi "junit:junit:${junitVersion}"
+    testApi "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}:tests"
+    testApi "org.apache.flink:flink-runtime_${scalaBinaryVersion}:${flinkVersion}:tests"
     testApi "org.apache.flink:flink-test-utils-junit:${flinkVersion}"
+    testApi "org.apache.flink:flink-test-utils_${scalaBinaryVersion}:${flinkVersion}"
     testApi 'org.hamcrest:hamcrest-library:1.3'
+    testApi 'org.assertj:assertj-core:3.20.2'
Review comment:
Do we need this dependency? Isn't `org.junit.Assert#assertThat()` doing
the same?
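For comparison, a minimal illustration of the two styles (`org.junit.Assert#assertThat` delegates to Hamcrest matchers and has been deprecated as of JUnit 4.13):
```
import java.util.Arrays;
import java.util.List;

import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

import static org.assertj.core.api.Assertions.assertThat;

public class AssertionStylesTest {

    @Test
    public void comparesTheTwoStyles() {
        List<Long> results = Arrays.asList(42L);

        // org.junit.Assert#assertThat needs a Hamcrest matcher (deprecated since JUnit 4.13)
        Assert.assertThat(results, Matchers.contains(42L));

        // AssertJ's fluent API, as used in the new tests
        assertThat(results).containsExactly(42L);
    }
}
```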
##########
File path:
long-ride-alerts/src/solution/scala/org/apache/flink/training/solutions/longrides/scala/LongRidesSolution.scala
##########
@@ -18,81 +18,124 @@
package org.apache.flink.training.solutions.longrides.scala
-import scala.concurrent.duration._
+import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner,
WatermarkStrategy}
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.streaming.api.functions.sink.{PrintSinkFunction,
SinkFunction}
+import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.training.exercises.common.datatypes.TaxiRide
import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator
-import org.apache.flink.training.exercises.common.utils.ExerciseBase
-import org.apache.flink.training.exercises.common.utils.ExerciseBase._
import org.apache.flink.util.Collector
+import scala.concurrent.duration._
+import java.time.Duration
+
/**
- * Scala reference implementation for the "Long Ride Alerts" exercise of the
Flink training in the docs.
+ * Scala solution for the "Long Ride Alerts" exercise.
*
- * The goal for this exercise is to emit START events for taxi rides that
have not been matched
- * by an END event during the first 2 hours of the ride.
+ * <p>The goal for this exercise is to emit the rideIds for taxi rides with a
duration of more than
+ * two hours. You should assume that TaxiRide events can be lost, but there
are no duplicates.
*
+ * <p>You should eventually clear any state you create.
*/
object LongRidesSolution {
- def main(args: Array[String]) {
-
- // set up the execution environment
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- // operate in Event-time
- env.setParallelism(ExerciseBase.parallelism)
-
- val rides = env.addSource(rideSourceOrTest(new TaxiRideGenerator()))
+ class LongRidesJob(source: SourceFunction[TaxiRide], sink:
SinkFunction[Long]) {
+
+ /**
+ * Creates and executes the ride cleansing pipeline.
+ */
+ @throws[Exception]
+ def execute(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+ // start the data generator
+ val rides = env.addSource(source)
+
+ // the WatermarkStrategy specifies how to extract timestamps and
generate watermarks
+ val watermarkStrategy = WatermarkStrategy
+ .forBoundedOutOfOrderness[TaxiRide](Duration.ofSeconds(60))
+ .withTimestampAssigner(new SerializableTimestampAssigner[TaxiRide] {
+ override def extractTimestamp(element: TaxiRide, recordTimestamp:
Long): Long =
+ element.getEventTime
+ })
+
+ // create the pipeline
+ rides
+ .assignTimestampsAndWatermarks(watermarkStrategy)
+ .keyBy(_.rideId)
+ .process(new MatchFunction())
+ .addSink(sink)
+
+ // execute the pipeline
+ env.execute("Long Taxi Rides")
+ }
- val longRides = rides
- .keyBy(_.rideId)
- .process(new MatchFunction())
+ }
- printOrTest(longRides)
+ @throws[Exception]
+ def main (args: Array[String]) {
+ val job = new LongRidesJob(new TaxiRideGenerator, new PrintSinkFunction)
- env.execute("Long Taxi Rides")
+ job.execute
}
- class MatchFunction extends KeyedProcessFunction[Long, TaxiRide, TaxiRide] {
- lazy val rideState: ValueState[TaxiRide] = getRuntimeContext.getState(
- new ValueStateDescriptor[TaxiRide]("ride event", classOf[TaxiRide]))
+ class MatchFunction extends KeyedProcessFunction[Long, TaxiRide, Long] {
+ private var rideState: ValueState[TaxiRide] = _
+
+ override def open(parameters: Configuration): Unit = {
+ rideState = getRuntimeContext.getState(
+ new ValueStateDescriptor[TaxiRide]("ride event", classOf[TaxiRide]))
+ }
override def processElement(ride: TaxiRide,
- context: KeyedProcessFunction[Long, TaxiRide,
TaxiRide]#Context,
- out: Collector[TaxiRide]): Unit = {
+ context: KeyedProcessFunction[Long, TaxiRide,
Long]#Context,
+ out: Collector[Long]): Unit = {
- val previousRideEvent = rideState.value()
+ val firstRideEvent = rideState.value()
- if (previousRideEvent == null) {
+ if (firstRideEvent == null) {
rideState.update(ride)
if (ride.isStart) {
- context.timerService().registerEventTimeTimer(getTimerTime(ride))
+ context.timerService.registerEventTimeTimer(getTimerTime(ride))
+ }
+ else if (rideTooLong(ride)) {
+ out.collect(ride.rideId)
}
- } else {
- if (!ride.isStart) {
- // it's an END event, so event saved was the START event and has a
timer
- // the timer hasn't fired yet, and we can safely kill the timer
-
context.timerService().deleteEventTimeTimer(getTimerTime(previousRideEvent))
+ }
+ else {
Review comment:
Not sure style-wise, but before, this was collapsed into a single line (`} else {`).
##########
File path:
long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesHarnessTest.java
##########
@@ -0,0 +1,56 @@
+package org.apache.flink.training.exercises.longrides;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
+import org.apache.flink.training.solutions.longrides.LongRidesSolution;
+
+import org.junit.Test;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+public class LongRidesHarnessTest {
+
+ @Test
+ public void testLongRideAlertsAsSoonAsPossible() throws Exception {
Review comment:
I think we'd need a couple more tests to verify the full behaviour, including:
- alerting directly if you see the end event first
- not alerting for a start event until the watermark is exactly 2h ahead (even if you see later data or processing time advances beyond 2h)
- not alerting for an end event that is not too long
- alerting if there is no event but the watermark surpasses the 2h mark
- state cleanup (in the good case, not the leak we leave for the open discussion)

A sketch for the first of these follows below. Should we also verify that things, i.e. the start events, are kept in Flink
state after all and not in some other local Java store (best effort, by just
checking that the number of state entries is > 0)?
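A rough sketch for the "end event seen first" case, reusing the helpers from this test class and assuming the emitted record carries the input element's timestamp:
```
@Test
public void alertsImmediatelyWhenOnlyALateEndEventIsSeen() throws Exception {
    KeyedOneInputStreamOperatorTestHarness<Long, TaxiRide, Long> harness = setupHarness();

    // the END event of a 3-hour ride arrives; its START event was lost
    TaxiRide rideStarted = LongRidesTest.startRide(1, LongRidesTest.BEGINNING);
    TaxiRide endedThreeHoursLater =
            LongRidesTest.endRide(rideStarted, LongRidesTest.THREE_HOURS_LATER);
    harness.processElement(
            new StreamRecord<>(endedThreeHoursLater, endedThreeHoursLater.getEventTime()));

    // the alert should be emitted right away, without waiting for any watermark
    StreamRecord<Long> expected =
            new StreamRecord<>(endedThreeHoursLater.rideId, endedThreeHoursLater.getEventTime());
    assertThat(harness.getOutput()).containsExactly(expected);
}
```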
##########
File path:
long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesHarnessTest.java
##########
@@ -0,0 +1,56 @@
+package org.apache.flink.training.exercises.longrides;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
+import org.apache.flink.training.solutions.longrides.LongRidesSolution;
+
+import org.junit.Test;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+public class LongRidesHarnessTest {
+
+ @Test
+ public void testLongRideAlertsAsSoonAsPossible() throws Exception {
+ KeyedOneInputStreamOperatorTestHarness<Long, TaxiRide, Long> harness =
setupHarness();
+
+ TaxiRide startOfLongRide = LongRidesTest.startRide(1,
LongRidesTest.BEGINNING);
+ harness.processElement(new StreamRecord<>(startOfLongRide,
startOfLongRide.getEventTime()));
+
+ Watermark mark2HoursLater =
+ new Watermark(LongRidesTest.BEGINNING.plusSeconds(120 *
60).toEpochMilli());
+ harness.processWatermark(mark2HoursLater);
+
+ // Check that the result is correct
+ ConcurrentLinkedQueue<Object> actualOutput = harness.getOutput();
+ StreamRecord<Long> rideIdAtTimeOfWatermark =
+ new StreamRecord<>(startOfLongRide.rideId,
mark2HoursLater.getTimestamp());
+ assertThat(actualOutput).containsExactly(rideIdAtTimeOfWatermark,
mark2HoursLater);
+
+ // Check that no state or timers are left behind
+ assertThat(harness.numKeyedStateEntries()).isZero();
+ assertThat(harness.numEventTimeTimers()).isZero();
+ }
+
+ private KeyedOneInputStreamOperatorTestHarness<Long, TaxiRide, Long>
setupHarness()
+ throws Exception {
+
+ KeyedProcessOperator<Long, TaxiRide, Long> operator =
+ new KeyedProcessOperator<>(new
LongRidesSolution.MatchFunction());
+
+ KeyedOneInputStreamOperatorTestHarness<Long, TaxiRide, Long>
testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(
+ operator, (TaxiRide r) -> r.rideId,
BasicTypeInfo.LONG_TYPE_INFO);
Review comment:
```suggestion
operator, (TaxiRide r) -> r.rideId, Types.LONG);
```
##########
File path:
long-ride-alerts/src/main/scala/org/apache/flink/training/exercises/longrides/scala/LongRidesExercise.scala
##########
@@ -18,55 +18,83 @@
package org.apache.flink.training.exercises.longrides.scala
+import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner,
WatermarkStrategy}
Review comment:
Most of the comments for the Java exercise apply 1:1, so I'll omit listing
them here again.
##########
File path:
long-ride-alerts/src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java
##########
@@ -18,62 +18,96 @@
package org.apache.flink.training.exercises.longrides;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator;
-import org.apache.flink.training.exercises.common.utils.ExerciseBase;
import org.apache.flink.training.exercises.common.utils.MissingSolutionException;
import org.apache.flink.util.Collector;
+import java.time.Duration;
+
/**
- * The "Long Ride Alerts" exercise of the Flink training in the docs.
+ * The "Long Ride Alerts" exercise.
+ *
+ * <p>The goal for this exercise is to emit the rideIds for taxi rides with a
duration of more than
+ * two hours. You should assume that TaxiRide events can be lost, but there
are no duplicates.
*
- * <p>The goal for this exercise is to emit START events for taxi rides that
have not been matched
- * by an END event during the first 2 hours of the ride.
+ * <p>You should eventually clear any state you create.
*/
-public class LongRidesExercise extends ExerciseBase {
+public class LongRidesExercise {
+ private SourceFunction<TaxiRide> source;
+ private SinkFunction<Long> sink;
+
+ /** Creates a job using the source and sink provided. */
+ public LongRidesExercise(SourceFunction<TaxiRide> source,
SinkFunction<Long> sink) {
+ this.source = source;
+ this.sink = sink;
+ }
/**
- * Main method.
+ * Creates and executes the long rides pipeline.
*
- * @throws Exception which occurs during job execution.
+ * <p>@throws Exception which occurs during job execution.
*/
- public static void main(String[] args) throws Exception {
+ public void execute() throws Exception {
// set up streaming execution environment
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(ExerciseBase.parallelism);
// start the data generator
-        DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new TaxiRideGenerator()));
+        DataStream<TaxiRide> rides = env.addSource(source, TypeInformation.of(TaxiRide.class));
Review comment:
Not sure why you added this here, but it doesn't seem like we need to
provide the type information manually:
```suggestion
DataStream<TaxiRide> rides = env.addSource(source);
```
##########
File path:
common/src/test/java/org/apache/flink/training/exercises/testing/ParallelTestSource.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.training.exercises.testing;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ParallelTestSource<T> extends RichParallelSourceFunction<T> {
+ protected T[] testStream;
+ TestSourcePartitioner<T> partitioner;
+ private List<T> substream;
+
+ public ParallelTestSource(TestSourcePartitioner<T> partitioner, T...
events) {
+ this.partitioner = partitioner;
+ this.testStream = events;
+ }
+
+ public ParallelTestSource(T... events) {
+ this.partitioner = (e -> 0);
+ this.testStream = events;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+
+ int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+ int numberOfParallelSubtasks =
getRuntimeContext().getNumberOfParallelSubtasks();
+ substream = new ArrayList<>();
+
+ for (int i = 0; i < testStream.length; i++) {
+ T element = testStream[i];
+ long subtaskToUse = partitioner.partition(element) %
numberOfParallelSubtasks;
+
+ if (subtaskToUse == indexOfThisSubtask) {
+ substream.add(element);
+ } else if (subtaskToUse < 0 || subtaskToUse >
numberOfParallelSubtasks - 1) {
Review comment:
I think `subtaskToUse < 0` is the only thing you need to check here, since the modulo already keeps non-negative values below `numberOfParallelSubtasks`.
```suggestion
} else if (subtaskToUse < 0) {
```
##########
File path:
long-ride-alerts/src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java
##########
@@ -18,62 +18,96 @@
package org.apache.flink.training.exercises.longrides;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator;
-import org.apache.flink.training.exercises.common.utils.ExerciseBase;
import org.apache.flink.training.exercises.common.utils.MissingSolutionException;
import org.apache.flink.util.Collector;
+import java.time.Duration;
+
/**
- * The "Long Ride Alerts" exercise of the Flink training in the docs.
+ * The "Long Ride Alerts" exercise.
+ *
+ * <p>The goal for this exercise is to emit the rideIds for taxi rides with a
duration of more than
+ * two hours. You should assume that TaxiRide events can be lost, but there
are no duplicates.
*
- * <p>The goal for this exercise is to emit START events for taxi rides that
have not been matched
- * by an END event during the first 2 hours of the ride.
+ * <p>You should eventually clear any state you create.
*/
-public class LongRidesExercise extends ExerciseBase {
+public class LongRidesExercise {
+ private SourceFunction<TaxiRide> source;
+ private SinkFunction<Long> sink;
+
+ /** Creates a job using the source and sink provided. */
+ public LongRidesExercise(SourceFunction<TaxiRide> source,
SinkFunction<Long> sink) {
+ this.source = source;
+ this.sink = sink;
+ }
/**
- * Main method.
+ * Creates and executes the long rides pipeline.
*
- * @throws Exception which occurs during job execution.
+ * <p>@throws Exception which occurs during job execution.
*/
- public static void main(String[] args) throws Exception {
+ public void execute() throws Exception {
// set up streaming execution environment
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(ExerciseBase.parallelism);
Review comment:
Although we shouldn't set any configuration parameters via code for a
production job, I'd argue that we may need a special environment with well-defined
settings for these exercises. A high default parallelism (number of CPUs) in the IDE could,
for example, overwhelm a Flink starter with just too much happening in the logs, while too
low a parallelism may hide certain behaviour.
Maybe something along the lines of
https://github.com/ververica/flink-training/blob/d07b181f1fa89137da45a8ffa67ec4cbfaf90cf1/troubleshooting/common/src/main/java/com/ververica/flink/training/common/EnvironmentUtils.java#L44,
which enables the web UI on a well-defined port.
Maybe we could also hide this behind a nice builder pattern...
Either way, that may be a follow-up PR; I'm not sure whether we should change the
parallelism here or rather put this into its own (bigger-scoped) improvement.
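To illustrate the idea, a minimal sketch of such a helper (the class name, port, and default parallelism are placeholders for this discussion, not part of the PR):
```
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/** Hypothetical helper for the training exercises; names and defaults are placeholders. */
public class TrainingEnvironments {

    public static StreamExecutionEnvironment createEnvironment() {
        Configuration conf = new Configuration();
        // pin the web UI to a well-known port when running inside the IDE
        conf.setInteger(RestOptions.PORT, 8081);

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

        // a small, fixed parallelism keeps the logs readable but still exercises parallel behaviour
        env.setParallelism(2);
        return env;
    }
}
```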
##########
File path:
long-ride-alerts/src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java
##########
@@ -18,62 +18,96 @@
package org.apache.flink.training.exercises.longrides;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator;
-import org.apache.flink.training.exercises.common.utils.ExerciseBase;
import org.apache.flink.training.exercises.common.utils.MissingSolutionException;
import org.apache.flink.util.Collector;
+import java.time.Duration;
+
/**
- * The "Long Ride Alerts" exercise of the Flink training in the docs.
+ * The "Long Ride Alerts" exercise.
+ *
+ * <p>The goal for this exercise is to emit the rideIds for taxi rides with a
duration of more than
+ * two hours. You should assume that TaxiRide events can be lost, but there
are no duplicates.
*
- * <p>The goal for this exercise is to emit START events for taxi rides that
have not been matched
- * by an END event during the first 2 hours of the ride.
+ * <p>You should eventually clear any state you create.
*/
-public class LongRidesExercise extends ExerciseBase {
+public class LongRidesExercise {
+ private SourceFunction<TaxiRide> source;
+ private SinkFunction<Long> sink;
+
+ /** Creates a job using the source and sink provided. */
+ public LongRidesExercise(SourceFunction<TaxiRide> source,
SinkFunction<Long> sink) {
+ this.source = source;
+ this.sink = sink;
+ }
/**
- * Main method.
+ * Creates and executes the long rides pipeline.
*
- * @throws Exception which occurs during job execution.
+ * <p>@throws Exception which occurs during job execution.
*/
- public static void main(String[] args) throws Exception {
+ public void execute() throws Exception {
// set up streaming execution environment
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(ExerciseBase.parallelism);
// start the data generator
- DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new
TaxiRideGenerator()));
+ DataStream<TaxiRide> rides = env.addSource(source,
TypeInformation.of(TaxiRide.class));
-        DataStream<TaxiRide> longRides =
-                rides.keyBy((TaxiRide ride) -> ride.rideId).process(new MatchFunction());
+        // the WatermarkStrategy specifies how to extract timestamps and generate watermarks
+        WatermarkStrategy<TaxiRide> watermarkStrategy =
+                WatermarkStrategy.<TaxiRide>forBoundedOutOfOrderness(Duration.ofSeconds(60))
+                        .withTimestampAssigner((ride, timestamp) -> ride.getEventTime());
Review comment:
The second parameter here is:
> The current internal timestamp of the element, or a negative value, if no timestamp has been assigned yet.

I'm not sure (didactically) how to name that one so that it isn't confusing, but
`timestamp` sounds misleading... wdyt?
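For example, simply renaming the lambda parameter might already help (just one naming option, not a firm suggestion):
```
.withTimestampAssigner((ride, previousElementTimestamp) -> ride.getEventTime());
```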
##########
File path:
long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesHarnessTest.java
##########
@@ -0,0 +1,56 @@
+package org.apache.flink.training.exercises.longrides;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
+import org.apache.flink.training.solutions.longrides.LongRidesSolution;
+
+import org.junit.Test;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+public class LongRidesHarnessTest {
+
+ @Test
+ public void testLongRideAlertsAsSoonAsPossible() throws Exception {
+ KeyedOneInputStreamOperatorTestHarness<Long, TaxiRide, Long> harness =
setupHarness();
+
+ TaxiRide startOfLongRide = LongRidesTest.startRide(1,
LongRidesTest.BEGINNING);
+ harness.processElement(new StreamRecord<>(startOfLongRide,
startOfLongRide.getEventTime()));
Review comment:
Do you want to check that there is no output yet after processing this?
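For example, something like this (assuming the harness output is still empty at that point):
```
assertThat(harness.getOutput()).isEmpty();
```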
##########
File path:
long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTest.java
##########
@@ -18,89 +18,150 @@
package org.apache.flink.training.exercises.longrides;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
-import org.apache.flink.training.exercises.testing.TaxiRideTestBase;
+import org.apache.flink.training.exercises.testing.ComposedPipeline;
+import org.apache.flink.training.exercises.testing.ExecutablePipeline;
+import org.apache.flink.training.exercises.testing.ParallelTestSource;
+import org.apache.flink.training.exercises.testing.TestSink;
+import org.apache.flink.training.exercises.testing.TestSourcePartitioner;
import org.apache.flink.training.solutions.longrides.LongRidesSolution;
+import org.junit.ClassRule;
import org.junit.Test;
import java.time.Instant;
-import java.util.Collections;
-import java.util.List;
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
-public class LongRidesTest extends TaxiRideTestBase<TaxiRide> {
+public class LongRidesTest {
- static final Testable JAVA_EXERCISE = () -> LongRidesExercise.main(new
String[] {});
+ private static final int PARALLELISM = 2;
- private static final Instant BEGINNING =
Instant.parse("2020-01-01T12:00:00.00Z");
+ /** This isn't necessary, but speeds up the tests. */
+ @ClassRule
+ public static MiniClusterWithClientResource flinkCluster =
+ new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberSlotsPerTaskManager(PARALLELISM)
+ .setNumberTaskManagers(1)
+ .build());
+
+ public static final Instant BEGINNING =
Instant.parse("2020-01-01T12:00:00.00Z");
+ public static final Instant ONE_MINUTE_LATER = BEGINNING.plusSeconds(60);
+ public static final Instant THREE_HOURS_LATER = BEGINNING.plusSeconds(180
* 60);
@Test
public void shortRide() throws Exception {
- Instant oneMinLater = BEGINNING.plusSeconds(60);
+
TaxiRide rideStarted = startRide(1, BEGINNING);
- TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
- Long markOneMinLater = oneMinLater.toEpochMilli();
+ TaxiRide endedOneMinLater = endRide(rideStarted, ONE_MINUTE_LATER);
- TestRideSource source = new TestRideSource(rideStarted,
endedOneMinLater, markOneMinLater);
- assert (results(source).isEmpty());
+ ParallelTestSource<TaxiRide> source =
+ new ParallelTestSource<>(new PartitionByRideId(), rideStarted,
endedOneMinLater);
+ TestSink<Long> sink = new TestSink<Long>();
+
+ longRidesPipeline().execute(source, sink);
+ assertThat(sink.results()).isEmpty();
}
@Test
public void outOfOrder() throws Exception {
- Instant oneMinLater = BEGINNING.plusSeconds(60);
TaxiRide rideStarted = startRide(1, BEGINNING);
- TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
- Long markOneMinLater = oneMinLater.toEpochMilli();
+ TaxiRide endedOneMinLater = endRide(rideStarted, ONE_MINUTE_LATER);
+
+ ParallelTestSource<TaxiRide> source =
+ new ParallelTestSource<>(new PartitionByRideId(),
endedOneMinLater, rideStarted);
+ TestSink<Long> sink = new TestSink<Long>();
- TestRideSource source = new TestRideSource(endedOneMinLater,
rideStarted, markOneMinLater);
- assert (results(source).isEmpty());
+ longRidesPipeline().execute(source, sink);
+ assertThat(sink.results()).isEmpty();
}
@Test
public void noStartShort() throws Exception {
- Instant oneMinLater = BEGINNING.plusSeconds(60);
TaxiRide rideStarted = startRide(1, BEGINNING);
- TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
- Long markOneMinLater = oneMinLater.toEpochMilli();
+ TaxiRide endedOneMinLater = endRide(rideStarted, ONE_MINUTE_LATER);
- TestRideSource source = new TestRideSource(endedOneMinLater,
markOneMinLater);
- assert (results(source).isEmpty());
+ ParallelTestSource<TaxiRide> source =
+ new ParallelTestSource<>(new PartitionByRideId(),
endedOneMinLater);
+ TestSink<Long> sink = new TestSink<Long>();
+
+ longRidesPipeline().execute(source, sink);
+ assertThat(sink.results()).isEmpty();
}
@Test
- public void noEnd() throws Exception {
+ public void noStartLong() throws Exception {
Review comment:
Unit test?
##########
File path:
long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTest.java
##########
@@ -18,89 +18,150 @@
package org.apache.flink.training.exercises.longrides;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
-import org.apache.flink.training.exercises.testing.TaxiRideTestBase;
+import org.apache.flink.training.exercises.testing.ComposedPipeline;
+import org.apache.flink.training.exercises.testing.ExecutablePipeline;
+import org.apache.flink.training.exercises.testing.ParallelTestSource;
+import org.apache.flink.training.exercises.testing.TestSink;
+import org.apache.flink.training.exercises.testing.TestSourcePartitioner;
import org.apache.flink.training.solutions.longrides.LongRidesSolution;
+import org.junit.ClassRule;
import org.junit.Test;
import java.time.Instant;
-import java.util.Collections;
-import java.util.List;
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
-public class LongRidesTest extends TaxiRideTestBase<TaxiRide> {
+public class LongRidesTest {
- static final Testable JAVA_EXERCISE = () -> LongRidesExercise.main(new
String[] {});
+ private static final int PARALLELISM = 2;
- private static final Instant BEGINNING =
Instant.parse("2020-01-01T12:00:00.00Z");
+ /** This isn't necessary, but speeds up the tests. */
+ @ClassRule
+ public static MiniClusterWithClientResource flinkCluster =
+ new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberSlotsPerTaskManager(PARALLELISM)
+ .setNumberTaskManagers(1)
+ .build());
+
+ public static final Instant BEGINNING =
Instant.parse("2020-01-01T12:00:00.00Z");
+ public static final Instant ONE_MINUTE_LATER = BEGINNING.plusSeconds(60);
+ public static final Instant THREE_HOURS_LATER = BEGINNING.plusSeconds(180
* 60);
@Test
public void shortRide() throws Exception {
- Instant oneMinLater = BEGINNING.plusSeconds(60);
+
TaxiRide rideStarted = startRide(1, BEGINNING);
- TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
- Long markOneMinLater = oneMinLater.toEpochMilli();
+ TaxiRide endedOneMinLater = endRide(rideStarted, ONE_MINUTE_LATER);
- TestRideSource source = new TestRideSource(rideStarted,
endedOneMinLater, markOneMinLater);
- assert (results(source).isEmpty());
+ ParallelTestSource<TaxiRide> source =
+ new ParallelTestSource<>(new PartitionByRideId(), rideStarted,
endedOneMinLater);
+ TestSink<Long> sink = new TestSink<Long>();
+
+ longRidesPipeline().execute(source, sink);
+ assertThat(sink.results()).isEmpty();
}
@Test
public void outOfOrder() throws Exception {
- Instant oneMinLater = BEGINNING.plusSeconds(60);
TaxiRide rideStarted = startRide(1, BEGINNING);
- TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
- Long markOneMinLater = oneMinLater.toEpochMilli();
+ TaxiRide endedOneMinLater = endRide(rideStarted, ONE_MINUTE_LATER);
+
+ ParallelTestSource<TaxiRide> source =
+ new ParallelTestSource<>(new PartitionByRideId(),
endedOneMinLater, rideStarted);
+ TestSink<Long> sink = new TestSink<Long>();
- TestRideSource source = new TestRideSource(endedOneMinLater,
rideStarted, markOneMinLater);
- assert (results(source).isEmpty());
+ longRidesPipeline().execute(source, sink);
+ assertThat(sink.results()).isEmpty();
}
@Test
public void noStartShort() throws Exception {
- Instant oneMinLater = BEGINNING.plusSeconds(60);
TaxiRide rideStarted = startRide(1, BEGINNING);
- TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
- Long markOneMinLater = oneMinLater.toEpochMilli();
+ TaxiRide endedOneMinLater = endRide(rideStarted, ONE_MINUTE_LATER);
- TestRideSource source = new TestRideSource(endedOneMinLater,
markOneMinLater);
- assert (results(source).isEmpty());
+ ParallelTestSource<TaxiRide> source =
+ new ParallelTestSource<>(new PartitionByRideId(),
endedOneMinLater);
+ TestSink<Long> sink = new TestSink<Long>();
+
+ longRidesPipeline().execute(source, sink);
+ assertThat(sink.results()).isEmpty();
}
@Test
- public void noEnd() throws Exception {
+ public void noStartLong() throws Exception {
TaxiRide rideStarted = startRide(1, BEGINNING);
- Long markThreeHoursLater = BEGINNING.plusSeconds(180 *
60).toEpochMilli();
+ TaxiRide endedThreeHoursLater = endRide(rideStarted,
THREE_HOURS_LATER);
+
+ ParallelTestSource<TaxiRide> source =
+ new ParallelTestSource<>(new PartitionByRideId(),
endedThreeHoursLater);
+ TestSink<Long> sink = new TestSink<Long>();
- TestRideSource source = new TestRideSource(rideStarted,
markThreeHoursLater);
- assertEquals(Collections.singletonList(rideStarted), results(source));
+ longRidesPipeline().execute(source, sink);
+ assertThat(sink.results()).containsExactly(rideStarted.rideId);
}
@Test
- public void longRide() throws Exception {
+ public void endIsMissing() throws Exception {
TaxiRide rideStarted = startRide(1, BEGINNING);
- Long mark2HoursLater = BEGINNING.plusSeconds(120 * 60).toEpochMilli();
- TaxiRide rideEnded3HoursLater = endRide(rideStarted,
BEGINNING.plusSeconds(180 * 60));
- TestRideSource source =
- new TestRideSource(rideStarted, mark2HoursLater,
rideEnded3HoursLater);
- assertEquals(Collections.singletonList(rideStarted), results(source));
+ ParallelTestSource<TaxiRide> source =
+ new ParallelTestSource<>(new PartitionByRideId(), rideStarted);
+ TestSink<Long> sink = new TestSink<Long>();
+
+ longRidesPipeline().execute(source, sink);
+ assertThat(sink.results()).containsExactly(rideStarted.rideId);
}
@Test
- public void startIsDelayedMoreThanTwoHours() throws Exception {
- TaxiRide rideStarted = startRide(1, BEGINNING);
- TaxiRide rideEndedAfter1Hour = endRide(rideStarted,
BEGINNING.plusSeconds(60 * 60));
- Long mark2HoursAfterEnd = BEGINNING.plusSeconds(180 *
60).toEpochMilli();
+ public void endComesAfter3Hours() throws Exception {
+ TaxiRide startOfLongRide = startRide(1, BEGINNING);
+ TaxiRide longRideEndedAfter3Hours = endRide(startOfLongRide,
THREE_HOURS_LATER);
- TestRideSource source =
- new TestRideSource(rideEndedAfter1Hour, mark2HoursAfterEnd,
rideStarted);
- assert (results(source).isEmpty());
+ ParallelTestSource<TaxiRide> source =
+ new ParallelTestSource<>(
+ new PartitionByRideId(), startOfLongRide,
longRideEndedAfter3Hours);
+ TestSink<Long> sink = new TestSink<Long>();
+
+ longRidesPipeline().execute(source, sink);
+ assertThat(sink.results()).containsExactly(startOfLongRide.rideId);
+ }
+
+ @Test
+ public void multipleRides() throws Exception {
+ TaxiRide startOfOneRide = startRide(1, BEGINNING);
+ TaxiRide otherRide = startRide(2, ONE_MINUTE_LATER);
+ TaxiRide oneRideEnded = endRide(startOfOneRide, THREE_HOURS_LATER);
+ TaxiRide otherRideEnded = endRide(otherRide, THREE_HOURS_LATER);
Review comment:
Do you want to add a couple more rides (exactly 2h, just below 2h, one without an end, ...) to mix things up a bit and make this more of an ITCase? Something like the sketch below, for example.
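A possible sketch, reusing the helpers already in this test (startRide/endRide, PartitionByRideId, longRidesPipeline). Whether a ride of exactly two hours should trigger an alert depends on how strictly "more than two hours" is read, so the expected results at the end are an assumption:
```
// hypothetical extra events for multipleRides(), alongside rides 1 and 2
Instant twoHoursLater = BEGINNING.plusSeconds(120 * 60);

// exactly two hours -- assuming this does not count as "longer than two hours", no alert expected
TaxiRide exactlyTwoHours = startRide(3, BEGINNING);
TaxiRide exactlyTwoHoursEnded = endRide(exactlyTwoHours, twoHoursLater);

// just below two hours -- no alert expected
TaxiRide justUnderTwoHours = startRide(4, BEGINNING);
TaxiRide justUnderTwoHoursEnded = endRide(justUnderTwoHours, twoHoursLater.minusSeconds(1));

// started but never ended -- the timer should fire once the source finishes
TaxiRide neverEnded = startRide(5, BEGINNING);

ParallelTestSource<TaxiRide> source =
        new ParallelTestSource<>(
                new PartitionByRideId(),
                startOfOneRide, otherRide, oneRideEnded, otherRideEnded,
                exactlyTwoHours, exactlyTwoHoursEnded,
                justUnderTwoHours, justUnderTwoHoursEnded,
                neverEnded);
TestSink<Long> sink = new TestSink<>();

longRidesPipeline().execute(source, sink);
assertThat(sink.results())
        .containsExactlyInAnyOrder(startOfOneRide.rideId, otherRide.rideId, neverEnded.rideId);
```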
##########
File path:
long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTest.java
##########
@@ -18,89 +18,150 @@
package org.apache.flink.training.exercises.longrides;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
-import org.apache.flink.training.exercises.testing.TaxiRideTestBase;
+import org.apache.flink.training.exercises.testing.ComposedPipeline;
+import org.apache.flink.training.exercises.testing.ExecutablePipeline;
+import org.apache.flink.training.exercises.testing.ParallelTestSource;
+import org.apache.flink.training.exercises.testing.TestSink;
+import org.apache.flink.training.exercises.testing.TestSourcePartitioner;
import org.apache.flink.training.solutions.longrides.LongRidesSolution;
+import org.junit.ClassRule;
import org.junit.Test;
import java.time.Instant;
-import java.util.Collections;
-import java.util.List;
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
-public class LongRidesTest extends TaxiRideTestBase<TaxiRide> {
+public class LongRidesTest {
- static final Testable JAVA_EXERCISE = () -> LongRidesExercise.main(new
String[] {});
+ private static final int PARALLELISM = 2;
- private static final Instant BEGINNING =
Instant.parse("2020-01-01T12:00:00.00Z");
+ /** This isn't necessary, but speeds up the tests. */
+ @ClassRule
+ public static MiniClusterWithClientResource flinkCluster =
+ new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberSlotsPerTaskManager(PARALLELISM)
+ .setNumberTaskManagers(1)
+ .build());
+
+ public static final Instant BEGINNING =
Instant.parse("2020-01-01T12:00:00.00Z");
+ public static final Instant ONE_MINUTE_LATER = BEGINNING.plusSeconds(60);
+ public static final Instant THREE_HOURS_LATER = BEGINNING.plusSeconds(180
* 60);
@Test
public void shortRide() throws Exception {
- Instant oneMinLater = BEGINNING.plusSeconds(60);
+
TaxiRide rideStarted = startRide(1, BEGINNING);
- TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
- Long markOneMinLater = oneMinLater.toEpochMilli();
+ TaxiRide endedOneMinLater = endRide(rideStarted, ONE_MINUTE_LATER);
- TestRideSource source = new TestRideSource(rideStarted,
endedOneMinLater, markOneMinLater);
- assert (results(source).isEmpty());
+ ParallelTestSource<TaxiRide> source =
+ new ParallelTestSource<>(new PartitionByRideId(), rideStarted,
endedOneMinLater);
+ TestSink<Long> sink = new TestSink<Long>();
+
+ longRidesPipeline().execute(source, sink);
+ assertThat(sink.results()).isEmpty();
}
@Test
public void outOfOrder() throws Exception {
- Instant oneMinLater = BEGINNING.plusSeconds(60);
TaxiRide rideStarted = startRide(1, BEGINNING);
- TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
- Long markOneMinLater = oneMinLater.toEpochMilli();
+ TaxiRide endedOneMinLater = endRide(rideStarted, ONE_MINUTE_LATER);
+
+ ParallelTestSource<TaxiRide> source =
+ new ParallelTestSource<>(new PartitionByRideId(),
endedOneMinLater, rideStarted);
+ TestSink<Long> sink = new TestSink<Long>();
- TestRideSource source = new TestRideSource(endedOneMinLater,
rideStarted, markOneMinLater);
- assert (results(source).isEmpty());
+ longRidesPipeline().execute(source, sink);
+ assertThat(sink.results()).isEmpty();
}
@Test
public void noStartShort() throws Exception {
- Instant oneMinLater = BEGINNING.plusSeconds(60);
TaxiRide rideStarted = startRide(1, BEGINNING);
- TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
- Long markOneMinLater = oneMinLater.toEpochMilli();
+ TaxiRide endedOneMinLater = endRide(rideStarted, ONE_MINUTE_LATER);
- TestRideSource source = new TestRideSource(endedOneMinLater,
markOneMinLater);
- assert (results(source).isEmpty());
+ ParallelTestSource<TaxiRide> source =
+ new ParallelTestSource<>(new PartitionByRideId(),
endedOneMinLater);
+ TestSink<Long> sink = new TestSink<Long>();
+
+ longRidesPipeline().execute(source, sink);
+ assertThat(sink.results()).isEmpty();
}
@Test
- public void noEnd() throws Exception {
+ public void noStartLong() throws Exception {
TaxiRide rideStarted = startRide(1, BEGINNING);
- Long markThreeHoursLater = BEGINNING.plusSeconds(180 *
60).toEpochMilli();
+ TaxiRide endedThreeHoursLater = endRide(rideStarted,
THREE_HOURS_LATER);
+
+ ParallelTestSource<TaxiRide> source =
+ new ParallelTestSource<>(new PartitionByRideId(),
endedThreeHoursLater);
+ TestSink<Long> sink = new TestSink<Long>();
- TestRideSource source = new TestRideSource(rideStarted,
markThreeHoursLater);
- assertEquals(Collections.singletonList(rideStarted), results(source));
+ longRidesPipeline().execute(source, sink);
+ assertThat(sink.results()).containsExactly(rideStarted.rideId);
}
@Test
- public void longRide() throws Exception {
+ public void endIsMissing() throws Exception {
TaxiRide rideStarted = startRide(1, BEGINNING);
- Long mark2HoursLater = BEGINNING.plusSeconds(120 * 60).toEpochMilli();
- TaxiRide rideEnded3HoursLater = endRide(rideStarted,
BEGINNING.plusSeconds(180 * 60));
- TestRideSource source =
- new TestRideSource(rideStarted, mark2HoursLater,
rideEnded3HoursLater);
- assertEquals(Collections.singletonList(rideStarted), results(source));
+ ParallelTestSource<TaxiRide> source =
+ new ParallelTestSource<>(new PartitionByRideId(), rideStarted);
+ TestSink<Long> sink = new TestSink<Long>();
+
+ longRidesPipeline().execute(source, sink);
+ assertThat(sink.results()).containsExactly(rideStarted.rideId);
}
@Test
- public void startIsDelayedMoreThanTwoHours() throws Exception {
- TaxiRide rideStarted = startRide(1, BEGINNING);
- TaxiRide rideEndedAfter1Hour = endRide(rideStarted,
BEGINNING.plusSeconds(60 * 60));
- Long mark2HoursAfterEnd = BEGINNING.plusSeconds(180 *
60).toEpochMilli();
+ public void endComesAfter3Hours() throws Exception {
Review comment:
Could this be a unit test (exercising the MatchFunction alone) instead?
##########
File path:
long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTest.java
##########
@@ -18,89 +18,150 @@
package org.apache.flink.training.exercises.longrides;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
-import org.apache.flink.training.exercises.testing.TaxiRideTestBase;
+import org.apache.flink.training.exercises.testing.ComposedPipeline;
+import org.apache.flink.training.exercises.testing.ExecutablePipeline;
+import org.apache.flink.training.exercises.testing.ParallelTestSource;
+import org.apache.flink.training.exercises.testing.TestSink;
+import org.apache.flink.training.exercises.testing.TestSourcePartitioner;
import org.apache.flink.training.solutions.longrides.LongRidesSolution;
+import org.junit.ClassRule;
import org.junit.Test;
import java.time.Instant;
-import java.util.Collections;
-import java.util.List;
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
-public class LongRidesTest extends TaxiRideTestBase<TaxiRide> {
+public class LongRidesTest {
- static final Testable JAVA_EXERCISE = () -> LongRidesExercise.main(new
String[] {});
+ private static final int PARALLELISM = 2;
- private static final Instant BEGINNING =
Instant.parse("2020-01-01T12:00:00.00Z");
+ /** This isn't necessary, but speeds up the tests. */
+ @ClassRule
+ public static MiniClusterWithClientResource flinkCluster =
+ new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberSlotsPerTaskManager(PARALLELISM)
+ .setNumberTaskManagers(1)
+ .build());
+
+ public static final Instant BEGINNING =
Instant.parse("2020-01-01T12:00:00.00Z");
+ public static final Instant ONE_MINUTE_LATER = BEGINNING.plusSeconds(60);
+ public static final Instant THREE_HOURS_LATER = BEGINNING.plusSeconds(180
* 60);
@Test
public void shortRide() throws Exception {
- Instant oneMinLater = BEGINNING.plusSeconds(60);
+
TaxiRide rideStarted = startRide(1, BEGINNING);
- TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
- Long markOneMinLater = oneMinLater.toEpochMilli();
+ TaxiRide endedOneMinLater = endRide(rideStarted, ONE_MINUTE_LATER);
- TestRideSource source = new TestRideSource(rideStarted,
endedOneMinLater, markOneMinLater);
- assert (results(source).isEmpty());
+ ParallelTestSource<TaxiRide> source =
+ new ParallelTestSource<>(new PartitionByRideId(), rideStarted,
endedOneMinLater);
+ TestSink<Long> sink = new TestSink<Long>();
+
+ longRidesPipeline().execute(source, sink);
+ assertThat(sink.results()).isEmpty();
}
@Test
public void outOfOrder() throws Exception {
- Instant oneMinLater = BEGINNING.plusSeconds(60);
TaxiRide rideStarted = startRide(1, BEGINNING);
- TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
- Long markOneMinLater = oneMinLater.toEpochMilli();
+ TaxiRide endedOneMinLater = endRide(rideStarted, ONE_MINUTE_LATER);
+
+ ParallelTestSource<TaxiRide> source =
+ new ParallelTestSource<>(new PartitionByRideId(),
endedOneMinLater, rideStarted);
+ TestSink<Long> sink = new TestSink<Long>();
- TestRideSource source = new TestRideSource(endedOneMinLater,
rideStarted, markOneMinLater);
- assert (results(source).isEmpty());
+ longRidesPipeline().execute(source, sink);
+ assertThat(sink.results()).isEmpty();
}
@Test
public void noStartShort() throws Exception {
Review comment:
Is this test more of a unit test?
---
In general, the line may be difficult to draw, but how about this:
- an integration test focuses on the whole pipeline (so here: watermarks + keyBy + MatchFunction) and any interplay between them
- a unit test focuses on the logic of the MatchFunction alone
To support that split, we could actually also remove the watermarking code from the exercise and add it to the work to be done (unless you think that's too much to ask for a first exercise).
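For the unit-test half of that split, the function could be driven directly with a keyed operator test harness from flink-streaming-java's test utilities, advancing watermarks by hand. A rough sketch only — it assumes the MatchFunction is reachable as a (static) nested class of the solution and that the harness classes (test-jar) are on the test classpath:
```
// additional imports, roughly:
// org.apache.flink.api.common.typeinfo.Types
// org.apache.flink.streaming.api.operators.KeyedProcessOperator
// org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness

@Test
public void endIsMissingUnitTest() throws Exception {
    TaxiRide rideStarted = startRide(1, BEGINNING);

    // drive the MatchFunction directly; no source, sink, or MiniCluster involved
    KeyedOneInputStreamOperatorTestHarness<Long, TaxiRide, Long> harness =
            new KeyedOneInputStreamOperatorTestHarness<>(
                    new KeyedProcessOperator<>(new LongRidesSolution.MatchFunction()),
                    ride -> ride.rideId,
                    Types.LONG);
    harness.open();

    harness.processElement(rideStarted, rideStarted.getEventTime());

    // push event time far enough past the ride start for the two-hour timer to fire
    harness.processWatermark(THREE_HOURS_LATER.toEpochMilli());

    // extractOutputValues() unwraps the emitted StreamRecords;
    // if it isn't available in this Flink version, getOutput() can be unwrapped manually
    assertThat(harness.extractOutputValues()).containsExactly(rideStarted.rideId);
}
```
That would cover the MatchFunction's timer/state logic in isolation, leaving the MiniCluster tests to cover the watermarking and the keyBy.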
##########
File path:
long-ride-alerts/src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java
##########
@@ -18,62 +18,96 @@
package org.apache.flink.training.exercises.longrides;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator;
-import org.apache.flink.training.exercises.common.utils.ExerciseBase;
import
org.apache.flink.training.exercises.common.utils.MissingSolutionException;
import org.apache.flink.util.Collector;
+import java.time.Duration;
+
/**
- * The "Long Ride Alerts" exercise of the Flink training in the docs.
+ * The "Long Ride Alerts" exercise.
+ *
+ * <p>The goal for this exercise is to emit the rideIds for taxi rides with a
duration of more than
+ * two hours. You should assume that TaxiRide events can be lost, but there
are no duplicates.
*
- * <p>The goal for this exercise is to emit START events for taxi rides that
have not been matched
- * by an END event during the first 2 hours of the ride.
+ * <p>You should eventually clear any state you create.
*/
-public class LongRidesExercise extends ExerciseBase {
+public class LongRidesExercise {
+ private SourceFunction<TaxiRide> source;
+ private SinkFunction<Long> sink;
+
+ /** Creates a job using the source and sink provided. */
+ public LongRidesExercise(SourceFunction<TaxiRide> source,
SinkFunction<Long> sink) {
+ this.source = source;
+ this.sink = sink;
+ }
/**
- * Main method.
+ * Creates and executes the long rides pipeline.
*
- * @throws Exception which occurs during job execution.
+ * <p>@throws Exception which occurs during job execution.
*/
- public static void main(String[] args) throws Exception {
+ public void execute() throws Exception {
// set up streaming execution environment
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(ExerciseBase.parallelism);
// start the data generator
- DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new
TaxiRideGenerator()));
+ DataStream<TaxiRide> rides = env.addSource(source,
TypeInformation.of(TaxiRide.class));
- DataStream<TaxiRide> longRides =
- rides.keyBy((TaxiRide ride) -> ride.rideId).process(new
MatchFunction());
+ // the WatermarkStrategy specifies how to extract timestamps and
generate watermarks
+ WatermarkStrategy<TaxiRide> watermarkStrategy =
+
WatermarkStrategy.<TaxiRide>forBoundedOutOfOrderness(Duration.ofSeconds(60))
+ .withTimestampAssigner((ride, timestamp) ->
ride.getEventTime());
- printOrTest(longRides);
+ // create the pipeline
+ rides.assignTimestampsAndWatermarks(watermarkStrategy)
+ .keyBy((TaxiRide ride) -> ride.rideId)
Review comment:
Simpler? Or did you want to make the type explicit?
```suggestion
.keyBy(ride -> ride.rideId)
```
##########
File path:
common/src/test/java/org/apache/flink/training/exercises/testing/ComposedPipeline.java
##########
@@ -0,0 +1,47 @@
+package org.apache.flink.training.exercises.testing;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import
org.apache.flink.training.exercises.common.utils.MissingSolutionException;
+
+/**
+ * This allows the tests to be run against both the exercises and the
solutions.
+ *
+ * <p>If an exercise throws MissingSolutionException, then the solution is
tested.
+ */
+public class ComposedPipeline<IN, OUT> implements ExecutablePipeline<IN, OUT> {
+
+ private ExecutablePipeline<IN, OUT> exercise;
+ private ExecutablePipeline<IN, OUT> solution;
+
+ public ComposedPipeline(
+ ExecutablePipeline<IN, OUT> exercise, ExecutablePipeline<IN, OUT>
solution) {
+ this.exercise = exercise;
+ this.solution = solution;
+ }
+
+ @Override
+ public void execute(SourceFunction<IN> source, TestSink<OUT> sink) throws
Exception {
+
+ sink.reset();
+
+ try {
+ exercise.execute(source, sink);
+ } catch (Exception e) {
+ if (ultimateCauseIsMissingSolution(e)) {
+ solution.execute(source, sink);
+ } else {
+ throw e;
+ }
+ }
+ }
+
+ private boolean ultimateCauseIsMissingSolution(Throwable e) {
+ if (e instanceof MissingSolutionException) {
+ return true;
+ } else if (e.getCause() != null) {
+ return ultimateCauseIsMissingSolution(e.getCause());
+ } else {
+ return false;
+ }
Review comment:
I'm not sure how deep these cause chains could get, but wouldn't it be better to iterate through the causes instead of recursing?
```suggestion
while (e != null) {
if (e instanceof MissingSolutionException) {
return true;
} else {
e = e.getCause();
}
}
return false;
```
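Alternatively, if pulling in flink-core utilities here is fine, this cause-chain walk already exists as `ExceptionUtils.findThrowable` (worth double-checking against the Flink version used by the training, but it should be there):
```
import org.apache.flink.util.ExceptionUtils;

private boolean ultimateCauseIsMissingSolution(Throwable e) {
    // walks e and its causes, returning the first MissingSolutionException found (if any)
    return ExceptionUtils.findThrowable(e, MissingSolutionException.class).isPresent();
}
```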
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]