NicoK commented on a change in pull request #31:
URL: https://github.com/apache/flink-training/pull/31#discussion_r692839241



##########
File path: 
common/src/test/java/org/apache/flink/training/exercises/testing/TestSink.java
##########
@@ -0,0 +1,26 @@
+package org.apache.flink.training.exercises.testing;
+
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class TestSink<OUT> implements SinkFunction<OUT> {
+
+    // must be static
+    public static final List VALUES = Collections.synchronizedList(new 
ArrayList<>());
+
+    @Override
+    public void invoke(OUT value, Context context) {
+        VALUES.add(value);
+    }
+
+    public Iterable<OUT> results() {
+        return VALUES;
+    }
+
+    public void reset() {
+        VALUES.clear();
+    }
+}

Review comment:
       In the Flink sources, I also found this pattern for test sinks:
   ```
           stream.addSink(
                   new RichSinkFunction<Integer>() {
                       @Override
                       public void open(Configuration parameters) throws 
Exception {
                           getRuntimeContext()
                                   .addAccumulator("result", new 
ListAccumulator<Integer>());
                       }
   
                       @Override
                       public void invoke(Integer value, Context context) 
throws Exception {
                           
getRuntimeContext().getAccumulator("result").add(value);
                       }
                   });
           List<Integer> result = env.execute().getAccumulatorResult("result");
   ```
   -> maybe that's a cleaner approach to fetching results?
   
   Another, probably even better, option: using 
`org.apache.flink.streaming.util.StreamCollector#collect`. In that case, we 
wouldn't even need our own `SinkFunction`.

##########
File path: 
common/src/test/java/org/apache/flink/training/exercises/testing/ParallelTestSource.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.training.exercises.testing;
+
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ParallelTestSource<T> extends RichParallelSourceFunction<T> {
+    protected T[] testStream;
+    TestSourcePartitioner<T> partitioner;
+    private List<T> substream;
+
+    public ParallelTestSource(TestSourcePartitioner<T> partitioner, T... 
events) {
+        this.partitioner = partitioner;
+        this.testStream = events;
+    }
+
+    public ParallelTestSource(T... events) {
+        this.partitioner = (e -> 0);
+        this.testStream = events;
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+
+        int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+        int numberOfParallelSubtasks = 
getRuntimeContext().getNumberOfParallelSubtasks();
+        substream = new ArrayList<>();
+
+        for (int i = 0; i < testStream.length; i++) {
+            T element = testStream[i];
+            long subtaskToUse = partitioner.partition(element) % 
numberOfParallelSubtasks;
+
+            if (subtaskToUse == indexOfThisSubtask) {
+                substream.add(element);
+            } else if (subtaskToUse < 0 || subtaskToUse > 
numberOfParallelSubtasks - 1) {
+                throw new RuntimeException("Requested subtask is 
out-of-bounds: " + subtaskToUse);
+            }
+        }

Review comment:
       Did you create this `substream` separately for debugging purposes to see 
which elements are actually contained in a source? Otherwise, we can skip this 
variable and integrate the logic here into `run()` (but maybe it's relevant 
for the source interface change).

##########
File path: 
long-ride-alerts/src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java
##########
@@ -18,62 +18,96 @@
 
 package org.apache.flink.training.exercises.longrides;
 
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.TimerService;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
 import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator;
-import org.apache.flink.training.exercises.common.utils.ExerciseBase;
 import 
org.apache.flink.training.exercises.common.utils.MissingSolutionException;
 import org.apache.flink.util.Collector;
 
+import java.time.Duration;
+
 /**
- * The "Long Ride Alerts" exercise of the Flink training in the docs.
+ * The "Long Ride Alerts" exercise.
+ *
+ * <p>The goal for this exercise is to emit the rideIds for taxi rides with a 
duration of more than
+ * two hours. You should assume that TaxiRide events can be lost, but there 
are no duplicates.
  *
- * <p>The goal for this exercise is to emit START events for taxi rides that 
have not been matched
- * by an END event during the first 2 hours of the ride.
+ * <p>You should eventually clear any state you create.
  */
-public class LongRidesExercise extends ExerciseBase {
+public class LongRidesExercise {
+    private SourceFunction<TaxiRide> source;

Review comment:
       This design with an instance of a `SourceFunction` probably needs to be 
changed when using the new source interface and putting the timestamp assigner 
into the source.
   Should we take that into account now?

##########
File path: 
long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java
##########
@@ -71,36 +109,53 @@ public void open(Configuration config) {
         }
 
         @Override
-        public void processElement(TaxiRide ride, Context context, 
Collector<TaxiRide> out)
+        public void processElement(TaxiRide ride, Context context, 
Collector<Long> out)
                 throws Exception {
-            TaxiRide previousRideEvent = rideState.value();
 
-            if (previousRideEvent == null) {
+            TaxiRide firstRideEvent = rideState.value();
+
+            if (firstRideEvent == null) {
                 rideState.update(ride);
+
                 if (ride.isStart) {
                     
context.timerService().registerEventTimeTimer(getTimerTime(ride));
+                } else {
+                    if (rideTooLong(ride)) {
+                        out.collect(ride.rideId);
+                    }
                 }
             } else {
-                if (!ride.isStart) {
-                    // it's an END event, so event saved was the START event 
and has a timer
-                    // the timer hasn't fired yet, and we can safely kill the 
timer
-                    
context.timerService().deleteEventTimeTimer(getTimerTime(previousRideEvent));
+                if (ride.isStart) {
+                    // There's nothing to do but clear the state (which is 
done below).
+                } else {
+                    // There may be a timer that hasn't fired yet.
+                    
context.timerService().deleteEventTimeTimer(getTimerTime(firstRideEvent));
+
+                    // It could be that the ride has gone on too long, but the 
timer hasn't fired.

Review comment:
       ```suggestion
                       // It could be that the ride has gone on too long, but 
the timer hasn't fired yet.
   ```

##########
File path: 
long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTest.java
##########
@@ -18,89 +18,150 @@
 
 package org.apache.flink.training.exercises.longrides;
 
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
-import org.apache.flink.training.exercises.testing.TaxiRideTestBase;
+import org.apache.flink.training.exercises.testing.ComposedPipeline;
+import org.apache.flink.training.exercises.testing.ExecutablePipeline;
+import org.apache.flink.training.exercises.testing.ParallelTestSource;
+import org.apache.flink.training.exercises.testing.TestSink;
+import org.apache.flink.training.exercises.testing.TestSourcePartitioner;
 import org.apache.flink.training.solutions.longrides.LongRidesSolution;
 
+import org.junit.ClassRule;
 import org.junit.Test;
 
 import java.time.Instant;
-import java.util.Collections;
-import java.util.List;
 
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
 
-public class LongRidesTest extends TaxiRideTestBase<TaxiRide> {
+public class LongRidesTest {
 
-    static final Testable JAVA_EXERCISE = () -> LongRidesExercise.main(new 
String[] {});
+    private static final int PARALLELISM = 2;
 
-    private static final Instant BEGINNING = 
Instant.parse("2020-01-01T12:00:00.00Z");
+    /** This isn't necessary, but speeds up the tests. */
+    @ClassRule
+    public static MiniClusterWithClientResource flinkCluster =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberSlotsPerTaskManager(PARALLELISM)
+                            .setNumberTaskManagers(1)
+                            .build());
+
+    public static final Instant BEGINNING = 
Instant.parse("2020-01-01T12:00:00.00Z");
+    public static final Instant ONE_MINUTE_LATER = BEGINNING.plusSeconds(60);
+    public static final Instant THREE_HOURS_LATER = BEGINNING.plusSeconds(180 
* 60);
 
     @Test
     public void shortRide() throws Exception {
-        Instant oneMinLater = BEGINNING.plusSeconds(60);
+
         TaxiRide rideStarted = startRide(1, BEGINNING);
-        TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
-        Long markOneMinLater = oneMinLater.toEpochMilli();
+        TaxiRide endedOneMinLater = endRide(rideStarted, ONE_MINUTE_LATER);
 
-        TestRideSource source = new TestRideSource(rideStarted, 
endedOneMinLater, markOneMinLater);
-        assert (results(source).isEmpty());
+        ParallelTestSource<TaxiRide> source =
+                new ParallelTestSource<>(new PartitionByRideId(), rideStarted, 
endedOneMinLater);
+        TestSink<Long> sink = new TestSink<Long>();
+
+        longRidesPipeline().execute(source, sink);
+        assertThat(sink.results()).isEmpty();
     }
 
     @Test
     public void outOfOrder() throws Exception {
-        Instant oneMinLater = BEGINNING.plusSeconds(60);
         TaxiRide rideStarted = startRide(1, BEGINNING);
-        TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
-        Long markOneMinLater = oneMinLater.toEpochMilli();
+        TaxiRide endedOneMinLater = endRide(rideStarted, ONE_MINUTE_LATER);
+
+        ParallelTestSource<TaxiRide> source =
+                new ParallelTestSource<>(new PartitionByRideId(), 
endedOneMinLater, rideStarted);
+        TestSink<Long> sink = new TestSink<Long>();
 
-        TestRideSource source = new TestRideSource(endedOneMinLater, 
rideStarted, markOneMinLater);
-        assert (results(source).isEmpty());
+        longRidesPipeline().execute(source, sink);
+        assertThat(sink.results()).isEmpty();
     }
 
     @Test
     public void noStartShort() throws Exception {
-        Instant oneMinLater = BEGINNING.plusSeconds(60);
         TaxiRide rideStarted = startRide(1, BEGINNING);
-        TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
-        Long markOneMinLater = oneMinLater.toEpochMilli();
+        TaxiRide endedOneMinLater = endRide(rideStarted, ONE_MINUTE_LATER);
 
-        TestRideSource source = new TestRideSource(endedOneMinLater, 
markOneMinLater);
-        assert (results(source).isEmpty());
+        ParallelTestSource<TaxiRide> source =
+                new ParallelTestSource<>(new PartitionByRideId(), 
endedOneMinLater);
+        TestSink<Long> sink = new TestSink<Long>();
+
+        longRidesPipeline().execute(source, sink);
+        assertThat(sink.results()).isEmpty();
     }
 
     @Test
-    public void noEnd() throws Exception {
+    public void noStartLong() throws Exception {
         TaxiRide rideStarted = startRide(1, BEGINNING);
-        Long markThreeHoursLater = BEGINNING.plusSeconds(180 * 
60).toEpochMilli();
+        TaxiRide endedThreeHoursLater = endRide(rideStarted, 
THREE_HOURS_LATER);
+
+        ParallelTestSource<TaxiRide> source =
+                new ParallelTestSource<>(new PartitionByRideId(), 
endedThreeHoursLater);
+        TestSink<Long> sink = new TestSink<Long>();
 
-        TestRideSource source = new TestRideSource(rideStarted, 
markThreeHoursLater);
-        assertEquals(Collections.singletonList(rideStarted), results(source));
+        longRidesPipeline().execute(source, sink);
+        assertThat(sink.results()).containsExactly(rideStarted.rideId);
     }
 
     @Test
-    public void longRide() throws Exception {
+    public void endIsMissing() throws Exception {

Review comment:
       Unit test?

##########
File path: 
long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTest.java
##########
@@ -18,89 +18,150 @@
 
 package org.apache.flink.training.exercises.longrides;
 
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
-import org.apache.flink.training.exercises.testing.TaxiRideTestBase;
+import org.apache.flink.training.exercises.testing.ComposedPipeline;
+import org.apache.flink.training.exercises.testing.ExecutablePipeline;
+import org.apache.flink.training.exercises.testing.ParallelTestSource;
+import org.apache.flink.training.exercises.testing.TestSink;
+import org.apache.flink.training.exercises.testing.TestSourcePartitioner;
 import org.apache.flink.training.solutions.longrides.LongRidesSolution;
 
+import org.junit.ClassRule;
 import org.junit.Test;
 
 import java.time.Instant;
-import java.util.Collections;
-import java.util.List;
 
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
 
-public class LongRidesTest extends TaxiRideTestBase<TaxiRide> {
+public class LongRidesTest {
 
-    static final Testable JAVA_EXERCISE = () -> LongRidesExercise.main(new 
String[] {});
+    private static final int PARALLELISM = 2;
 
-    private static final Instant BEGINNING = 
Instant.parse("2020-01-01T12:00:00.00Z");
+    /** This isn't necessary, but speeds up the tests. */
+    @ClassRule
+    public static MiniClusterWithClientResource flinkCluster =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberSlotsPerTaskManager(PARALLELISM)
+                            .setNumberTaskManagers(1)
+                            .build());
+
+    public static final Instant BEGINNING = 
Instant.parse("2020-01-01T12:00:00.00Z");
+    public static final Instant ONE_MINUTE_LATER = BEGINNING.plusSeconds(60);
+    public static final Instant THREE_HOURS_LATER = BEGINNING.plusSeconds(180 
* 60);
 
     @Test
     public void shortRide() throws Exception {
-        Instant oneMinLater = BEGINNING.plusSeconds(60);
+
         TaxiRide rideStarted = startRide(1, BEGINNING);
-        TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
-        Long markOneMinLater = oneMinLater.toEpochMilli();
+        TaxiRide endedOneMinLater = endRide(rideStarted, ONE_MINUTE_LATER);
 
-        TestRideSource source = new TestRideSource(rideStarted, 
endedOneMinLater, markOneMinLater);
-        assert (results(source).isEmpty());
+        ParallelTestSource<TaxiRide> source =
+                new ParallelTestSource<>(new PartitionByRideId(), rideStarted, 
endedOneMinLater);
+        TestSink<Long> sink = new TestSink<Long>();
+
+        longRidesPipeline().execute(source, sink);
+        assertThat(sink.results()).isEmpty();
     }
 
     @Test
     public void outOfOrder() throws Exception {
-        Instant oneMinLater = BEGINNING.plusSeconds(60);
         TaxiRide rideStarted = startRide(1, BEGINNING);
-        TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
-        Long markOneMinLater = oneMinLater.toEpochMilli();
+        TaxiRide endedOneMinLater = endRide(rideStarted, ONE_MINUTE_LATER);
+
+        ParallelTestSource<TaxiRide> source =
+                new ParallelTestSource<>(new PartitionByRideId(), 
endedOneMinLater, rideStarted);
+        TestSink<Long> sink = new TestSink<Long>();
 
-        TestRideSource source = new TestRideSource(endedOneMinLater, 
rideStarted, markOneMinLater);
-        assert (results(source).isEmpty());
+        longRidesPipeline().execute(source, sink);
+        assertThat(sink.results()).isEmpty();
     }
 
     @Test
     public void noStartShort() throws Exception {
-        Instant oneMinLater = BEGINNING.plusSeconds(60);
         TaxiRide rideStarted = startRide(1, BEGINNING);
-        TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
-        Long markOneMinLater = oneMinLater.toEpochMilli();
+        TaxiRide endedOneMinLater = endRide(rideStarted, ONE_MINUTE_LATER);
 
-        TestRideSource source = new TestRideSource(endedOneMinLater, 
markOneMinLater);
-        assert (results(source).isEmpty());
+        ParallelTestSource<TaxiRide> source =
+                new ParallelTestSource<>(new PartitionByRideId(), 
endedOneMinLater);
+        TestSink<Long> sink = new TestSink<Long>();
+
+        longRidesPipeline().execute(source, sink);
+        assertThat(sink.results()).isEmpty();
     }
 
     @Test
-    public void noEnd() throws Exception {
+    public void noStartLong() throws Exception {
         TaxiRide rideStarted = startRide(1, BEGINNING);
-        Long markThreeHoursLater = BEGINNING.plusSeconds(180 * 
60).toEpochMilli();
+        TaxiRide endedThreeHoursLater = endRide(rideStarted, 
THREE_HOURS_LATER);
+
+        ParallelTestSource<TaxiRide> source =
+                new ParallelTestSource<>(new PartitionByRideId(), 
endedThreeHoursLater);
+        TestSink<Long> sink = new TestSink<Long>();
 
-        TestRideSource source = new TestRideSource(rideStarted, 
markThreeHoursLater);
-        assertEquals(Collections.singletonList(rideStarted), results(source));
+        longRidesPipeline().execute(source, sink);
+        assertThat(sink.results()).containsExactly(rideStarted.rideId);
     }
 
     @Test
-    public void longRide() throws Exception {
+    public void endIsMissing() throws Exception {
         TaxiRide rideStarted = startRide(1, BEGINNING);
-        Long mark2HoursLater = BEGINNING.plusSeconds(120 * 60).toEpochMilli();
-        TaxiRide rideEnded3HoursLater = endRide(rideStarted, 
BEGINNING.plusSeconds(180 * 60));
 
-        TestRideSource source =
-                new TestRideSource(rideStarted, mark2HoursLater, 
rideEnded3HoursLater);
-        assertEquals(Collections.singletonList(rideStarted), results(source));
+        ParallelTestSource<TaxiRide> source =
+                new ParallelTestSource<>(new PartitionByRideId(), rideStarted);
+        TestSink<Long> sink = new TestSink<Long>();
+
+        longRidesPipeline().execute(source, sink);
+        assertThat(sink.results()).containsExactly(rideStarted.rideId);
     }
 
     @Test
-    public void startIsDelayedMoreThanTwoHours() throws Exception {
-        TaxiRide rideStarted = startRide(1, BEGINNING);
-        TaxiRide rideEndedAfter1Hour = endRide(rideStarted, 
BEGINNING.plusSeconds(60 * 60));
-        Long mark2HoursAfterEnd = BEGINNING.plusSeconds(180 * 
60).toEpochMilli();
+    public void endComesAfter3Hours() throws Exception {
+        TaxiRide startOfLongRide = startRide(1, BEGINNING);
+        TaxiRide longRideEndedAfter3Hours = endRide(startOfLongRide, 
THREE_HOURS_LATER);
 
-        TestRideSource source =
-                new TestRideSource(rideEndedAfter1Hour, mark2HoursAfterEnd, 
rideStarted);
-        assert (results(source).isEmpty());
+        ParallelTestSource<TaxiRide> source =
+                new ParallelTestSource<>(
+                        new PartitionByRideId(), startOfLongRide, 
longRideEndedAfter3Hours);
+        TestSink<Long> sink = new TestSink<Long>();
+
+        longRidesPipeline().execute(source, sink);
+        assertThat(sink.results()).containsExactly(startOfLongRide.rideId);
+    }
+
+    @Test
+    public void multipleRides() throws Exception {
+        TaxiRide startOfOneRide = startRide(1, BEGINNING);
+        TaxiRide otherRide = startRide(2, ONE_MINUTE_LATER);
+        TaxiRide oneRideEnded = endRide(startOfOneRide, THREE_HOURS_LATER);
+        TaxiRide otherRideEnded = endRide(otherRide, THREE_HOURS_LATER);
+
+        ParallelTestSource<TaxiRide> source =
+                new ParallelTestSource<>(
+                        new PartitionByRideId(),
+                        startOfOneRide,
+                        otherRide,
+                        oneRideEnded,
+                        otherRideEnded);
+        TestSink<Long> sink = new TestSink<Long>();
+
+        longRidesPipeline().execute(source, sink);
+        assertThat(sink.results())
+                .containsExactlyInAnyOrder(startOfOneRide.rideId, 
otherRide.rideId);
     }
 
-    private TaxiRide testRide(long rideId, Boolean isStart, Instant startTime, 
Instant endTime) {
+    // Arranges for all events for a given rideId to be generated by the same 
source subtask.
+    private static class PartitionByRideId implements 
TestSourcePartitioner<TaxiRide> {
+        @Override
+        public long partition(TaxiRide ride) {
+            return ride.rideId;
+        }
+    }

Review comment:
       Why is this important? Wouldn't it be better if things were arbitrarily 
distributed between source "partitions"?

##########
File path: long-ride-alerts/README.md
##########
@@ -62,41 +61,29 @@ The resulting stream should be printed to standard out.
 <details>
 <summary><strong>Overall approach</strong></summary>
 
-This exercise revolves around using a `ProcessFunction` to manage some keyed 
state and event time timers,
-and doing so in a way that works even when the END event for a given `rideId` 
arrives before the START (which can happen).
-The challenge is figuring out what state to keep, and when to set and clear 
that state.
-You will want to use event time timers that fire two hours after an incoming 
START event, and in the `onTimer()` method,
-collect START events to the output only if a matching END event hasn't yet 
arrived.
-</details>
-
-<details>
-<summary><strong>State and timers</strong></summary>
-
-There are many possible solutions for this exercise, but in general it is 
enough to keep one
-`TaxiRide` in state (one `TaxiRide` for each key, or `rideId`). The approach 
used in the reference solution is to
-store whichever event arrives first (the START or the END), and if it's a 
START event,
-create a timer for two hours later. If and when the other event (for the same 
`rideId`) arrives,
-carefully clean things up.
-
-It is possible to arrange this so that if `onTimer()` is called, you are 
guaranteed that
-an alert (i.e., the ride kept in state) should be emitted. Writing the code 
this way conveniently
-puts all of the complex business logic together in one place (in the 
`processElement()` method).
+This exercise revolves around using a `KeyedProcessFunction` to manage some 
state and event time timers,
+and doing so in a way that works even when the END event for a given `rideId` 
arrives before the START.
+The challenge is figuring out what state and timers to use, and when to set 
and clear the state (and timers).
+It is not enough to simply wait for the END event and calculate the duration, 
as the END event
+may be missing.
 </details>
 
 ## Documentation
 
 - 
[ProcessFunction](https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html)
 - [Working with 
State](https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/index.html)
 
+## After you've completed the exercise
+
+Read the [discussion of the reference solutions](DISCUSSION.md).
+
 ## Reference Solutions
 
-Reference solutions are available at GitHub:
+Reference solutions:
 
 - Java API:  
[`org.apache.flink.training.solutions.longrides.LongRidesSolution`](src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java)
 - Scala API: 
[`org.apache.flink.training.solutions.longrides.scala.LongRidesSolution`](src/solution/scala/org/apache/flink/training/solutions/longrides/scala/LongRidesSolution.scala)
 
 -----
 
-[**Lab Discussion: `ProcessFunction` and Timers (Long Ride 
Alerts)**](DISCUSSION.md)
-

Review comment:
       Do you actually want to remove the link to the discussion page?

##########
File path: 
common/src/test/java/org/apache/flink/training/exercises/testing/ParallelTestSource.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.training.exercises.testing;
+
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ParallelTestSource<T> extends RichParallelSourceFunction<T> {
+    protected T[] testStream;
+    TestSourcePartitioner<T> partitioner;
+    private List<T> substream;
+
+    public ParallelTestSource(TestSourcePartitioner<T> partitioner, T... 
events) {
+        this.partitioner = partitioner;
+        this.testStream = events;
+    }
+
+    public ParallelTestSource(T... events) {
+        this.partitioner = (e -> 0);
+        this.testStream = events;
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+
+        int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+        int numberOfParallelSubtasks = 
getRuntimeContext().getNumberOfParallelSubtasks();
+        substream = new ArrayList<>();
+
+        for (int i = 0; i < testStream.length; i++) {
+            T element = testStream[i];

Review comment:
       ```suggestion
           for (T element : testStream) {
   ```

##########
File path: common/build.gradle
##########
@@ -17,6 +17,10 @@ dependencies {
     shadow 
"org.apache.flink:flink-runtime_${scalaBinaryVersion}:${flinkVersion}"
 
     testApi "junit:junit:${junitVersion}"
+    testApi 
"org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}:tests"
+    testApi 
"org.apache.flink:flink-runtime_${scalaBinaryVersion}:${flinkVersion}:tests"
     testApi "org.apache.flink:flink-test-utils-junit:${flinkVersion}"
+    testApi 
"org.apache.flink:flink-test-utils_${scalaBinaryVersion}:${flinkVersion}"
     testApi 'org.hamcrest:hamcrest-library:1.3'
+    testApi 'org.assertj:assertj-core:3.20.2'

Review comment:
       Do we need this dependency? Isn't `org.junit.Assert#assertThat()` doing 
the same?

##########
File path: 
long-ride-alerts/src/solution/scala/org/apache/flink/training/solutions/longrides/scala/LongRidesSolution.scala
##########
@@ -18,81 +18,124 @@
 
 package org.apache.flink.training.solutions.longrides.scala
 
-import scala.concurrent.duration._
+import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, 
WatermarkStrategy}
 import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.streaming.api.functions.sink.{PrintSinkFunction, 
SinkFunction}
+import org.apache.flink.streaming.api.functions.source.SourceFunction
 import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
 import org.apache.flink.training.exercises.common.datatypes.TaxiRide
 import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator
-import org.apache.flink.training.exercises.common.utils.ExerciseBase
-import org.apache.flink.training.exercises.common.utils.ExerciseBase._
 import org.apache.flink.util.Collector
 
+import scala.concurrent.duration._
+import java.time.Duration
+
 /**
-  * Scala reference implementation for the "Long Ride Alerts" exercise of the 
Flink training in the docs.
+  * Scala solution for the "Long Ride Alerts" exercise.
   *
-  * The goal for this exercise is to emit START events for taxi rides that 
have not been matched
-  * by an END event during the first 2 hours of the ride.
+  * <p>The goal for this exercise is to emit the rideIds for taxi rides with a 
duration of more than
+  * two hours. You should assume that TaxiRide events can be lost, but there 
are no duplicates.
   *
+  * <p>You should eventually clear any state you create.
   */
 object LongRidesSolution {
 
-  def main(args: Array[String]) {
-
-    // set up the execution environment
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    // operate in Event-time
-    env.setParallelism(ExerciseBase.parallelism)
-
-    val rides = env.addSource(rideSourceOrTest(new TaxiRideGenerator()))
+  class LongRidesJob(source: SourceFunction[TaxiRide], sink: 
SinkFunction[Long]) {
+
+    /**
+     * Creates and executes the ride cleansing pipeline.
+     */
+    @throws[Exception]
+    def execute(): Unit = {
+      val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+      // start the data generator
+      val rides = env.addSource(source)
+
+      // the WatermarkStrategy specifies how to extract timestamps and 
generate watermarks
+      val watermarkStrategy = WatermarkStrategy
+        .forBoundedOutOfOrderness[TaxiRide](Duration.ofSeconds(60))
+        .withTimestampAssigner(new SerializableTimestampAssigner[TaxiRide] {
+          override def extractTimestamp(element: TaxiRide, recordTimestamp: 
Long): Long =
+            element.getEventTime
+        })
+
+      // create the pipeline
+      rides
+        .assignTimestampsAndWatermarks(watermarkStrategy)
+        .keyBy(_.rideId)
+        .process(new MatchFunction())
+        .addSink(sink)
+
+      // execute the pipeline
+      env.execute("Long Taxi Rides")
+    }
 
-    val longRides = rides
-      .keyBy(_.rideId)
-      .process(new MatchFunction())
+  }
 
-    printOrTest(longRides)
+  @throws[Exception]
+  def main (args: Array[String]) {
+    val job = new LongRidesJob(new TaxiRideGenerator, new PrintSinkFunction)
 
-    env.execute("Long Taxi Rides")
+    job.execute
   }
 
-  class MatchFunction extends KeyedProcessFunction[Long, TaxiRide, TaxiRide] {
-    lazy val rideState: ValueState[TaxiRide] = getRuntimeContext.getState(
-      new ValueStateDescriptor[TaxiRide]("ride event", classOf[TaxiRide]))
+  class MatchFunction extends KeyedProcessFunction[Long, TaxiRide, Long] {
+    private var rideState: ValueState[TaxiRide] = _
+
+    override def open(parameters: Configuration): Unit = {
+      rideState = getRuntimeContext.getState(
+        new ValueStateDescriptor[TaxiRide]("ride event", classOf[TaxiRide]))
+    }
 
     override def processElement(ride: TaxiRide,
-                                context: KeyedProcessFunction[Long, TaxiRide, 
TaxiRide]#Context,
-                                out: Collector[TaxiRide]): Unit = {
+                                context: KeyedProcessFunction[Long, TaxiRide, 
Long]#Context,
+                                out: Collector[Long]): Unit = {
 
-      val previousRideEvent = rideState.value()
+      val firstRideEvent = rideState.value()
 
-      if (previousRideEvent == null) {
+      if (firstRideEvent == null) {
         rideState.update(ride)
         if (ride.isStart) {
-          context.timerService().registerEventTimeTimer(getTimerTime(ride))
+          context.timerService.registerEventTimeTimer(getTimerTime(ride))
+        }
+        else if (rideTooLong(ride)) {
+          out.collect(ride.rideId)
         }
-      } else {
-        if (!ride.isStart) {
-          // it's an END event, so event saved was the START event and has a 
timer
-          // the timer hasn't fired yet, and we can safely kill the timer
-          
context.timerService().deleteEventTimeTimer(getTimerTime(previousRideEvent))
+      }
+      else {

Review comment:
       not sure style-wise, but before this was collapsed into a single line

##########
File path: 
long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesHarnessTest.java
##########
@@ -0,0 +1,56 @@
+package org.apache.flink.training.exercises.longrides;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
+import org.apache.flink.training.solutions.longrides.LongRidesSolution;
+
+import org.junit.Test;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+public class LongRidesHarnessTest {
+
+    @Test
+    public void testLongRideAlertsAsSoonAsPossible() throws Exception {

Review comment:
       I think we'd need a couple more tests to verify the full behaviour, 
including:
   
   - alerting directly if you see the end event first
   - not alerting for a start event until the watermark is exactly 2h ahead 
(even if you see later data or processing time advances beyond 2h)
   - not alerting for an end event that is not too long
   - alerting if there is no event but the watermark surpasses the 2h
   - state cleanup (in the good case - not the leak we leave for the open 
discussion)
   
   Should we also verify that things, i.e. the start events, are kept in Flink 
state after all and not some other local Java store (best-effort by just 
looking at the number of state entries > 0)?

##########
File path: 
long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesHarnessTest.java
##########
@@ -0,0 +1,56 @@
+package org.apache.flink.training.exercises.longrides;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
+import org.apache.flink.training.solutions.longrides.LongRidesSolution;
+
+import org.junit.Test;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+public class LongRidesHarnessTest {
+
+    @Test
+    public void testLongRideAlertsAsSoonAsPossible() throws Exception {
+        KeyedOneInputStreamOperatorTestHarness<Long, TaxiRide, Long> harness = 
setupHarness();
+
+        TaxiRide startOfLongRide = LongRidesTest.startRide(1, 
LongRidesTest.BEGINNING);
+        harness.processElement(new StreamRecord<>(startOfLongRide, 
startOfLongRide.getEventTime()));
+
+        Watermark mark2HoursLater =
+                new Watermark(LongRidesTest.BEGINNING.plusSeconds(120 * 
60).toEpochMilli());
+        harness.processWatermark(mark2HoursLater);
+
+        // Check that the result is correct
+        ConcurrentLinkedQueue<Object> actualOutput = harness.getOutput();
+        StreamRecord<Long> rideIdAtTimeOfWatermark =
+                new StreamRecord<>(startOfLongRide.rideId, 
mark2HoursLater.getTimestamp());
+        assertThat(actualOutput).containsExactly(rideIdAtTimeOfWatermark, 
mark2HoursLater);
+
+        // Check that no state or timers are left behind
+        assertThat(harness.numKeyedStateEntries()).isZero();
+        assertThat(harness.numEventTimeTimers()).isZero();
+    }
+
+    private KeyedOneInputStreamOperatorTestHarness<Long, TaxiRide, Long> 
setupHarness()
+            throws Exception {
+
+        KeyedProcessOperator<Long, TaxiRide, Long> operator =
+                new KeyedProcessOperator<>(new 
LongRidesSolution.MatchFunction());
+
+        KeyedOneInputStreamOperatorTestHarness<Long, TaxiRide, Long> 
testHarness =
+                new KeyedOneInputStreamOperatorTestHarness<>(
+                        operator, (TaxiRide r) -> r.rideId, 
BasicTypeInfo.LONG_TYPE_INFO);

Review comment:
       ```suggestion
                           operator, (TaxiRide r) -> r.rideId, Types.LONG);
   ```

##########
File path: 
long-ride-alerts/src/main/scala/org/apache/flink/training/exercises/longrides/scala/LongRidesExercise.scala
##########
@@ -18,55 +18,83 @@
 
 package org.apache.flink.training.exercises.longrides.scala
 
+import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, 
WatermarkStrategy}

Review comment:
       Most of the comments for the Java exercise apply 1:1...I'll omit listing 
them here again

##########
File path: 
long-ride-alerts/src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java
##########
@@ -18,62 +18,96 @@
 
 package org.apache.flink.training.exercises.longrides;
 
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.TimerService;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
 import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator;
-import org.apache.flink.training.exercises.common.utils.ExerciseBase;
 import 
org.apache.flink.training.exercises.common.utils.MissingSolutionException;
 import org.apache.flink.util.Collector;
 
+import java.time.Duration;
+
 /**
- * The "Long Ride Alerts" exercise of the Flink training in the docs.
+ * The "Long Ride Alerts" exercise.
+ *
+ * <p>The goal for this exercise is to emit the rideIds for taxi rides with a 
duration of more than
+ * two hours. You should assume that TaxiRide events can be lost, but there 
are no duplicates.
  *
- * <p>The goal for this exercise is to emit START events for taxi rides that 
have not been matched
- * by an END event during the first 2 hours of the ride.
+ * <p>You should eventually clear any state you create.
  */
-public class LongRidesExercise extends ExerciseBase {
+public class LongRidesExercise {
+    private SourceFunction<TaxiRide> source;
+    private SinkFunction<Long> sink;
+
+    /** Creates a job using the source and sink provided. */
+    public LongRidesExercise(SourceFunction<TaxiRide> source, 
SinkFunction<Long> sink) {
+        this.source = source;
+        this.sink = sink;
+    }
 
     /**
-     * Main method.
+     * Creates and executes the long rides pipeline.
      *
-     * @throws Exception which occurs during job execution.
+     * <p>@throws Exception which occurs during job execution.
      */
-    public static void main(String[] args) throws Exception {
+    public void execute() throws Exception {
 
         // set up streaming execution environment
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(ExerciseBase.parallelism);
 
         // start the data generator
-        DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new 
TaxiRideGenerator()));
+        DataStream<TaxiRide> rides = env.addSource(source, 
TypeInformation.of(TaxiRide.class));

Review comment:
       Not sure why you added this here, but it doesn't seem like we need to 
provide the type information manually
   ```suggestion
           DataStream<TaxiRide> rides = env.addSource(source);
   ```

##########
File path: 
common/src/test/java/org/apache/flink/training/exercises/testing/ParallelTestSource.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.training.exercises.testing;
+
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ParallelTestSource<T> extends RichParallelSourceFunction<T> {
+    protected T[] testStream;
+    TestSourcePartitioner<T> partitioner;
+    private List<T> substream;
+
+    public ParallelTestSource(TestSourcePartitioner<T> partitioner, T... 
events) {
+        this.partitioner = partitioner;
+        this.testStream = events;
+    }
+
+    public ParallelTestSource(T... events) {
+        this.partitioner = (e -> 0);
+        this.testStream = events;
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+
+        int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+        int numberOfParallelSubtasks = 
getRuntimeContext().getNumberOfParallelSubtasks();
+        substream = new ArrayList<>();
+
+        for (int i = 0; i < testStream.length; i++) {
+            T element = testStream[i];
+            long subtaskToUse = partitioner.partition(element) % 
numberOfParallelSubtasks;
+
+            if (subtaskToUse == indexOfThisSubtask) {
+                substream.add(element);
+            } else if (subtaskToUse < 0 || subtaskToUse > 
numberOfParallelSubtasks - 1) {

Review comment:
       I think, `subtaskToUse < 0` is the only thing you need to check here.
   ```suggestion
               } else if (subtaskToUse < 0) {
   ```

##########
File path: 
long-ride-alerts/src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java
##########
@@ -18,62 +18,96 @@
 
 package org.apache.flink.training.exercises.longrides;
 
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.TimerService;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
 import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator;
-import org.apache.flink.training.exercises.common.utils.ExerciseBase;
 import 
org.apache.flink.training.exercises.common.utils.MissingSolutionException;
 import org.apache.flink.util.Collector;
 
+import java.time.Duration;
+
 /**
- * The "Long Ride Alerts" exercise of the Flink training in the docs.
+ * The "Long Ride Alerts" exercise.
+ *
+ * <p>The goal for this exercise is to emit the rideIds for taxi rides with a 
duration of more than
+ * two hours. You should assume that TaxiRide events can be lost, but there 
are no duplicates.
  *
- * <p>The goal for this exercise is to emit START events for taxi rides that 
have not been matched
- * by an END event during the first 2 hours of the ride.
+ * <p>You should eventually clear any state you create.
  */
-public class LongRidesExercise extends ExerciseBase {
+public class LongRidesExercise {
+    private SourceFunction<TaxiRide> source;
+    private SinkFunction<Long> sink;
+
+    /** Creates a job using the source and sink provided. */
+    public LongRidesExercise(SourceFunction<TaxiRide> source, 
SinkFunction<Long> sink) {
+        this.source = source;
+        this.sink = sink;
+    }
 
     /**
-     * Main method.
+     * Creates and executes the long rides pipeline.
      *
-     * @throws Exception which occurs during job execution.
+     * <p>@throws Exception which occurs during job execution.
      */
-    public static void main(String[] args) throws Exception {
+    public void execute() throws Exception {
 
         // set up streaming execution environment
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(ExerciseBase.parallelism);

Review comment:
       Although for a production job, we shouldn't set any configuration 
parameters via code, I'd argue that we may need a special environment for these 
exercises with well-defined settings. High default parallelism (number of CPUs) 
in the IDE could, for example, overload the Flink starter with just too much 
happening in the logs while too low parallelism may hide certain behaviour.
   
   Maybe something along the lines of 
https://github.com/ververica/flink-training/blob/d07b181f1fa89137da45a8ffa67ec4cbfaf90cf1/troubleshooting/common/src/main/java/com/ververica/flink/training/common/EnvironmentUtils.java#L44
 which enables the web UI on a well-defined port.
   
   Maybe we could also hide this under a nice builder pattern...
   
   Either way, that may be a follow-up PR, but I'm not sure whether we should change the 
parallelism here or rather put this into its own (bigger-scoped) improvement...

##########
File path: 
long-ride-alerts/src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java
##########
@@ -18,62 +18,96 @@
 
 package org.apache.flink.training.exercises.longrides;
 
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.TimerService;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
 import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator;
-import org.apache.flink.training.exercises.common.utils.ExerciseBase;
 import 
org.apache.flink.training.exercises.common.utils.MissingSolutionException;
 import org.apache.flink.util.Collector;
 
+import java.time.Duration;
+
 /**
- * The "Long Ride Alerts" exercise of the Flink training in the docs.
+ * The "Long Ride Alerts" exercise.
+ *
+ * <p>The goal for this exercise is to emit the rideIds for taxi rides with a 
duration of more than
+ * two hours. You should assume that TaxiRide events can be lost, but there 
are no duplicates.
  *
- * <p>The goal for this exercise is to emit START events for taxi rides that 
have not been matched
- * by an END event during the first 2 hours of the ride.
+ * <p>You should eventually clear any state you create.
  */
-public class LongRidesExercise extends ExerciseBase {
+public class LongRidesExercise {
+    private SourceFunction<TaxiRide> source;
+    private SinkFunction<Long> sink;
+
+    /** Creates a job using the source and sink provided. */
+    public LongRidesExercise(SourceFunction<TaxiRide> source, 
SinkFunction<Long> sink) {
+        this.source = source;
+        this.sink = sink;
+    }
 
     /**
-     * Main method.
+     * Creates and executes the long rides pipeline.
      *
-     * @throws Exception which occurs during job execution.
+     * <p>@throws Exception which occurs during job execution.
      */
-    public static void main(String[] args) throws Exception {
+    public void execute() throws Exception {
 
         // set up streaming execution environment
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(ExerciseBase.parallelism);
 
         // start the data generator
-        DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new 
TaxiRideGenerator()));
+        DataStream<TaxiRide> rides = env.addSource(source, 
TypeInformation.of(TaxiRide.class));
 
-        DataStream<TaxiRide> longRides =
-                rides.keyBy((TaxiRide ride) -> ride.rideId).process(new 
MatchFunction());
+        // the WatermarkStrategy specifies how to extract timestamps and 
generate watermarks
+        WatermarkStrategy<TaxiRide> watermarkStrategy =
+                
WatermarkStrategy.<TaxiRide>forBoundedOutOfOrderness(Duration.ofSeconds(60))
+                        .withTimestampAssigner((ride, timestamp) -> 
ride.getEventTime());

Review comment:
       The second parameter here is:
   > The current internal timestamp of the element, or a negative value, if no 
timestamp has been assigned yet.
   
   I'm not sure (didactically) how to name that one not to be confusing, but 
`timestamp` sounds misleading ... wdyt?

##########
File path: 
long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesHarnessTest.java
##########
@@ -0,0 +1,56 @@
+package org.apache.flink.training.exercises.longrides;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
+import org.apache.flink.training.solutions.longrides.LongRidesSolution;
+
+import org.junit.Test;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+public class LongRidesHarnessTest {
+
+    @Test
+    public void testLongRideAlertsAsSoonAsPossible() throws Exception {
+        KeyedOneInputStreamOperatorTestHarness<Long, TaxiRide, Long> harness = 
setupHarness();
+
+        TaxiRide startOfLongRide = LongRidesTest.startRide(1, 
LongRidesTest.BEGINNING);
+        harness.processElement(new StreamRecord<>(startOfLongRide, 
startOfLongRide.getEventTime()));

Review comment:
       Do you want to check that there is no output yet after processing this?

##########
File path: 
long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTest.java
##########
@@ -18,89 +18,150 @@
 
 package org.apache.flink.training.exercises.longrides;
 
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
-import org.apache.flink.training.exercises.testing.TaxiRideTestBase;
+import org.apache.flink.training.exercises.testing.ComposedPipeline;
+import org.apache.flink.training.exercises.testing.ExecutablePipeline;
+import org.apache.flink.training.exercises.testing.ParallelTestSource;
+import org.apache.flink.training.exercises.testing.TestSink;
+import org.apache.flink.training.exercises.testing.TestSourcePartitioner;
 import org.apache.flink.training.solutions.longrides.LongRidesSolution;
 
+import org.junit.ClassRule;
 import org.junit.Test;
 
 import java.time.Instant;
-import java.util.Collections;
-import java.util.List;
 
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
 
-public class LongRidesTest extends TaxiRideTestBase<TaxiRide> {
+public class LongRidesTest {
 
-    static final Testable JAVA_EXERCISE = () -> LongRidesExercise.main(new 
String[] {});
+    private static final int PARALLELISM = 2;
 
-    private static final Instant BEGINNING = 
Instant.parse("2020-01-01T12:00:00.00Z");
+    /** This isn't necessary, but speeds up the tests. */
+    @ClassRule
+    public static MiniClusterWithClientResource flinkCluster =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberSlotsPerTaskManager(PARALLELISM)
+                            .setNumberTaskManagers(1)
+                            .build());
+
+    public static final Instant BEGINNING = 
Instant.parse("2020-01-01T12:00:00.00Z");
+    public static final Instant ONE_MINUTE_LATER = BEGINNING.plusSeconds(60);
+    public static final Instant THREE_HOURS_LATER = BEGINNING.plusSeconds(180 
* 60);
 
     @Test
     public void shortRide() throws Exception {
-        Instant oneMinLater = BEGINNING.plusSeconds(60);
+
         TaxiRide rideStarted = startRide(1, BEGINNING);
-        TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
-        Long markOneMinLater = oneMinLater.toEpochMilli();
+        TaxiRide endedOneMinLater = endRide(rideStarted, ONE_MINUTE_LATER);
 
-        TestRideSource source = new TestRideSource(rideStarted, 
endedOneMinLater, markOneMinLater);
-        assert (results(source).isEmpty());
+        ParallelTestSource<TaxiRide> source =
+                new ParallelTestSource<>(new PartitionByRideId(), rideStarted, 
endedOneMinLater);
+        TestSink<Long> sink = new TestSink<Long>();
+
+        longRidesPipeline().execute(source, sink);
+        assertThat(sink.results()).isEmpty();
     }
 
     @Test
     public void outOfOrder() throws Exception {
-        Instant oneMinLater = BEGINNING.plusSeconds(60);
         TaxiRide rideStarted = startRide(1, BEGINNING);
-        TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
-        Long markOneMinLater = oneMinLater.toEpochMilli();
+        TaxiRide endedOneMinLater = endRide(rideStarted, ONE_MINUTE_LATER);
+
+        ParallelTestSource<TaxiRide> source =
+                new ParallelTestSource<>(new PartitionByRideId(), 
endedOneMinLater, rideStarted);
+        TestSink<Long> sink = new TestSink<Long>();
 
-        TestRideSource source = new TestRideSource(endedOneMinLater, 
rideStarted, markOneMinLater);
-        assert (results(source).isEmpty());
+        longRidesPipeline().execute(source, sink);
+        assertThat(sink.results()).isEmpty();
     }
 
     @Test
     public void noStartShort() throws Exception {
-        Instant oneMinLater = BEGINNING.plusSeconds(60);
         TaxiRide rideStarted = startRide(1, BEGINNING);
-        TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
-        Long markOneMinLater = oneMinLater.toEpochMilli();
+        TaxiRide endedOneMinLater = endRide(rideStarted, ONE_MINUTE_LATER);
 
-        TestRideSource source = new TestRideSource(endedOneMinLater, 
markOneMinLater);
-        assert (results(source).isEmpty());
+        ParallelTestSource<TaxiRide> source =
+                new ParallelTestSource<>(new PartitionByRideId(), 
endedOneMinLater);
+        TestSink<Long> sink = new TestSink<Long>();
+
+        longRidesPipeline().execute(source, sink);
+        assertThat(sink.results()).isEmpty();
     }
 
     @Test
-    public void noEnd() throws Exception {
+    public void noStartLong() throws Exception {

Review comment:
       Unit test?

##########
File path: 
long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTest.java
##########
@@ -18,89 +18,150 @@
 
 package org.apache.flink.training.exercises.longrides;
 
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
-import org.apache.flink.training.exercises.testing.TaxiRideTestBase;
+import org.apache.flink.training.exercises.testing.ComposedPipeline;
+import org.apache.flink.training.exercises.testing.ExecutablePipeline;
+import org.apache.flink.training.exercises.testing.ParallelTestSource;
+import org.apache.flink.training.exercises.testing.TestSink;
+import org.apache.flink.training.exercises.testing.TestSourcePartitioner;
 import org.apache.flink.training.solutions.longrides.LongRidesSolution;
 
+import org.junit.ClassRule;
 import org.junit.Test;
 
 import java.time.Instant;
-import java.util.Collections;
-import java.util.List;
 
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
 
-public class LongRidesTest extends TaxiRideTestBase<TaxiRide> {
+public class LongRidesTest {
 
-    static final Testable JAVA_EXERCISE = () -> LongRidesExercise.main(new 
String[] {});
+    private static final int PARALLELISM = 2;
 
-    private static final Instant BEGINNING = 
Instant.parse("2020-01-01T12:00:00.00Z");
+    /** This isn't necessary, but speeds up the tests. */
+    @ClassRule
+    public static MiniClusterWithClientResource flinkCluster =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberSlotsPerTaskManager(PARALLELISM)
+                            .setNumberTaskManagers(1)
+                            .build());
+
+    public static final Instant BEGINNING = 
Instant.parse("2020-01-01T12:00:00.00Z");
+    public static final Instant ONE_MINUTE_LATER = BEGINNING.plusSeconds(60);
+    public static final Instant THREE_HOURS_LATER = BEGINNING.plusSeconds(180 
* 60);
 
     @Test
     public void shortRide() throws Exception {
-        Instant oneMinLater = BEGINNING.plusSeconds(60);
+
         TaxiRide rideStarted = startRide(1, BEGINNING);
-        TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
-        Long markOneMinLater = oneMinLater.toEpochMilli();
+        TaxiRide endedOneMinLater = endRide(rideStarted, ONE_MINUTE_LATER);
 
-        TestRideSource source = new TestRideSource(rideStarted, 
endedOneMinLater, markOneMinLater);
-        assert (results(source).isEmpty());
+        ParallelTestSource<TaxiRide> source =
+                new ParallelTestSource<>(new PartitionByRideId(), rideStarted, 
endedOneMinLater);
+        TestSink<Long> sink = new TestSink<Long>();
+
+        longRidesPipeline().execute(source, sink);
+        assertThat(sink.results()).isEmpty();
     }
 
     @Test
     public void outOfOrder() throws Exception {
-        Instant oneMinLater = BEGINNING.plusSeconds(60);
         TaxiRide rideStarted = startRide(1, BEGINNING);
-        TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
-        Long markOneMinLater = oneMinLater.toEpochMilli();
+        TaxiRide endedOneMinLater = endRide(rideStarted, ONE_MINUTE_LATER);
+
+        ParallelTestSource<TaxiRide> source =
+                new ParallelTestSource<>(new PartitionByRideId(), 
endedOneMinLater, rideStarted);
+        TestSink<Long> sink = new TestSink<Long>();
 
-        TestRideSource source = new TestRideSource(endedOneMinLater, 
rideStarted, markOneMinLater);
-        assert (results(source).isEmpty());
+        longRidesPipeline().execute(source, sink);
+        assertThat(sink.results()).isEmpty();
     }
 
     @Test
     public void noStartShort() throws Exception {
-        Instant oneMinLater = BEGINNING.plusSeconds(60);
         TaxiRide rideStarted = startRide(1, BEGINNING);
-        TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
-        Long markOneMinLater = oneMinLater.toEpochMilli();
+        TaxiRide endedOneMinLater = endRide(rideStarted, ONE_MINUTE_LATER);
 
-        TestRideSource source = new TestRideSource(endedOneMinLater, 
markOneMinLater);
-        assert (results(source).isEmpty());
+        ParallelTestSource<TaxiRide> source =
+                new ParallelTestSource<>(new PartitionByRideId(), 
endedOneMinLater);
+        TestSink<Long> sink = new TestSink<Long>();
+
+        longRidesPipeline().execute(source, sink);
+        assertThat(sink.results()).isEmpty();
     }
 
     @Test
-    public void noEnd() throws Exception {
+    public void noStartLong() throws Exception {
         TaxiRide rideStarted = startRide(1, BEGINNING);
-        Long markThreeHoursLater = BEGINNING.plusSeconds(180 * 
60).toEpochMilli();
+        TaxiRide endedThreeHoursLater = endRide(rideStarted, 
THREE_HOURS_LATER);
+
+        ParallelTestSource<TaxiRide> source =
+                new ParallelTestSource<>(new PartitionByRideId(), 
endedThreeHoursLater);
+        TestSink<Long> sink = new TestSink<Long>();
 
-        TestRideSource source = new TestRideSource(rideStarted, 
markThreeHoursLater);
-        assertEquals(Collections.singletonList(rideStarted), results(source));
+        longRidesPipeline().execute(source, sink);
+        assertThat(sink.results()).containsExactly(rideStarted.rideId);
     }
 
     @Test
-    public void longRide() throws Exception {
+    public void endIsMissing() throws Exception {
         TaxiRide rideStarted = startRide(1, BEGINNING);
-        Long mark2HoursLater = BEGINNING.plusSeconds(120 * 60).toEpochMilli();
-        TaxiRide rideEnded3HoursLater = endRide(rideStarted, 
BEGINNING.plusSeconds(180 * 60));
 
-        TestRideSource source =
-                new TestRideSource(rideStarted, mark2HoursLater, 
rideEnded3HoursLater);
-        assertEquals(Collections.singletonList(rideStarted), results(source));
+        ParallelTestSource<TaxiRide> source =
+                new ParallelTestSource<>(new PartitionByRideId(), rideStarted);
+        TestSink<Long> sink = new TestSink<Long>();
+
+        longRidesPipeline().execute(source, sink);
+        assertThat(sink.results()).containsExactly(rideStarted.rideId);
     }
 
     @Test
-    public void startIsDelayedMoreThanTwoHours() throws Exception {
-        TaxiRide rideStarted = startRide(1, BEGINNING);
-        TaxiRide rideEndedAfter1Hour = endRide(rideStarted, 
BEGINNING.plusSeconds(60 * 60));
-        Long mark2HoursAfterEnd = BEGINNING.plusSeconds(180 * 
60).toEpochMilli();
+    public void endComesAfter3Hours() throws Exception {
+        TaxiRide startOfLongRide = startRide(1, BEGINNING);
+        TaxiRide longRideEndedAfter3Hours = endRide(startOfLongRide, 
THREE_HOURS_LATER);
 
-        TestRideSource source =
-                new TestRideSource(rideEndedAfter1Hour, mark2HoursAfterEnd, 
rideStarted);
-        assert (results(source).isEmpty());
+        ParallelTestSource<TaxiRide> source =
+                new ParallelTestSource<>(
+                        new PartitionByRideId(), startOfLongRide, 
longRideEndedAfter3Hours);
+        TestSink<Long> sink = new TestSink<Long>();
+
+        longRidesPipeline().execute(source, sink);
+        assertThat(sink.results()).containsExactly(startOfLongRide.rideId);
+    }
+
+    @Test
+    public void multipleRides() throws Exception {
+        TaxiRide startOfOneRide = startRide(1, BEGINNING);
+        TaxiRide otherRide = startRide(2, ONE_MINUTE_LATER);
+        TaxiRide oneRideEnded = endRide(startOfOneRide, THREE_HOURS_LATER);
+        TaxiRide otherRideEnded = endRide(otherRide, THREE_HOURS_LATER);

Review comment:
       Do you want to add a couple more rides (just 2h, just below 2h, 
without an end, ...) to mix things up a bit (making it more of an ITCase)?

##########
File path: 
long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTest.java
##########
@@ -18,89 +18,150 @@
 
 package org.apache.flink.training.exercises.longrides;
 
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
-import org.apache.flink.training.exercises.testing.TaxiRideTestBase;
+import org.apache.flink.training.exercises.testing.ComposedPipeline;
+import org.apache.flink.training.exercises.testing.ExecutablePipeline;
+import org.apache.flink.training.exercises.testing.ParallelTestSource;
+import org.apache.flink.training.exercises.testing.TestSink;
+import org.apache.flink.training.exercises.testing.TestSourcePartitioner;
 import org.apache.flink.training.solutions.longrides.LongRidesSolution;
 
+import org.junit.ClassRule;
 import org.junit.Test;
 
 import java.time.Instant;
-import java.util.Collections;
-import java.util.List;
 
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
 
-public class LongRidesTest extends TaxiRideTestBase<TaxiRide> {
+public class LongRidesTest {
 
-    static final Testable JAVA_EXERCISE = () -> LongRidesExercise.main(new 
String[] {});
+    private static final int PARALLELISM = 2;
 
-    private static final Instant BEGINNING = 
Instant.parse("2020-01-01T12:00:00.00Z");
+    /** This isn't necessary, but speeds up the tests. */
+    @ClassRule
+    public static MiniClusterWithClientResource flinkCluster =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberSlotsPerTaskManager(PARALLELISM)
+                            .setNumberTaskManagers(1)
+                            .build());
+
+    public static final Instant BEGINNING = 
Instant.parse("2020-01-01T12:00:00.00Z");
+    public static final Instant ONE_MINUTE_LATER = BEGINNING.plusSeconds(60);
+    public static final Instant THREE_HOURS_LATER = BEGINNING.plusSeconds(180 
* 60);
 
     @Test
     public void shortRide() throws Exception {
-        Instant oneMinLater = BEGINNING.plusSeconds(60);
+
         TaxiRide rideStarted = startRide(1, BEGINNING);
-        TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
-        Long markOneMinLater = oneMinLater.toEpochMilli();
+        TaxiRide endedOneMinLater = endRide(rideStarted, ONE_MINUTE_LATER);
 
-        TestRideSource source = new TestRideSource(rideStarted, 
endedOneMinLater, markOneMinLater);
-        assert (results(source).isEmpty());
+        ParallelTestSource<TaxiRide> source =
+                new ParallelTestSource<>(new PartitionByRideId(), rideStarted, 
endedOneMinLater);
+        TestSink<Long> sink = new TestSink<Long>();
+
+        longRidesPipeline().execute(source, sink);
+        assertThat(sink.results()).isEmpty();
     }
 
     @Test
     public void outOfOrder() throws Exception {
-        Instant oneMinLater = BEGINNING.plusSeconds(60);
         TaxiRide rideStarted = startRide(1, BEGINNING);
-        TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
-        Long markOneMinLater = oneMinLater.toEpochMilli();
+        TaxiRide endedOneMinLater = endRide(rideStarted, ONE_MINUTE_LATER);
+
+        ParallelTestSource<TaxiRide> source =
+                new ParallelTestSource<>(new PartitionByRideId(), 
endedOneMinLater, rideStarted);
+        TestSink<Long> sink = new TestSink<Long>();
 
-        TestRideSource source = new TestRideSource(endedOneMinLater, 
rideStarted, markOneMinLater);
-        assert (results(source).isEmpty());
+        longRidesPipeline().execute(source, sink);
+        assertThat(sink.results()).isEmpty();
     }
 
     @Test
     public void noStartShort() throws Exception {
-        Instant oneMinLater = BEGINNING.plusSeconds(60);
         TaxiRide rideStarted = startRide(1, BEGINNING);
-        TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
-        Long markOneMinLater = oneMinLater.toEpochMilli();
+        TaxiRide endedOneMinLater = endRide(rideStarted, ONE_MINUTE_LATER);
 
-        TestRideSource source = new TestRideSource(endedOneMinLater, 
markOneMinLater);
-        assert (results(source).isEmpty());
+        ParallelTestSource<TaxiRide> source =
+                new ParallelTestSource<>(new PartitionByRideId(), 
endedOneMinLater);
+        TestSink<Long> sink = new TestSink<Long>();
+
+        longRidesPipeline().execute(source, sink);
+        assertThat(sink.results()).isEmpty();
     }
 
     @Test
-    public void noEnd() throws Exception {
+    public void noStartLong() throws Exception {
         TaxiRide rideStarted = startRide(1, BEGINNING);
-        Long markThreeHoursLater = BEGINNING.plusSeconds(180 * 
60).toEpochMilli();
+        TaxiRide endedThreeHoursLater = endRide(rideStarted, 
THREE_HOURS_LATER);
+
+        ParallelTestSource<TaxiRide> source =
+                new ParallelTestSource<>(new PartitionByRideId(), 
endedThreeHoursLater);
+        TestSink<Long> sink = new TestSink<Long>();
 
-        TestRideSource source = new TestRideSource(rideStarted, 
markThreeHoursLater);
-        assertEquals(Collections.singletonList(rideStarted), results(source));
+        longRidesPipeline().execute(source, sink);
+        assertThat(sink.results()).containsExactly(rideStarted.rideId);
     }
 
     @Test
-    public void longRide() throws Exception {
+    public void endIsMissing() throws Exception {
         TaxiRide rideStarted = startRide(1, BEGINNING);
-        Long mark2HoursLater = BEGINNING.plusSeconds(120 * 60).toEpochMilli();
-        TaxiRide rideEnded3HoursLater = endRide(rideStarted, 
BEGINNING.plusSeconds(180 * 60));
 
-        TestRideSource source =
-                new TestRideSource(rideStarted, mark2HoursLater, 
rideEnded3HoursLater);
-        assertEquals(Collections.singletonList(rideStarted), results(source));
+        ParallelTestSource<TaxiRide> source =
+                new ParallelTestSource<>(new PartitionByRideId(), rideStarted);
+        TestSink<Long> sink = new TestSink<Long>();
+
+        longRidesPipeline().execute(source, sink);
+        assertThat(sink.results()).containsExactly(rideStarted.rideId);
     }
 
     @Test
-    public void startIsDelayedMoreThanTwoHours() throws Exception {
-        TaxiRide rideStarted = startRide(1, BEGINNING);
-        TaxiRide rideEndedAfter1Hour = endRide(rideStarted, 
BEGINNING.plusSeconds(60 * 60));
-        Long mark2HoursAfterEnd = BEGINNING.plusSeconds(180 * 
60).toEpochMilli();
+    public void endComesAfter3Hours() throws Exception {

Review comment:
       Isn't this more of a unit test?

##########
File path: 
long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTest.java
##########
@@ -18,89 +18,150 @@
 
 package org.apache.flink.training.exercises.longrides;
 
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
-import org.apache.flink.training.exercises.testing.TaxiRideTestBase;
+import org.apache.flink.training.exercises.testing.ComposedPipeline;
+import org.apache.flink.training.exercises.testing.ExecutablePipeline;
+import org.apache.flink.training.exercises.testing.ParallelTestSource;
+import org.apache.flink.training.exercises.testing.TestSink;
+import org.apache.flink.training.exercises.testing.TestSourcePartitioner;
 import org.apache.flink.training.solutions.longrides.LongRidesSolution;
 
+import org.junit.ClassRule;
 import org.junit.Test;
 
 import java.time.Instant;
-import java.util.Collections;
-import java.util.List;
 
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
 
-public class LongRidesTest extends TaxiRideTestBase<TaxiRide> {
+public class LongRidesTest {
 
-    static final Testable JAVA_EXERCISE = () -> LongRidesExercise.main(new 
String[] {});
+    private static final int PARALLELISM = 2;
 
-    private static final Instant BEGINNING = 
Instant.parse("2020-01-01T12:00:00.00Z");
+    /** This isn't necessary, but speeds up the tests. */
+    @ClassRule
+    public static MiniClusterWithClientResource flinkCluster =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberSlotsPerTaskManager(PARALLELISM)
+                            .setNumberTaskManagers(1)
+                            .build());
+
+    public static final Instant BEGINNING = 
Instant.parse("2020-01-01T12:00:00.00Z");
+    public static final Instant ONE_MINUTE_LATER = BEGINNING.plusSeconds(60);
+    public static final Instant THREE_HOURS_LATER = BEGINNING.plusSeconds(180 
* 60);
 
     @Test
     public void shortRide() throws Exception {
-        Instant oneMinLater = BEGINNING.plusSeconds(60);
+
         TaxiRide rideStarted = startRide(1, BEGINNING);
-        TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
-        Long markOneMinLater = oneMinLater.toEpochMilli();
+        TaxiRide endedOneMinLater = endRide(rideStarted, ONE_MINUTE_LATER);
 
-        TestRideSource source = new TestRideSource(rideStarted, 
endedOneMinLater, markOneMinLater);
-        assert (results(source).isEmpty());
+        ParallelTestSource<TaxiRide> source =
+                new ParallelTestSource<>(new PartitionByRideId(), rideStarted, 
endedOneMinLater);
+        TestSink<Long> sink = new TestSink<Long>();
+
+        longRidesPipeline().execute(source, sink);
+        assertThat(sink.results()).isEmpty();
     }
 
     @Test
     public void outOfOrder() throws Exception {
-        Instant oneMinLater = BEGINNING.plusSeconds(60);
         TaxiRide rideStarted = startRide(1, BEGINNING);
-        TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
-        Long markOneMinLater = oneMinLater.toEpochMilli();
+        TaxiRide endedOneMinLater = endRide(rideStarted, ONE_MINUTE_LATER);
+
+        ParallelTestSource<TaxiRide> source =
+                new ParallelTestSource<>(new PartitionByRideId(), 
endedOneMinLater, rideStarted);
+        TestSink<Long> sink = new TestSink<Long>();
 
-        TestRideSource source = new TestRideSource(endedOneMinLater, 
rideStarted, markOneMinLater);
-        assert (results(source).isEmpty());
+        longRidesPipeline().execute(source, sink);
+        assertThat(sink.results()).isEmpty();
     }
 
     @Test
     public void noStartShort() throws Exception {

Review comment:
       Is this test more of a unit test?
   
   ---
   
   In general, the line may be difficult to draw, but how about this:
   - an integration test focuses on the whole pipeline (so here: watermarks + 
keyBy + MatchFunction) and any interplay between these
   - a unit test focuses on the logic of the MatchFunction alone
   
   For this, we could actually also remove the watermarking code from the 
exercise and add it to the work to be done (unless you think it may be too 
much to ask for a first exercise).

##########
File path: 
long-ride-alerts/src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java
##########
@@ -18,62 +18,96 @@
 
 package org.apache.flink.training.exercises.longrides;
 
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.TimerService;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
 import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator;
-import org.apache.flink.training.exercises.common.utils.ExerciseBase;
 import 
org.apache.flink.training.exercises.common.utils.MissingSolutionException;
 import org.apache.flink.util.Collector;
 
+import java.time.Duration;
+
 /**
- * The "Long Ride Alerts" exercise of the Flink training in the docs.
+ * The "Long Ride Alerts" exercise.
+ *
+ * <p>The goal for this exercise is to emit the rideIds for taxi rides with a 
duration of more than
+ * two hours. You should assume that TaxiRide events can be lost, but there 
are no duplicates.
  *
- * <p>The goal for this exercise is to emit START events for taxi rides that 
have not been matched
- * by an END event during the first 2 hours of the ride.
+ * <p>You should eventually clear any state you create.
  */
-public class LongRidesExercise extends ExerciseBase {
+public class LongRidesExercise {
+    private SourceFunction<TaxiRide> source;
+    private SinkFunction<Long> sink;
+
+    /** Creates a job using the source and sink provided. */
+    public LongRidesExercise(SourceFunction<TaxiRide> source, 
SinkFunction<Long> sink) {
+        this.source = source;
+        this.sink = sink;
+    }
 
     /**
-     * Main method.
+     * Creates and executes the long rides pipeline.
      *
-     * @throws Exception which occurs during job execution.
+     * <p>@throws Exception which occurs during job execution.
      */
-    public static void main(String[] args) throws Exception {
+    public void execute() throws Exception {
 
         // set up streaming execution environment
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(ExerciseBase.parallelism);
 
         // start the data generator
-        DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new 
TaxiRideGenerator()));
+        DataStream<TaxiRide> rides = env.addSource(source, 
TypeInformation.of(TaxiRide.class));
 
-        DataStream<TaxiRide> longRides =
-                rides.keyBy((TaxiRide ride) -> ride.rideId).process(new 
MatchFunction());
+        // the WatermarkStrategy specifies how to extract timestamps and 
generate watermarks
+        WatermarkStrategy<TaxiRide> watermarkStrategy =
+                
WatermarkStrategy.<TaxiRide>forBoundedOutOfOrderness(Duration.ofSeconds(60))
+                        .withTimestampAssigner((ride, timestamp) -> 
ride.getEventTime());
 
-        printOrTest(longRides);
+        // create the pipeline
+        rides.assignTimestampsAndWatermarks(watermarkStrategy)
+                .keyBy((TaxiRide ride) -> ride.rideId)

Review comment:
       Simpler? Or did you want to make the type explicit?
   ```suggestion
                   .keyBy(ride -> ride.rideId)
   ```

##########
File path: 
common/src/test/java/org/apache/flink/training/exercises/testing/ComposedPipeline.java
##########
@@ -0,0 +1,47 @@
+package org.apache.flink.training.exercises.testing;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import 
org.apache.flink.training.exercises.common.utils.MissingSolutionException;
+
+/**
+ * This allows the tests to be run against both the exercises and the 
solutions.
+ *
+ * <p>If an exercise throws MissingSolutionException, then the solution is 
tested.
+ */
+public class ComposedPipeline<IN, OUT> implements ExecutablePipeline<IN, OUT> {
+
+    private ExecutablePipeline<IN, OUT> exercise;
+    private ExecutablePipeline<IN, OUT> solution;
+
+    public ComposedPipeline(
+            ExecutablePipeline<IN, OUT> exercise, ExecutablePipeline<IN, OUT> 
solution) {
+        this.exercise = exercise;
+        this.solution = solution;
+    }
+
+    @Override
+    public void execute(SourceFunction<IN> source, TestSink<OUT> sink) throws 
Exception {
+
+        sink.reset();
+
+        try {
+            exercise.execute(source, sink);
+        } catch (Exception e) {
+            if (ultimateCauseIsMissingSolution(e)) {
+                solution.execute(source, sink);
+            } else {
+                throw e;
+            }
+        }
+    }
+
+    private boolean ultimateCauseIsMissingSolution(Throwable e) {
+        if (e instanceof MissingSolutionException) {
+            return true;
+        } else if (e.getCause() != null) {
+            return ultimateCauseIsMissingSolution(e.getCause());
+        } else {
+            return false;
+        }

Review comment:
       I'm not sure how deep these cause chains could be, but wouldn't it be 
better to iterate through them instead of recursing?
   ```suggestion
           while (e != null) {
               if (e instanceof MissingSolutionException) {
                   return true;
               } else {
                   e = e.getCause();
               }
           }
           return false;
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to