This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-training.git

commit c4baefd1830abe3ba9bcf200be0f5d00095cae12
Author: Rufus Refactor <[email protected]>
AuthorDate: Tue Jul 13 21:45:34 2021 +0200

    [FLINK-23338] Format code with Spotless/google-java-format
---
 README.md                                          |   2 +-
 .../examples/ridecount/RideCountExample.java       |  60 +--
 .../exercises/common/datatypes/TaxiFare.java       | 154 ++++----
 .../exercises/common/datatypes/TaxiRide.java       | 297 +++++++-------
 .../common/sources/TaxiFareGenerator.java          |  35 +-
 .../common/sources/TaxiRideGenerator.java          | 109 +++---
 .../exercises/common/utils/DataGenerator.java      | 304 +++++++--------
 .../exercises/common/utils/ExerciseBase.java       |  98 ++---
 .../training/exercises/common/utils/GeoUtils.java  | 433 ++++++++++-----------
 .../common/utils/MissingSolutionException.java     |  10 +-
 .../exercises/testing/TaxiRideTestBase.java        | 291 +++++++-------
 .../training/exercises/testing/TestSource.java     |  44 +--
 hourly-tips/DISCUSSION.md                          |  34 +-
 .../exercises/hourlytips/HourlyTipsExercise.java   |  38 +-
 .../solutions/hourlytips/HourlyTipsSolution.java   |  92 ++---
 .../exercises/hourlytips/HourlyTipsTest.java       |  97 ++---
 long-ride-alerts/DISCUSSION.md                     |   2 +-
 long-ride-alerts/README.md                         |   8 +-
 .../exercises/longrides/LongRidesExercise.java     |  61 ++-
 .../solutions/longrides/LongRidesSolution.java     | 136 +++----
 .../exercises/longrides/LongRidesTest.java         | 178 +++++----
 .../ridecleansing/RideCleansingExercise.java       |  57 ++-
 .../ridecleansing/RideCleansingSolution.java       |  58 +--
 .../exercises/ridecleansing/RideCleansingTest.java |  73 ++--
 .../ridesandfares/RidesAndFaresExercise.java       |  69 ++--
 .../ridesandfares/RidesAndFaresSolution.java       | 162 ++++----
 .../exercises/ridesandfares/RidesAndFaresTest.java |  93 ++---
 27 files changed, 1489 insertions(+), 1506 deletions(-)

diff --git a/README.md b/README.md
index 834032a..37c1d26 100644
--- a/README.md
+++ b/README.md
@@ -111,7 +111,7 @@ Once that’s done you should be able to open 
[`RideCleansingTest`](ride-cleansi
 
 ## Using the Taxi Data Streams
 
-These exercises use data 
[generators](common/src/main/java/org/apache/flink/training/exercises/common/sources)
 that produce simulated event streams 
+These exercises use data 
[generators](common/src/main/java/org/apache/flink/training/exercises/common/sources)
 that produce simulated event streams
 inspired by those shared by the [New York City Taxi & Limousine 
Commission](http://www.nyc.gov/html/tlc/html/home/home.shtml)
 in their public [data set](https://uofi.app.box.com/NYCtaxidata) about taxi 
rides in New York City.
 
diff --git 
a/common/src/main/java/org/apache/flink/training/examples/ridecount/RideCountExample.java
 
b/common/src/main/java/org/apache/flink/training/examples/ridecount/RideCountExample.java
index 74e56fe..36b7b89 100644
--- 
a/common/src/main/java/org/apache/flink/training/examples/ridecount/RideCountExample.java
+++ 
b/common/src/main/java/org/apache/flink/training/examples/ridecount/RideCountExample.java
@@ -29,44 +29,46 @@ import 
org.apache.flink.training.exercises.common.sources.TaxiRideGenerator;
 /**
  * Example that counts the rides for each driver.
  *
- * <p>Note that this is implicitly keeping state for each driver.
- * This sort of simple, non-windowed aggregation on an unbounded set of keys 
will use an unbounded amount of state.
- * When this is an issue, look at the SQL/Table API, or ProcessFunction, or 
state TTL, all of which provide
+ * <p>Note that this is implicitly keeping state for each driver. This sort of 
simple, non-windowed
+ * aggregation on an unbounded set of keys will use an unbounded amount of 
state. When this is an
+ * issue, look at the SQL/Table API, or ProcessFunction, or state TTL, all of 
which provide
  * mechanisms for expiring state for stale keys.
  */
 public class RideCountExample {
 
-       /**
-        * Main method.
-        *
-        * @throws Exception which occurs during job execution.
-        */
-       public static void main(String[] args) throws Exception {
+    /**
+     * Main method.
+     *
+     * @throws Exception which occurs during job execution.
+     */
+    public static void main(String[] args) throws Exception {
 
-               // set up streaming execution environment
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        // set up streaming execution environment
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
-               // start the data generator
-               DataStream<TaxiRide> rides = env.addSource(new 
TaxiRideGenerator());
+        // start the data generator
+        DataStream<TaxiRide> rides = env.addSource(new TaxiRideGenerator());
 
-               // map each ride to a tuple of (driverId, 1)
-               DataStream<Tuple2<Long, Long>> tuples = rides.map(new 
MapFunction<TaxiRide, Tuple2<Long, Long>>() {
-                                       @Override
-                                       public Tuple2<Long, Long> map(TaxiRide 
ride) {
-                                               return Tuple2.of(ride.driverId, 
1L);
-                                       }
-               });
+        // map each ride to a tuple of (driverId, 1)
+        DataStream<Tuple2<Long, Long>> tuples =
+                rides.map(
+                        new MapFunction<TaxiRide, Tuple2<Long, Long>>() {
+                            @Override
+                            public Tuple2<Long, Long> map(TaxiRide ride) {
+                                return Tuple2.of(ride.driverId, 1L);
+                            }
+                        });
 
-               // partition the stream by the driverId
-               KeyedStream<Tuple2<Long, Long>, Long> keyedByDriverId = 
tuples.keyBy(t -> t.f0);
+        // partition the stream by the driverId
+        KeyedStream<Tuple2<Long, Long>, Long> keyedByDriverId = tuples.keyBy(t 
-> t.f0);
 
-               // count the rides for each driver
-               DataStream<Tuple2<Long, Long>> rideCounts = 
keyedByDriverId.sum(1);
+        // count the rides for each driver
+        DataStream<Tuple2<Long, Long>> rideCounts = keyedByDriverId.sum(1);
 
-               // we could, in fact, print out any or all of these streams
-               rideCounts.print();
+        // we could, in fact, print out any or all of these streams
+        rideCounts.print();
 
-               // run the cleansing pipeline
-               env.execute("Ride Count");
-       }
+        // run the cleansing pipeline
+        env.execute("Ride Count");
+    }
 }
diff --git 
a/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiFare.java
 
b/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiFare.java
index 013f2d3..aa74669 100644
--- 
a/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiFare.java
+++ 
b/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiFare.java
@@ -26,95 +26,93 @@ import java.time.Instant;
 /**
  * A TaxiFare has payment information about a taxi ride.
  *
- * <p>It has these fields in common with the TaxiRides
- * - the rideId
- * - the taxiId
- * - the driverId
- * - the startTime
+ * <p>It has these fields in common with the TaxiRides - the rideId - the 
taxiId - the driverId -
+ * the startTime
  *
- * <p>It also includes
- * - the paymentType
- * - the tip
- * - the tolls
- * - the totalFare
+ * <p>It also includes - the paymentType - the tip - the tolls - the totalFare
  */
 public class TaxiFare implements Serializable {
 
-       /**
-        * Creates a TaxiFare with now as the start time.
-        */
-       public TaxiFare() {
-               this.startTime = Instant.now();
-       }
+    /** Creates a TaxiFare with now as the start time. */
+    public TaxiFare() {
+        this.startTime = Instant.now();
+    }
 
-       /**
-        * Invents a TaxiFare.
-        */
-       public TaxiFare(long rideId) {
-               DataGenerator g = new DataGenerator(rideId);
+    /** Invents a TaxiFare. */
+    public TaxiFare(long rideId) {
+        DataGenerator g = new DataGenerator(rideId);
 
-               this.rideId = rideId;
-               this.taxiId = g.taxiId();
-               this.driverId = g.driverId();
-               this.startTime = g.startTime();
-               this.paymentType = g.paymentType();
-               this.tip = g.tip();
-               this.tolls = g.tolls();
-               this.totalFare = g.totalFare();
-       }
+        this.rideId = rideId;
+        this.taxiId = g.taxiId();
+        this.driverId = g.driverId();
+        this.startTime = g.startTime();
+        this.paymentType = g.paymentType();
+        this.tip = g.tip();
+        this.tolls = g.tolls();
+        this.totalFare = g.totalFare();
+    }
 
-       /**
-        * Creates a TaxiFare with the given parameters.
-        */
-       public TaxiFare(long rideId, long taxiId, long driverId, Instant 
startTime, String paymentType, float tip, float tolls, float totalFare) {
-               this.rideId = rideId;
-               this.taxiId = taxiId;
-               this.driverId = driverId;
-               this.startTime = startTime;
-               this.paymentType = paymentType;
-               this.tip = tip;
-               this.tolls = tolls;
-               this.totalFare = totalFare;
-       }
+    /** Creates a TaxiFare with the given parameters. */
+    public TaxiFare(
+            long rideId,
+            long taxiId,
+            long driverId,
+            Instant startTime,
+            String paymentType,
+            float tip,
+            float tolls,
+            float totalFare) {
+        this.rideId = rideId;
+        this.taxiId = taxiId;
+        this.driverId = driverId;
+        this.startTime = startTime;
+        this.paymentType = paymentType;
+        this.tip = tip;
+        this.tolls = tolls;
+        this.totalFare = totalFare;
+    }
 
-       public long rideId;
-       public long taxiId;
-       public long driverId;
-       public Instant startTime;
-       public String paymentType;
-       public float tip;
-       public float tolls;
-       public float totalFare;
+    public long rideId;
+    public long taxiId;
+    public long driverId;
+    public Instant startTime;
+    public String paymentType;
+    public float tip;
+    public float tolls;
+    public float totalFare;
 
-       @Override
-       public String toString() {
+    @Override
+    public String toString() {
 
-               return rideId + "," +
-                               taxiId + "," +
-                               driverId + "," +
-                               startTime.toString() + "," +
-                               paymentType + "," +
-                               tip + "," +
-                               tolls + "," +
-                               totalFare;
-       }
+        return rideId
+                + ","
+                + taxiId
+                + ","
+                + driverId
+                + ","
+                + startTime.toString()
+                + ","
+                + paymentType
+                + ","
+                + tip
+                + ","
+                + tolls
+                + ","
+                + totalFare;
+    }
 
-       @Override
-       public boolean equals(Object other) {
-               return other instanceof TaxiFare &&
-                               this.rideId == ((TaxiFare) other).rideId;
-       }
+    @Override
+    public boolean equals(Object other) {
+        return other instanceof TaxiFare && this.rideId == ((TaxiFare) 
other).rideId;
+    }
 
-       @Override
-       public int hashCode() {
-               return (int) this.rideId;
-       }
-
-       /**
-        * Gets the fare's start time.
-        */
-       public long getEventTime() {
-               return startTime.toEpochMilli();
-       }
+    @Override
+    public int hashCode() {
+        return (int) this.rideId;
+    }
 
+    /** Gets the fare's start time. */
+    public long getEventTime() {
+        return startTime.toEpochMilli();
+    }
 }
diff --git 
a/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiRide.java
 
b/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiRide.java
index 3bec5e7..378b9a0 100644
--- 
a/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiRide.java
+++ 
b/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiRide.java
@@ -30,156 +30,155 @@ import java.time.Instant;
  * A TaxiRide is a taxi ride event. There are two types of events, a taxi ride 
start event and a
  * taxi ride end event. The isStart flag specifies the type of the event.
  *
- * <p>A TaxiRide consists of
- * - the rideId of the event which is identical for start and end record
- * - the type of the event (start or end)
- * - the time of the event
- * - the longitude of the start location
- * - the latitude of the start location
- * - the longitude of the end location
- * - the latitude of the end location
- * - the passengerCnt of the ride
- * - the taxiId
- * - the driverId
+ * <p>A TaxiRide consists of - the rideId of the event which is identical for 
start and end record -
+ * the type of the event (start or end) - the time of the event - the 
longitude of the start
+ * location - the latitude of the start location - the longitude of the end 
location - the latitude
+ * of the end location - the passengerCnt of the ride - the taxiId - the 
driverId
  */
 public class TaxiRide implements Comparable<TaxiRide>, Serializable {
 
-       /**
-        * Creates a new TaxiRide with now as start and end time.
-        */
-       public TaxiRide() {
-               this.startTime = Instant.now();
-               this.endTime = Instant.now();
-       }
-
-       /**
-        * Invents a TaxiRide.
-        */
-       public TaxiRide(long rideId, boolean isStart) {
-               DataGenerator g = new DataGenerator(rideId);
-
-               this.rideId = rideId;
-               this.isStart = isStart;
-               this.startTime = g.startTime();
-               this.endTime = isStart ? Instant.ofEpochMilli(0) : g.endTime();
-               this.startLon = g.startLon();
-               this.startLat = g.startLat();
-               this.endLon = g.endLon();
-               this.endLat = g.endLat();
-               this.passengerCnt = g.passengerCnt();
-               this.taxiId = g.taxiId();
-               this.driverId = g.driverId();
-       }
-
-       /**
-        * Creates a TaxiRide with the given parameters.
-        */
-       public TaxiRide(long rideId, boolean isStart, Instant startTime, 
Instant endTime,
-                       float startLon, float startLat, float endLon, float 
endLat,
-                       short passengerCnt, long taxiId, long driverId) {
-               this.rideId = rideId;
-               this.isStart = isStart;
-               this.startTime = startTime;
-               this.endTime = endTime;
-               this.startLon = startLon;
-               this.startLat = startLat;
-               this.endLon = endLon;
-               this.endLat = endLat;
-               this.passengerCnt = passengerCnt;
-               this.taxiId = taxiId;
-               this.driverId = driverId;
-       }
-
-       public long rideId;
-       public boolean isStart;
-       public Instant startTime;
-       public Instant endTime;
-       public float startLon;
-       public float startLat;
-       public float endLon;
-       public float endLat;
-       public short passengerCnt;
-       public long taxiId;
-       public long driverId;
-
-       @Override
-       public String toString() {
-
-               return rideId + "," +
-                               (isStart ? "START" : "END") + "," +
-                               startTime.toString() + "," +
-                               endTime.toString() + "," +
-                               startLon + "," +
-                               startLat + "," +
-                               endLon + "," +
-                               endLat + "," +
-                               passengerCnt + "," +
-                               taxiId + "," +
-                               driverId;
-       }
-
-       /**
-        * Compares this TaxiRide with the given one.
-        *
-        * <ul>
-        *     <li>sort by timestamp,</li>
-        *     <li>putting START events before END events if they have the same 
timestamp</li>
-        * </ul>
-        */
-       public int compareTo(@Nullable TaxiRide other) {
-               if (other == null) {
-                       return 1;
-               }
-               int compareTimes = Long.compare(this.getEventTime(), 
other.getEventTime());
-               if (compareTimes == 0) {
-                       if (this.isStart == other.isStart) {
-                               return 0;
-                       }
-                       else {
-                               if (this.isStart) {
-                                       return -1;
-                               }
-                               else {
-                                       return 1;
-                               }
-                       }
-               }
-               else {
-                       return compareTimes;
-               }
-       }
-
-       @Override
-       public boolean equals(Object other) {
-               return other instanceof TaxiRide &&
-                               this.rideId == ((TaxiRide) other).rideId;
-       }
-
-       @Override
-       public int hashCode() {
-               return (int) this.rideId;
-       }
-
-       /**
-        * Gets the ride's time stamp (start or end time depending on {@link 
#isStart}).
-        */
-       public long getEventTime() {
-               if (isStart) {
-                       return startTime.toEpochMilli();
-               }
-               else {
-                       return endTime.toEpochMilli();
-               }
-       }
-
-       /**
-        * Gets the distance from the ride location to the given one.
-        */
-       public double getEuclideanDistance(double longitude, double latitude) {
-               if (this.isStart) {
-                       return GeoUtils.getEuclideanDistance((float) longitude, 
(float) latitude, this.startLon, this.startLat);
-               } else {
-                       return GeoUtils.getEuclideanDistance((float) longitude, 
(float) latitude, this.endLon, this.endLat);
-               }
-       }
+    /** Creates a new TaxiRide with now as start and end time. */
+    public TaxiRide() {
+        this.startTime = Instant.now();
+        this.endTime = Instant.now();
+    }
+
+    /** Invents a TaxiRide. */
+    public TaxiRide(long rideId, boolean isStart) {
+        DataGenerator g = new DataGenerator(rideId);
+
+        this.rideId = rideId;
+        this.isStart = isStart;
+        this.startTime = g.startTime();
+        this.endTime = isStart ? Instant.ofEpochMilli(0) : g.endTime();
+        this.startLon = g.startLon();
+        this.startLat = g.startLat();
+        this.endLon = g.endLon();
+        this.endLat = g.endLat();
+        this.passengerCnt = g.passengerCnt();
+        this.taxiId = g.taxiId();
+        this.driverId = g.driverId();
+    }
+
+    /** Creates a TaxiRide with the given parameters. */
+    public TaxiRide(
+            long rideId,
+            boolean isStart,
+            Instant startTime,
+            Instant endTime,
+            float startLon,
+            float startLat,
+            float endLon,
+            float endLat,
+            short passengerCnt,
+            long taxiId,
+            long driverId) {
+        this.rideId = rideId;
+        this.isStart = isStart;
+        this.startTime = startTime;
+        this.endTime = endTime;
+        this.startLon = startLon;
+        this.startLat = startLat;
+        this.endLon = endLon;
+        this.endLat = endLat;
+        this.passengerCnt = passengerCnt;
+        this.taxiId = taxiId;
+        this.driverId = driverId;
+    }
+
+    public long rideId;
+    public boolean isStart;
+    public Instant startTime;
+    public Instant endTime;
+    public float startLon;
+    public float startLat;
+    public float endLon;
+    public float endLat;
+    public short passengerCnt;
+    public long taxiId;
+    public long driverId;
+
+    @Override
+    public String toString() {
+
+        return rideId
+                + ","
+                + (isStart ? "START" : "END")
+                + ","
+                + startTime.toString()
+                + ","
+                + endTime.toString()
+                + ","
+                + startLon
+                + ","
+                + startLat
+                + ","
+                + endLon
+                + ","
+                + endLat
+                + ","
+                + passengerCnt
+                + ","
+                + taxiId
+                + ","
+                + driverId;
+    }
+
+    /**
+     * Compares this TaxiRide with the given one.
+     *
+     * <ul>
+     *   <li>sort by timestamp,
+     *   <li>putting START events before END events if they have the same 
timestamp
+     * </ul>
+     */
+    public int compareTo(@Nullable TaxiRide other) {
+        if (other == null) {
+            return 1;
+        }
+        int compareTimes = Long.compare(this.getEventTime(), 
other.getEventTime());
+        if (compareTimes == 0) {
+            if (this.isStart == other.isStart) {
+                return 0;
+            } else {
+                if (this.isStart) {
+                    return -1;
+                } else {
+                    return 1;
+                }
+            }
+        } else {
+            return compareTimes;
+        }
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        return other instanceof TaxiRide && this.rideId == ((TaxiRide) 
other).rideId;
+    }
+
+    @Override
+    public int hashCode() {
+        return (int) this.rideId;
+    }
+
+    /** Gets the ride's time stamp (start or end time depending on {@link 
#isStart}). */
+    public long getEventTime() {
+        if (isStart) {
+            return startTime.toEpochMilli();
+        } else {
+            return endTime.toEpochMilli();
+        }
+    }
+
+    /** Gets the distance from the ride location to the given one. */
+    public double getEuclideanDistance(double longitude, double latitude) {
+        if (this.isStart) {
+            return GeoUtils.getEuclideanDistance(
+                    (float) longitude, (float) latitude, this.startLon, 
this.startLat);
+        } else {
+            return GeoUtils.getEuclideanDistance(
+                    (float) longitude, (float) latitude, this.endLon, 
this.endLat);
+        }
+    }
 }
diff --git 
a/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiFareGenerator.java
 
b/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiFareGenerator.java
index c3ca076..17686fa 100644
--- 
a/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiFareGenerator.java
+++ 
b/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiFareGenerator.java
@@ -27,31 +27,30 @@ import 
org.apache.flink.training.exercises.common.datatypes.TaxiFare;
  * timestamps.
  *
  * <p>The stream is generated in order, and it includes Watermarks.
- *
  */
 public class TaxiFareGenerator implements SourceFunction<TaxiFare> {
 
-       private volatile boolean running = true;
+    private volatile boolean running = true;
 
-       @Override
-       public void run(SourceContext<TaxiFare> ctx) throws Exception {
+    @Override
+    public void run(SourceContext<TaxiFare> ctx) throws Exception {
 
-               long id = 1;
+        long id = 1;
 
-               while (running) {
-                       TaxiFare fare = new TaxiFare(id);
-                       id += 1;
+        while (running) {
+            TaxiFare fare = new TaxiFare(id);
+            id += 1;
 
-                       ctx.collectWithTimestamp(fare, fare.getEventTime());
-                       ctx.emitWatermark(new Watermark(fare.getEventTime()));
+            ctx.collectWithTimestamp(fare, fare.getEventTime());
+            ctx.emitWatermark(new Watermark(fare.getEventTime()));
 
-                       // match our event production rate to that of the 
TaxiRideGenerator
-                       Thread.sleep(TaxiRideGenerator.SLEEP_MILLIS_PER_EVENT);
-               }
-       }
+            // match our event production rate to that of the TaxiRideGenerator
+            Thread.sleep(TaxiRideGenerator.SLEEP_MILLIS_PER_EVENT);
+        }
+    }
 
-       @Override
-       public void cancel() {
-               running = false;
-       }
+    @Override
+    public void cancel() {
+        running = false;
+    }
 }
diff --git 
a/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiRideGenerator.java
 
b/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiRideGenerator.java
index 8ee5d01..40f498a 100644
--- 
a/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiRideGenerator.java
+++ 
b/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiRideGenerator.java
@@ -32,61 +32,62 @@ import java.util.Random;
  * timestamps.
  *
  * <p>The stream is produced out-of-order, and includes Watermarks (with no 
late events).
- *
  */
 public class TaxiRideGenerator implements SourceFunction<TaxiRide> {
 
-       public static final int SLEEP_MILLIS_PER_EVENT = 10;
-       private static final int BATCH_SIZE = 5;
-       private volatile boolean running = true;
-
-       @Override
-       public void run(SourceContext<TaxiRide> ctx) throws Exception {
-
-               PriorityQueue<TaxiRide> endEventQ = new PriorityQueue<>(100);
-               long id = 0;
-               long maxStartTime = 0;
-
-               while (running) {
-
-                       // generate a batch of START events
-                       List<TaxiRide> startEvents = new 
ArrayList<TaxiRide>(BATCH_SIZE);
-                       for (int i = 1; i <= BATCH_SIZE; i++) {
-                               TaxiRide ride = new TaxiRide(id + i, true);
-                               startEvents.add(ride);
-                               // the start times may be in order, but let's 
not assume that
-                               maxStartTime = Math.max(maxStartTime, 
ride.startTime.toEpochMilli());
-                       }
-
-                       // enqueue the corresponding END events
-                       for (int i = 1; i <= BATCH_SIZE; i++) {
-                               endEventQ.add(new TaxiRide(id + i, false));
-                       }
-
-                       // release the END events coming before the end of this 
new batch
-                       // (this allows a few END events to precede their 
matching START event)
-                       while (endEventQ.peek().getEventTime() <= maxStartTime) 
{
-                               TaxiRide ride = endEventQ.poll();
-                               ctx.collectWithTimestamp(ride, 
ride.getEventTime());
-                       }
-
-                       // then emit the new START events (out-of-order)
-                       java.util.Collections.shuffle(startEvents, new 
Random(id));
-                       startEvents.iterator().forEachRemaining(r -> 
ctx.collectWithTimestamp(r, r.getEventTime()));
-
-                       // produce a Watermark
-                       ctx.emitWatermark(new Watermark(maxStartTime));
-
-                       // prepare for the next batch
-                       id += BATCH_SIZE;
-
-                       // don't go too fast
-                       Thread.sleep(BATCH_SIZE * SLEEP_MILLIS_PER_EVENT);
-               }
-       }
-
-       @Override
-       public void cancel() {
-               running = false;
-       }
+    public static final int SLEEP_MILLIS_PER_EVENT = 10;
+    private static final int BATCH_SIZE = 5;
+    private volatile boolean running = true;
+
+    @Override
+    public void run(SourceContext<TaxiRide> ctx) throws Exception {
+
+        PriorityQueue<TaxiRide> endEventQ = new PriorityQueue<>(100);
+        long id = 0;
+        long maxStartTime = 0;
+
+        while (running) {
+
+            // generate a batch of START events
+            List<TaxiRide> startEvents = new ArrayList<TaxiRide>(BATCH_SIZE);
+            for (int i = 1; i <= BATCH_SIZE; i++) {
+                TaxiRide ride = new TaxiRide(id + i, true);
+                startEvents.add(ride);
+                // the start times may be in order, but let's not assume that
+                maxStartTime = Math.max(maxStartTime, 
ride.startTime.toEpochMilli());
+            }
+
+            // enqueue the corresponding END events
+            for (int i = 1; i <= BATCH_SIZE; i++) {
+                endEventQ.add(new TaxiRide(id + i, false));
+            }
+
+            // release the END events coming before the end of this new batch
+            // (this allows a few END events to precede their matching START 
event)
+            while (endEventQ.peek().getEventTime() <= maxStartTime) {
+                TaxiRide ride = endEventQ.poll();
+                ctx.collectWithTimestamp(ride, ride.getEventTime());
+            }
+
+            // then emit the new START events (out-of-order)
+            java.util.Collections.shuffle(startEvents, new Random(id));
+            startEvents
+                    .iterator()
+                    .forEachRemaining(r -> ctx.collectWithTimestamp(r, 
r.getEventTime()));
+
+            // produce a Watermark
+            ctx.emitWatermark(new Watermark(maxStartTime));
+
+            // prepare for the next batch
+            id += BATCH_SIZE;
+
+            // don't go too fast
+            Thread.sleep(BATCH_SIZE * SLEEP_MILLIS_PER_EVENT);
+        }
+    }
+
+    @Override
+    public void cancel() {
+        running = false;
+    }
 }
diff --git 
a/common/src/main/java/org/apache/flink/training/exercises/common/utils/DataGenerator.java
 
b/common/src/main/java/org/apache/flink/training/exercises/common/utils/DataGenerator.java
index f40c3c4..24439bd 100644
--- 
a/common/src/main/java/org/apache/flink/training/exercises/common/utils/DataGenerator.java
+++ 
b/common/src/main/java/org/apache/flink/training/exercises/common/utils/DataGenerator.java
@@ -30,167 +30,145 @@ import java.util.Random;
  */
 public class DataGenerator {
 
-       private static final int SECONDS_BETWEEN_RIDES = 20;
-       private static final int NUMBER_OF_DRIVERS = 200;
-       private static final Instant beginTime = 
Instant.parse("2020-01-01T12:00:00.00Z");
-
-       private transient long rideId;
-
-       /**
-        * Creates a DataGenerator for the specified rideId.
-        */
-       public DataGenerator(long rideId) {
-               this.rideId = rideId;
-       }
-
-       /**
-        * Deterministically generates and returns the startTime for this ride.
-        */
-       public Instant startTime() {
-               return beginTime.plusSeconds(SECONDS_BETWEEN_RIDES * rideId);
-       }
-
-       /**
-        * Deterministically generates and returns the endTime for this ride.
-        */
-       public Instant endTime() {
-               return startTime().plusSeconds(60 * rideDurationMinutes());
-       }
-
-       /**
-        * Deterministically generates and returns the driverId for this ride.
-        * The HourlyTips exercise is more interesting if aren't too many 
drivers.
-        */
-       public long driverId() {
-               Random rnd = new Random(rideId);
-               return 2013000000 + rnd.nextInt(NUMBER_OF_DRIVERS);
-       }
-
-       /**
-        * Deterministically generates and returns the taxiId for this ride.
-        */
-       public long taxiId() {
-               return driverId();
-       }
-
-       /**
-        * Deterministically generates and returns the startLat for this ride.
-        *
-        * <p>The locations are used in the RideCleansing exercise.
-        * We want some rides to be outside of NYC.
-        */
-       public float startLat() {
-               return aFloat((float) (GeoUtils.LAT_SOUTH - 0.1), (float) 
(GeoUtils.LAT_NORTH + 0.1F));
-       }
-
-       /**
-        * Deterministically generates and returns the startLon for this ride.
-        */
-       public float startLon() {
-               return aFloat((float) (GeoUtils.LON_WEST - 0.1), (float) 
(GeoUtils.LON_EAST + 0.1F));
-       }
-
-       /**
-        * Deterministically generates and returns the endLat for this ride.
-        */
-       public float endLat() {
-               return bFloat((float) (GeoUtils.LAT_SOUTH - 0.1), (float) 
(GeoUtils.LAT_NORTH + 0.1F));
-       }
-
-       /**
-        * Deterministically generates and returns the endLon for this ride.
-        */
-       public float endLon() {
-               return bFloat((float) (GeoUtils.LON_WEST - 0.1), (float) 
(GeoUtils.LON_EAST + 0.1F));
-       }
-
-       /**
-        * Deterministically generates and returns the passengerCnt for this 
ride.
-        */
-       public short passengerCnt() {
-               return (short) aLong(1L, 4L);
-       }
-
-       /**
-        * Deterministically generates and returns the paymentType for this 
ride.
-        */
-       public String paymentType() {
-               return (rideId % 2 == 0) ? "CARD" : "CASH";
-       }
-
-       /**
-        * Deterministically generates and returns the tip for this ride.
-        *
-        * <p>The HourlyTips exercise is more interesting if there's some 
significant variation in tipping.
-        */
-       public float tip() {
-               return aLong(0L, 60L, 10F, 15F);
-       }
-
-       /**
-        * Deterministically generates and returns the tolls for this ride.
-        */
-       public float tolls() {
-               return (rideId % 10 == 0) ? aLong(0L, 5L) : 0L;
-       }
-
-       /**
-        * Deterministically generates and returns the totalFare for this ride.
-        */
-       public float totalFare() {
-               return (float) (3.0 + (1.0 * rideDurationMinutes()) + tip() + 
tolls());
-       }
-
-       /**
-        * The LongRides exercise needs to have some rides with a duration > 2 
hours, but not too many.
-        */
-       private long rideDurationMinutes() {
-               return aLong(0L, 600, 20, 40);
-       }
-
-       // -------------------------------------
-
-       private long aLong(long min, long max) {
-               float mean = (min + max) / 2.0F;
-               float stddev = (max - min) / 8F;
-
-               return aLong(min, max, mean, stddev);
-       }
-
-       // the rideId is used as the seed to guarantee deterministic results
-       private long aLong(long min, long max, float mean, float stddev) {
-               Random rnd = new Random(rideId);
-               long value;
-               do {
-                       value = (long) Math.round((stddev * rnd.nextGaussian()) 
+ mean);
-               } while ((value < min) || (value > max));
-               return value;
-       }
-
-       // -------------------------------------
-
-       private float aFloat(float min, float max) {
-               float mean = (min + max) / 2.0F;
-               float stddev = (max - min) / 8F;
-
-               return aFloat(rideId, min, max, mean, stddev);
-       }
-
-       private float bFloat(float min, float max) {
-               float mean = (min + max) / 2.0F;
-               float stddev = (max - min) / 8F;
-
-               return aFloat(rideId + 42, min, max, mean, stddev);
-       }
-
-       // the rideId is used as the seed to guarantee deterministic results
-       private float aFloat(long seed, float min, float max, float mean, float 
stddev) {
-               Random rnd = new Random(seed);
-               float value;
-               do {
-                       value = (float) (stddev * rnd.nextGaussian()) + mean;
-               } while ((value < min) || (value > max));
-               return value;
-       }
-
+    private static final int SECONDS_BETWEEN_RIDES = 20;
+    private static final int NUMBER_OF_DRIVERS = 200;
+    private static final Instant beginTime = 
Instant.parse("2020-01-01T12:00:00.00Z");
+
+    private transient long rideId;
+
+    /** Creates a DataGenerator for the specified rideId. */
+    public DataGenerator(long rideId) {
+        this.rideId = rideId;
+    }
+
+    /** Deterministically generates and returns the startTime for this ride. */
+    public Instant startTime() {
+        return beginTime.plusSeconds(SECONDS_BETWEEN_RIDES * rideId);
+    }
+
+    /** Deterministically generates and returns the endTime for this ride. */
+    public Instant endTime() {
+        return startTime().plusSeconds(60 * rideDurationMinutes());
+    }
+
+    /**
+     * Deterministically generates and returns the driverId for this ride. The 
HourlyTips exercise
+     * is more interesting if aren't too many drivers.
+     */
+    public long driverId() {
+        Random rnd = new Random(rideId);
+        return 2013000000 + rnd.nextInt(NUMBER_OF_DRIVERS);
+    }
+
+    /** Deterministically generates and returns the taxiId for this ride. */
+    public long taxiId() {
+        return driverId();
+    }
+
+    /**
+     * Deterministically generates and returns the startLat for this ride.
+     *
+     * <p>The locations are used in the RideCleansing exercise. We want some 
rides to be outside of
+     * NYC.
+     */
+    public float startLat() {
+        return aFloat((float) (GeoUtils.LAT_SOUTH - 0.1), (float) 
(GeoUtils.LAT_NORTH + 0.1F));
+    }
+
+    /** Deterministically generates and returns the startLon for this ride. */
+    public float startLon() {
+        return aFloat((float) (GeoUtils.LON_WEST - 0.1), (float) 
(GeoUtils.LON_EAST + 0.1F));
+    }
+
+    /** Deterministically generates and returns the endLat for this ride. */
+    public float endLat() {
+        return bFloat((float) (GeoUtils.LAT_SOUTH - 0.1), (float) 
(GeoUtils.LAT_NORTH + 0.1F));
+    }
+
+    /** Deterministically generates and returns the endLon for this ride. */
+    public float endLon() {
+        return bFloat((float) (GeoUtils.LON_WEST - 0.1), (float) 
(GeoUtils.LON_EAST + 0.1F));
+    }
+
+    /** Deterministically generates and returns the passengerCnt for this 
ride. */
+    public short passengerCnt() {
+        return (short) aLong(1L, 4L);
+    }
+
+    /** Deterministically generates and returns the paymentType for this ride. 
*/
+    public String paymentType() {
+        return (rideId % 2 == 0) ? "CARD" : "CASH";
+    }
+
+    /**
+     * Deterministically generates and returns the tip for this ride.
+     *
+     * <p>The HourlyTips exercise is more interesting if there's some 
significant variation in
+     * tipping.
+     */
+    public float tip() {
+        return aLong(0L, 60L, 10F, 15F);
+    }
+
+    /** Deterministically generates and returns the tolls for this ride. */
+    public float tolls() {
+        return (rideId % 10 == 0) ? aLong(0L, 5L) : 0L;
+    }
+
+    /** Deterministically generates and returns the totalFare for this ride. */
+    public float totalFare() {
+        return (float) (3.0 + (1.0 * rideDurationMinutes()) + tip() + tolls());
+    }
+
+    /**
+     * The LongRides exercise needs to have some rides with a duration > 2 
hours, but not too many.
+     */
+    private long rideDurationMinutes() {
+        return aLong(0L, 600, 20, 40);
+    }
+
+    // -------------------------------------
+
+    private long aLong(long min, long max) {
+        float mean = (min + max) / 2.0F;
+        float stddev = (max - min) / 8F;
+
+        return aLong(min, max, mean, stddev);
+    }
+
+    // the rideId is used as the seed to guarantee deterministic results
+    private long aLong(long min, long max, float mean, float stddev) {
+        Random rnd = new Random(rideId);
+        long value;
+        do {
+            value = (long) Math.round((stddev * rnd.nextGaussian()) + mean);
+        } while ((value < min) || (value > max));
+        return value;
+    }
+
+    // -------------------------------------
+
+    private float aFloat(float min, float max) {
+        float mean = (min + max) / 2.0F;
+        float stddev = (max - min) / 8F;
+
+        return aFloat(rideId, min, max, mean, stddev);
+    }
+
+    private float bFloat(float min, float max) {
+        float mean = (min + max) / 2.0F;
+        float stddev = (max - min) / 8F;
+
+        return aFloat(rideId + 42, min, max, mean, stddev);
+    }
+
+    // the rideId is used as the seed to guarantee deterministic results
+    private float aFloat(long seed, float min, float max, float mean, float 
stddev) {
+        Random rnd = new Random(seed);
+        float value;
+        do {
+            value = (float) (stddev * rnd.nextGaussian()) + mean;
+        } while ((value < min) || (value > max));
+        return value;
+    }
 }
diff --git 
a/common/src/main/java/org/apache/flink/training/exercises/common/utils/ExerciseBase.java
 
b/common/src/main/java/org/apache/flink/training/exercises/common/utils/ExerciseBase.java
index 172b300..c59a7a7 100644
--- 
a/common/src/main/java/org/apache/flink/training/exercises/common/utils/ExerciseBase.java
+++ 
b/common/src/main/java/org/apache/flink/training/exercises/common/utils/ExerciseBase.java
@@ -23,65 +23,53 @@ import 
org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.training.exercises.common.datatypes.TaxiFare;
 import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
 
-/**
- * Base for all exercises with a few helper methods.
- */
+/** Base for all exercises with a few helper methods. */
 public class ExerciseBase {
-       public static SourceFunction<TaxiRide> rides = null;
-       public static SourceFunction<TaxiFare> fares = null;
-       public static SourceFunction<String> strings = null;
-       public static SinkFunction out = null;
-       public static int parallelism = 4;
+    public static SourceFunction<TaxiRide> rides = null;
+    public static SourceFunction<TaxiFare> fares = null;
+    public static SourceFunction<String> strings = null;
+    public static SinkFunction out = null;
+    public static int parallelism = 4;
 
-       /**
-        * Retrieves a test source during unit tests and the given one during 
normal execution.
-        */
-       public static SourceFunction<TaxiRide> 
rideSourceOrTest(SourceFunction<TaxiRide> source) {
-               if (rides == null) {
-                       return source;
-               }
-               return rides;
-       }
+    /** Retrieves a test source during unit tests and the given one during 
normal execution. */
+    public static SourceFunction<TaxiRide> 
rideSourceOrTest(SourceFunction<TaxiRide> source) {
+        if (rides == null) {
+            return source;
+        }
+        return rides;
+    }
 
-       /**
-        * Retrieves a test source during unit tests and the given one during 
normal execution.
-        */
-       public static SourceFunction<TaxiFare> 
fareSourceOrTest(SourceFunction<TaxiFare> source) {
-               if (fares == null) {
-                       return source;
-               }
-               return fares;
-       }
+    /** Retrieves a test source during unit tests and the given one during 
normal execution. */
+    public static SourceFunction<TaxiFare> 
fareSourceOrTest(SourceFunction<TaxiFare> source) {
+        if (fares == null) {
+            return source;
+        }
+        return fares;
+    }
 
-       /**
-        * Retrieves a test source during unit tests and the given one during 
normal execution.
-        */
-       public static SourceFunction<String> 
stringSourceOrTest(SourceFunction<String> source) {
-               if (strings == null) {
-                       return source;
-               }
-               return strings;
-       }
+    /** Retrieves a test source during unit tests and the given one during 
normal execution. */
+    public static SourceFunction<String> 
stringSourceOrTest(SourceFunction<String> source) {
+        if (strings == null) {
+            return source;
+        }
+        return strings;
+    }
 
-       /**
-        * Prints the given data stream during normal execution and collects 
outputs during tests.
-        */
-       public static void 
printOrTest(org.apache.flink.streaming.api.datastream.DataStream<?> ds) {
-               if (out == null) {
-                       ds.print();
-               } else {
-                       ds.addSink(out);
-               }
-       }
+    /** Prints the given data stream during normal execution and collects 
outputs during tests. */
+    public static void 
printOrTest(org.apache.flink.streaming.api.datastream.DataStream<?> ds) {
+        if (out == null) {
+            ds.print();
+        } else {
+            ds.addSink(out);
+        }
+    }
 
-       /**
-        * Prints the given data stream during normal execution and collects 
outputs during tests.
-        */
-       public static void 
printOrTest(org.apache.flink.streaming.api.scala.DataStream<?> ds) {
-               if (out == null) {
-                       ds.print();
-               } else {
-                       ds.addSink(out);
-               }
-       }
+    /** Prints the given data stream during normal execution and collects 
outputs during tests. */
+    public static void 
printOrTest(org.apache.flink.streaming.api.scala.DataStream<?> ds) {
+        if (out == null) {
+            ds.print();
+        } else {
+            ds.addSink(out);
+        }
+    }
 }
diff --git 
a/common/src/main/java/org/apache/flink/training/exercises/common/utils/GeoUtils.java
 
b/common/src/main/java/org/apache/flink/training/exercises/common/utils/GeoUtils.java
index e638ab3..68e93f3 100644
--- 
a/common/src/main/java/org/apache/flink/training/exercises/common/utils/GeoUtils.java
+++ 
b/common/src/main/java/org/apache/flink/training/exercises/common/utils/GeoUtils.java
@@ -22,231 +22,214 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 
-/**
- * GeoUtils provides utility methods to deal with locations for the data 
streaming exercises.
- */
+/** GeoUtils provides utility methods to deal with locations for the data 
streaming exercises. */
 public class GeoUtils {
 
-       // geo boundaries of the area of NYC
-       public static final double LON_EAST = -73.7;
-       public static final double LON_WEST = -74.05;
-       public static final double LAT_NORTH = 41.0;
-       public static final double LAT_SOUTH = 40.5;
-
-       // area width and height
-       public static final double LON_WIDTH = 74.05 - 73.7;
-       public static final double LAT_HEIGHT = 41.0 - 40.5;
-
-       // delta step to create artificial grid overlay of NYC
-       public static final double DELTA_LON = 0.0014;
-       public static final double DELTA_LAT = 0.00125;
-
-       // ( |LON_WEST| - |LON_EAST| ) / DELTA_LAT
-       public static final int NUMBER_OF_GRID_X = 250;
-       // ( LAT_NORTH - LAT_SOUTH ) / DELTA_LON
-       public static final int NUMBER_OF_GRID_Y = 400;
-
-       public static final float DEG_LEN = 110.25f;
-
-       /**
-        * Checks if a location specified by longitude and latitude values is
-        * within the geo boundaries of New York City.
-        *
-        * @param lon longitude of the location to check
-        * @param lat latitude of the location to check
-        *
-        * @return true if the location is within NYC boundaries, otherwise 
false.
-        */
-       public static boolean isInNYC(float lon, float lat) {
-
-               return !(lon > LON_EAST || lon < LON_WEST) &&
-                               !(lat > LAT_NORTH || lat < LAT_SOUTH);
-       }
-
-       /**
-        * Maps a location specified by latitude and longitude values to a cell 
of a
-        * grid covering the area of NYC.
-        * The grid cells are roughly 100 x 100 m and sequentially number from 
north-west
-        * to south-east starting by zero.
-        *
-        * @param lon longitude of the location to map
-        * @param lat latitude of the location to map
-        *
-        * @return id of mapped grid cell.
-        */
-       public static int mapToGridCell(float lon, float lat) {
-               int xIndex = (int) Math.floor((Math.abs(LON_WEST) - 
Math.abs(lon)) / DELTA_LON);
-               int yIndex = (int) Math.floor((LAT_NORTH - lat) / DELTA_LAT);
-
-               return xIndex + (yIndex * NUMBER_OF_GRID_X);
-       }
-
-       /**
-        * Maps the direct path between two locations specified by longitude 
and latitude to a list of
-        * cells of a grid covering the area of NYC.
-        * The grid cells are roughly 100 x 100 m and sequentially number from 
north-west
-        * to south-east starting by zero.
-        *
-        * @param lon1 longitude of the first location
-        * @param lat1 latitude of the first location
-        * @param lon2 longitude of the second location
-        * @param lat2 latitude of the second location
-        *
-        * @return A list of cell ids
-        */
-       public static List<Integer> mapToGridCellsOnWay(float lon1, float lat1, 
float lon2, float lat2) {
-
-               int x1 = (int) Math.floor((Math.abs(LON_WEST) - Math.abs(lon1)) 
/ DELTA_LON);
-               int y1 = (int) Math.floor((LAT_NORTH - lat1) / DELTA_LAT);
-
-               int x2 = (int) Math.floor((Math.abs(LON_WEST) - Math.abs(lon2)) 
/ DELTA_LON);
-               int y2 = (int) Math.floor((LAT_NORTH - lat2) / DELTA_LAT);
-
-               int startX, startY, endX, endY;
-               if (x1 <= x2) {
-                       startX = x1;
-                       startY = y1;
-                       endX = x2;
-                       endY = y2;
-               }
-               else {
-                       startX = x2;
-                       startY = y2;
-                       endX = x1;
-                       endY = y1;
-               }
-
-               double slope = (endY - startY) / ((endX - startX) + 0.00000001);
-
-               int curX = startX;
-               int curY = startY;
-
-               ArrayList<Integer> cellIds = new ArrayList<>(64);
-               cellIds.add(curX + (curY * NUMBER_OF_GRID_X));
-
-               while (curX < endX || curY != endY) {
-
-                       if (slope > 0) {
-                               double y = (curX - startX + 0.5) * slope + 
startY - 0.5;
-
-                               if (y > curY - 0.05 && y < curY + 0.05) {
-                                       curX++;
-                                       curY++;
-                               }
-                               else if (y < curY) {
-                                       curX++;
-                               }
-                               else {
-                                       curY++;
-                               }
-                       }
-                       else {
-                               double y = (curX - startX + 0.5) * slope + 
startY + 0.5;
-
-                               if (y > curY - 0.05 && y < curY + 0.05) {
-                                       curX++;
-                                       curY--;
-                               }
-                               if (y > curY) {
-                                       curX++;
-                               }
-                               else {
-                                       curY--;
-                               }
-
-                       }
-
-                       cellIds.add(curX + (curY * NUMBER_OF_GRID_X));
-               }
-
-               return cellIds;
-       }
-
-       /**
-        * Returns the longitude of the center of a grid cell.
-        *
-        * @param gridCellId The grid cell.
-        *
-        * @return The longitude value of the cell's center.
-        */
-       public static float getGridCellCenterLon(int gridCellId) {
-
-               int xIndex = gridCellId % NUMBER_OF_GRID_X;
-
-               return (float) (Math.abs(LON_WEST) - (xIndex * DELTA_LON) - 
(DELTA_LON / 2)) * -1.0f;
-       }
-
-       /**
-        * Returns the latitude of the center of a grid cell.
-        *
-        * @param gridCellId The grid cell.
-        *
-        * @return The latitude value of the cell's center.
-        */
-       public static float getGridCellCenterLat(int gridCellId) {
-
-               int xIndex = gridCellId % NUMBER_OF_GRID_X;
-               int yIndex = (gridCellId - xIndex) / NUMBER_OF_GRID_X;
-
-               return (float) (LAT_NORTH - (yIndex * DELTA_LAT) - (DELTA_LAT / 
2));
-
-       }
-
-       /**
-        * Returns a random longitude within the NYC area.
-        *
-        * @param rand A random number generator.
-        * @return A random longitude value within the NYC area.
-        */
-       public static float getRandomNYCLon(Random rand) {
-               return (float) (LON_EAST - (LON_WIDTH * rand.nextFloat()));
-       }
-
-       /**
-        * Returns a random latitude within the NYC area.
-        *
-        * @param rand A random number generator.
-        * @return A random latitude value within the NYC area.
-        */
-       public static float getRandomNYCLat(Random rand) {
-               return (float) (LAT_SOUTH + (LAT_HEIGHT * rand.nextFloat()));
-       }
-
-       /**
-        * Returns the Euclidean distance between two locations specified as 
lon/lat pairs.
-        *
-        * @param lon1 Longitude of first location
-        * @param lat1 Latitude of first location
-        * @param lon2 Longitude of second location
-        * @param lat2 Latitude of second location
-        * @return The Euclidean distance between the specified locations.
-        */
-       public static double getEuclideanDistance(float lon1, float lat1, float 
lon2, float lat2) {
-               double x = lat1 - lat2;
-               double y = (lon1 - lon2) * Math.cos(lat2);
-               return (DEG_LEN * Math.sqrt(x * x + y * y));
-       }
-
-       /**
-        * Returns the angle in degrees between the vector from the start to 
the destination
-        * and the x-axis on which the start is located.
-        *
-        * <p>The angle describes in which direction the destination is located 
from the start, i.e.,
-        * 0° -> East, 90° -> South, 180° -> West, 270° -> North
-        *
-        * @param startLon longitude of start location
-        * @param startLat latitude of start location
-        * @param destLon longitude of destination
-        * @param destLat latitude of destination
-        * @return The direction from start to destination location
-        */
-       public static int getDirectionAngle(
-                       float startLon, float startLat, float destLon, float 
destLat) {
-
-               double x = destLat - startLat;
-               double y = (destLon - startLon) * Math.cos(startLat);
-
-               return (int) Math.toDegrees(Math.atan2(x, y)) + 179;
-       }
-
+    // geo boundaries of the area of NYC
+    public static final double LON_EAST = -73.7;
+    public static final double LON_WEST = -74.05;
+    public static final double LAT_NORTH = 41.0;
+    public static final double LAT_SOUTH = 40.5;
+
+    // area width and height
+    public static final double LON_WIDTH = 74.05 - 73.7;
+    public static final double LAT_HEIGHT = 41.0 - 40.5;
+
+    // delta step to create artificial grid overlay of NYC
+    public static final double DELTA_LON = 0.0014;
+    public static final double DELTA_LAT = 0.00125;
+
+    // ( |LON_WEST| - |LON_EAST| ) / DELTA_LAT
+    public static final int NUMBER_OF_GRID_X = 250;
+    // ( LAT_NORTH - LAT_SOUTH ) / DELTA_LON
+    public static final int NUMBER_OF_GRID_Y = 400;
+
+    public static final float DEG_LEN = 110.25f;
+
+    /**
+     * Checks if a location specified by longitude and latitude values is 
within the geo boundaries
+     * of New York City.
+     *
+     * @param lon longitude of the location to check
+     * @param lat latitude of the location to check
+     * @return true if the location is within NYC boundaries, otherwise false.
+     */
+    public static boolean isInNYC(float lon, float lat) {
+
+        return !(lon > LON_EAST || lon < LON_WEST) && !(lat > LAT_NORTH || lat 
< LAT_SOUTH);
+    }
+
+    /**
+     * Maps a location specified by latitude and longitude values to a cell of 
a grid covering the
+     * area of NYC. The grid cells are roughly 100 x 100 m and sequentially 
number from north-west
+     * to south-east starting by zero.
+     *
+     * @param lon longitude of the location to map
+     * @param lat latitude of the location to map
+     * @return id of mapped grid cell.
+     */
+    public static int mapToGridCell(float lon, float lat) {
+        int xIndex = (int) Math.floor((Math.abs(LON_WEST) - Math.abs(lon)) / 
DELTA_LON);
+        int yIndex = (int) Math.floor((LAT_NORTH - lat) / DELTA_LAT);
+
+        return xIndex + (yIndex * NUMBER_OF_GRID_X);
+    }
+
+    /**
+     * Maps the direct path between two locations specified by longitude and 
latitude to a list of
+     * cells of a grid covering the area of NYC. The grid cells are roughly 
100 x 100 m and
+     * sequentially number from north-west to south-east starting by zero.
+     *
+     * @param lon1 longitude of the first location
+     * @param lat1 latitude of the first location
+     * @param lon2 longitude of the second location
+     * @param lat2 latitude of the second location
+     * @return A list of cell ids
+     */
+    public static List<Integer> mapToGridCellsOnWay(
+            float lon1, float lat1, float lon2, float lat2) {
+
+        int x1 = (int) Math.floor((Math.abs(LON_WEST) - Math.abs(lon1)) / 
DELTA_LON);
+        int y1 = (int) Math.floor((LAT_NORTH - lat1) / DELTA_LAT);
+
+        int x2 = (int) Math.floor((Math.abs(LON_WEST) - Math.abs(lon2)) / 
DELTA_LON);
+        int y2 = (int) Math.floor((LAT_NORTH - lat2) / DELTA_LAT);
+
+        int startX, startY, endX, endY;
+        if (x1 <= x2) {
+            startX = x1;
+            startY = y1;
+            endX = x2;
+            endY = y2;
+        } else {
+            startX = x2;
+            startY = y2;
+            endX = x1;
+            endY = y1;
+        }
+
+        double slope = (endY - startY) / ((endX - startX) + 0.00000001);
+
+        int curX = startX;
+        int curY = startY;
+
+        ArrayList<Integer> cellIds = new ArrayList<>(64);
+        cellIds.add(curX + (curY * NUMBER_OF_GRID_X));
+
+        while (curX < endX || curY != endY) {
+
+            if (slope > 0) {
+                double y = (curX - startX + 0.5) * slope + startY - 0.5;
+
+                if (y > curY - 0.05 && y < curY + 0.05) {
+                    curX++;
+                    curY++;
+                } else if (y < curY) {
+                    curX++;
+                } else {
+                    curY++;
+                }
+            } else {
+                double y = (curX - startX + 0.5) * slope + startY + 0.5;
+
+                if (y > curY - 0.05 && y < curY + 0.05) {
+                    curX++;
+                    curY--;
+                }
+                if (y > curY) {
+                    curX++;
+                } else {
+                    curY--;
+                }
+            }
+
+            cellIds.add(curX + (curY * NUMBER_OF_GRID_X));
+        }
+
+        return cellIds;
+    }
+
+    /**
+     * Returns the longitude of the center of a grid cell.
+     *
+     * @param gridCellId The grid cell.
+     * @return The longitude value of the cell's center.
+     */
+    public static float getGridCellCenterLon(int gridCellId) {
+
+        int xIndex = gridCellId % NUMBER_OF_GRID_X;
+
+        return (float) (Math.abs(LON_WEST) - (xIndex * DELTA_LON) - (DELTA_LON 
/ 2)) * -1.0f;
+    }
+
+    /**
+     * Returns the latitude of the center of a grid cell.
+     *
+     * @param gridCellId The grid cell.
+     * @return The latitude value of the cell's center.
+     */
+    public static float getGridCellCenterLat(int gridCellId) {
+
+        int xIndex = gridCellId % NUMBER_OF_GRID_X;
+        int yIndex = (gridCellId - xIndex) / NUMBER_OF_GRID_X;
+
+        return (float) (LAT_NORTH - (yIndex * DELTA_LAT) - (DELTA_LAT / 2));
+    }
+
+    /**
+     * Returns a random longitude within the NYC area.
+     *
+     * @param rand A random number generator.
+     * @return A random longitude value within the NYC area.
+     */
+    public static float getRandomNYCLon(Random rand) {
+        return (float) (LON_EAST - (LON_WIDTH * rand.nextFloat()));
+    }
+
+    /**
+     * Returns a random latitude within the NYC area.
+     *
+     * @param rand A random number generator.
+     * @return A random latitude value within the NYC area.
+     */
+    public static float getRandomNYCLat(Random rand) {
+        return (float) (LAT_SOUTH + (LAT_HEIGHT * rand.nextFloat()));
+    }
+
+    /**
+     * Returns the Euclidean distance between two locations specified as 
lon/lat pairs.
+     *
+     * @param lon1 Longitude of first location
+     * @param lat1 Latitude of first location
+     * @param lon2 Longitude of second location
+     * @param lat2 Latitude of second location
+     * @return The Euclidean distance between the specified locations.
+     */
+    public static double getEuclideanDistance(float lon1, float lat1, float 
lon2, float lat2) {
+        double x = lat1 - lat2;
+        double y = (lon1 - lon2) * Math.cos(lat2);
+        return (DEG_LEN * Math.sqrt(x * x + y * y));
+    }
+
+    /**
+     * Returns the angle in degrees between the vector from the start to the 
destination and the
+     * x-axis on which the start is located.
+     *
+     * <p>The angle describes in which direction the destination is located 
from the start, i.e., 0°
+     * -> East, 90° -> South, 180° -> West, 270° -> North
+     *
+     * @param startLon longitude of start location
+     * @param startLat latitude of start location
+     * @param destLon longitude of destination
+     * @param destLat latitude of destination
+     * @return The direction from start to destination location
+     */
+    public static int getDirectionAngle(
+            float startLon, float startLat, float destLon, float destLat) {
+
+        double x = destLat - startLat;
+        double y = (destLon - startLon) * Math.cos(startLat);
+
+        return (int) Math.toDegrees(Math.atan2(x, y)) + 179;
+    }
 }
diff --git 
a/common/src/main/java/org/apache/flink/training/exercises/common/utils/MissingSolutionException.java
 
b/common/src/main/java/org/apache/flink/training/exercises/common/utils/MissingSolutionException.java
index 4f6c95f..8585ae0 100644
--- 
a/common/src/main/java/org/apache/flink/training/exercises/common/utils/MissingSolutionException.java
+++ 
b/common/src/main/java/org/apache/flink/training/exercises/common/utils/MissingSolutionException.java
@@ -18,12 +18,8 @@
 
 package org.apache.flink.training.exercises.common.utils;
 
-/**
- * Exception denoting a missing solution (results in tests verifying the 
solution instead).
- */
+/** Exception denoting a missing solution (results in tests verifying the 
solution instead). */
 public class MissingSolutionException extends Exception {
-       /**
-        * Create new exception.
-        */
-       public MissingSolutionException() {}
+    /** Create new exception. */
+    public MissingSolutionException() {}
 }
diff --git 
a/common/src/test/java/org/apache/flink/training/exercises/testing/TaxiRideTestBase.java
 
b/common/src/test/java/org/apache/flink/training/exercises/testing/TaxiRideTestBase.java
index f4f5596..0cadcce 100644
--- 
a/common/src/test/java/org/apache/flink/training/exercises/testing/TaxiRideTestBase.java
+++ 
b/common/src/test/java/org/apache/flink/training/exercises/testing/TaxiRideTestBase.java
@@ -30,139 +30,160 @@ import java.util.ArrayList;
 import java.util.List;
 
 public abstract class TaxiRideTestBase<OUT> {
-       public static class TestRideSource extends TestSource<TaxiRide> 
implements ResultTypeQueryable<TaxiRide> {
-               public TestRideSource(Object ... eventsOrWatermarks) {
-                       this.testStream = eventsOrWatermarks;
-               }
-
-               @Override
-               long getTimestamp(TaxiRide ride) {
-                       return ride.getEventTime();
-               }
-
-               @Override
-               public TypeInformation<TaxiRide> getProducedType() {
-                       return TypeInformation.of(TaxiRide.class);
-               }
-       }
-
-       public static class TestFareSource extends TestSource<TaxiFare> 
implements ResultTypeQueryable<TaxiFare> {
-               public TestFareSource(Object ... eventsOrWatermarks) {
-                       this.testStream = eventsOrWatermarks;
-               }
-
-               @Override
-               long getTimestamp(TaxiFare fare) {
-                       return fare.getEventTime();
-               }
-
-               @Override
-               public TypeInformation<TaxiFare> getProducedType() {
-                       return TypeInformation.of(TaxiFare.class);
-               }
-       }
-
-       public static class TestStringSource extends TestSource<String> 
implements ResultTypeQueryable<String> {
-               public TestStringSource(Object ... eventsOrWatermarks) {
-                       this.testStream = eventsOrWatermarks;
-               }
-
-               @Override
-               long getTimestamp(String s) {
-                       return 0L;
-               }
-
-               @Override
-               public TypeInformation<String> getProducedType() {
-                       return TypeInformation.of(String.class);
-               }
-       }
-
-       public static class TestSink<OUT> implements SinkFunction<OUT> {
-
-               // must be static
-               public static final List VALUES = new ArrayList<>();
-
-               @Override
-               public void invoke(OUT value, Context context) {
-                       VALUES.add(value);
-               }
-       }
-
-       public interface Testable {
-               void main() throws Exception;
-       }
-
-       protected List<OUT> runApp(TestRideSource source, TestSink<OUT> sink, 
Testable exercise, Testable solution) throws Exception {
-               ExerciseBase.rides = source;
-
-               return execute(sink, exercise, solution);
-       }
-
-       protected List<OUT> runApp(TestFareSource source, TestSink<OUT> sink, 
Testable exercise, Testable solution) throws Exception {
-               ExerciseBase.fares = source;
-
-               return execute(sink, exercise, solution);
-       }
-
-       protected List<OUT> runApp(TestRideSource rides, TestFareSource fares, 
TestSink<OUT> sink, Testable exercise, Testable solution) throws Exception {
-               ExerciseBase.rides = rides;
-               ExerciseBase.fares = fares;
-
-               return execute(sink, exercise, solution);
-       }
-
-       protected List<OUT> runApp(TestRideSource rides, TestSink<OUT> sink, 
Testable solution) throws Exception {
-               ExerciseBase.rides = rides;
-
-               return execute(sink, solution);
-       }
-
-       protected List<OUT> runApp(TestRideSource rides, TestStringSource 
strings, TestSink<OUT> sink, Testable exercise, Testable solution) throws 
Exception {
-               ExerciseBase.rides = rides;
-               ExerciseBase.strings = strings;
-
-               return execute(sink, exercise, solution);
-       }
-
-       private List<OUT> execute(TestSink<OUT> sink, Testable exercise, 
Testable solution) throws Exception {
-               sink.VALUES.clear();
-
-               ExerciseBase.out = sink;
-               ExerciseBase.parallelism = 1;
-
-               try {
-                       exercise.main();
-               } catch (Exception e) {
-                       if (ultimateCauseIsMissingSolution(e)) {
-                               sink.VALUES.clear();
-                               solution.main();
-                       } else {
-                               throw e;
-                       }
-               }
-
-               return sink.VALUES;
-       }
-
-       private List<OUT> execute(TestSink<OUT> sink, Testable solution) throws 
Exception {
-               sink.VALUES.clear();
-
-               ExerciseBase.out = sink;
-               ExerciseBase.parallelism = 1;
-
-               solution.main();
-
-               return sink.VALUES;
-       }
-
-       private boolean ultimateCauseIsMissingSolution(Throwable e) {
-               if (e instanceof MissingSolutionException) {
-                       return true;
-               } else if (e.getCause() != null) {
-                       return ultimateCauseIsMissingSolution(e.getCause());
-               } else {
-                       return false;
-               }
-       }
+    public static class TestRideSource extends TestSource<TaxiRide>
+            implements ResultTypeQueryable<TaxiRide> {
+        public TestRideSource(Object... eventsOrWatermarks) {
+            this.testStream = eventsOrWatermarks;
+        }
+
+        @Override
+        long getTimestamp(TaxiRide ride) {
+            return ride.getEventTime();
+        }
+
+        @Override
+        public TypeInformation<TaxiRide> getProducedType() {
+            return TypeInformation.of(TaxiRide.class);
+        }
+    }
+
+    public static class TestFareSource extends TestSource<TaxiFare>
+            implements ResultTypeQueryable<TaxiFare> {
+        public TestFareSource(Object... eventsOrWatermarks) {
+            this.testStream = eventsOrWatermarks;
+        }
+
+        @Override
+        long getTimestamp(TaxiFare fare) {
+            return fare.getEventTime();
+        }
+
+        @Override
+        public TypeInformation<TaxiFare> getProducedType() {
+            return TypeInformation.of(TaxiFare.class);
+        }
+    }
+
+    public static class TestStringSource extends TestSource<String>
+            implements ResultTypeQueryable<String> {
+        public TestStringSource(Object... eventsOrWatermarks) {
+            this.testStream = eventsOrWatermarks;
+        }
+
+        @Override
+        long getTimestamp(String s) {
+            return 0L;
+        }
+
+        @Override
+        public TypeInformation<String> getProducedType() {
+            return TypeInformation.of(String.class);
+        }
+    }
+
+    public static class TestSink<OUT> implements SinkFunction<OUT> {
+
+        // must be static
+        public static final List VALUES = new ArrayList<>();
+
+        @Override
+        public void invoke(OUT value, Context context) {
+            VALUES.add(value);
+        }
+    }
+
+    public interface Testable {
+        void main() throws Exception;
+    }
+
+    protected List<OUT> runApp(
+            TestRideSource source, TestSink<OUT> sink, Testable exercise, 
Testable solution)
+            throws Exception {
+        ExerciseBase.rides = source;
+
+        return execute(sink, exercise, solution);
+    }
+
+    protected List<OUT> runApp(
+            TestFareSource source, TestSink<OUT> sink, Testable exercise, 
Testable solution)
+            throws Exception {
+        ExerciseBase.fares = source;
+
+        return execute(sink, exercise, solution);
+    }
+
+    protected List<OUT> runApp(
+            TestRideSource rides,
+            TestFareSource fares,
+            TestSink<OUT> sink,
+            Testable exercise,
+            Testable solution)
+            throws Exception {
+        ExerciseBase.rides = rides;
+        ExerciseBase.fares = fares;
+
+        return execute(sink, exercise, solution);
+    }
+
+    protected List<OUT> runApp(TestRideSource rides, TestSink<OUT> sink, 
Testable solution)
+            throws Exception {
+        ExerciseBase.rides = rides;
+
+        return execute(sink, solution);
+    }
+
+    protected List<OUT> runApp(
+            TestRideSource rides,
+            TestStringSource strings,
+            TestSink<OUT> sink,
+            Testable exercise,
+            Testable solution)
+            throws Exception {
+        ExerciseBase.rides = rides;
+        ExerciseBase.strings = strings;
+
+        return execute(sink, exercise, solution);
+    }
+
+    private List<OUT> execute(TestSink<OUT> sink, Testable exercise, Testable 
solution)
+            throws Exception {
+        sink.VALUES.clear();
+
+        ExerciseBase.out = sink;
+        ExerciseBase.parallelism = 1;
+
+        try {
+            exercise.main();
+        } catch (Exception e) {
+            if (ultimateCauseIsMissingSolution(e)) {
+                sink.VALUES.clear();
+                solution.main();
+            } else {
+                throw e;
+            }
+        }
+
+        return sink.VALUES;
+    }
+
+    private List<OUT> execute(TestSink<OUT> sink, Testable solution) throws 
Exception {
+        sink.VALUES.clear();
+
+        ExerciseBase.out = sink;
+        ExerciseBase.parallelism = 1;
+
+        solution.main();
+
+        return sink.VALUES;
+    }
+
+    private boolean ultimateCauseIsMissingSolution(Throwable e) {
+        if (e instanceof MissingSolutionException) {
+            return true;
+        } else if (e.getCause() != null) {
+            return ultimateCauseIsMissingSolution(e.getCause());
+        } else {
+            return false;
+        }
+    }
 }
diff --git 
a/common/src/test/java/org/apache/flink/training/exercises/testing/TestSource.java
 
b/common/src/test/java/org/apache/flink/training/exercises/testing/TestSource.java
index 84ba53e..593112e 100644
--- 
a/common/src/test/java/org/apache/flink/training/exercises/testing/TestSource.java
+++ 
b/common/src/test/java/org/apache/flink/training/exercises/testing/TestSource.java
@@ -22,29 +22,29 @@ import 
org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 
 public abstract class TestSource<T> implements SourceFunction<T> {
-       private volatile boolean running = true;
-       // T or watermark (Long)
-       protected Object[] testStream;
+    private volatile boolean running = true;
+    // T or watermark (Long)
+    protected Object[] testStream;
 
-       @Override
-       public void run(SourceContext<T> ctx) {
-               for (int i = 0; (i < testStream.length) && running; i++) {
-                       if (testStream[i] instanceof Long) {
-                               Long ts = (Long) testStream[i];
-                               ctx.emitWatermark(new Watermark(ts));
-                       } else {
-                               //noinspection unchecked
-                               T element = (T) testStream[i];
-                               ctx.collectWithTimestamp(element, 
getTimestamp(element));
-                       }
-               }
-               // test sources are finite, so they have a Long.MAX_VALUE 
watermark when they finishes
-       }
+    @Override
+    public void run(SourceContext<T> ctx) {
+        for (int i = 0; (i < testStream.length) && running; i++) {
+            if (testStream[i] instanceof Long) {
+                Long ts = (Long) testStream[i];
+                ctx.emitWatermark(new Watermark(ts));
+            } else {
+                //noinspection unchecked
+                T element = (T) testStream[i];
+                ctx.collectWithTimestamp(element, getTimestamp(element));
+            }
+        }
+        // test sources are finite, so they have a Long.MAX_VALUE watermark 
when they finishes
+    }
 
-       abstract long getTimestamp(T element);
+    abstract long getTimestamp(T element);
 
-       @Override
-       public void cancel() {
-               running = false;
-       }
+    @Override
+    public void cancel() {
+        running = false;
+    }
 }
diff --git a/hourly-tips/DISCUSSION.md b/hourly-tips/DISCUSSION.md
index 888a015..56a55d4 100644
--- a/hourly-tips/DISCUSSION.md
+++ b/hourly-tips/DISCUSSION.md
@@ -25,28 +25,28 @@ The Java and Scala reference solutions illustrate two 
different approaches, thou
 
 ```java
 DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
-       .keyBy((TaxiFare fare) -> fare.driverId)
-       .window(TumblingEventTimeWindows.of(Time.hours(1)))
-       .process(new AddTips());
+    .keyBy((TaxiFare fare) -> fare.driverId)
+    .window(TumblingEventTimeWindows.of(Time.hours(1)))
+    .process(new AddTips());
 ```
 
 where a `ProcessWindowFunction` does all the heavy lifting:
 
 ```java
 public static class AddTips extends ProcessWindowFunction<
-               TaxiFare, Tuple3<Long, Long, Float>, Long, TimeWindow> {
-       @Override
-       public void process(Long key, Context context, Iterable<TaxiFare> 
fares, Collector<Tuple3<Long, Long, Float>> out) throws Exception {
-               Float sumOfTips = 0F;
-               for (TaxiFare f : fares) {
-                       sumOfTips += f.tip;
-               }
-               out.collect(Tuple3.of(context.window().getEnd(), key, 
sumOfTips));
-       }
+        TaxiFare, Tuple3<Long, Long, Float>, Long, TimeWindow> {
+    @Override
+    public void process(Long key, Context context, Iterable<TaxiFare> fares, 
Collector<Tuple3<Long, Long, Float>> out) throws Exception {
+        Float sumOfTips = 0F;
+        for (TaxiFare f : fares) {
+            sumOfTips += f.tip;
+        }
+        out.collect(Tuple3.of(context.window().getEnd(), key, sumOfTips));
+    }
 }
 ```
 
-This is straightforward, but has the drawback that it is buffering all of the 
`TaxiFare` objects in the windows until the windows are triggered, which is 
less efficient than computing the sum of the tips incrementally, using a 
`reduce` or `agggregate` function. 
+This is straightforward, but has the drawback that it is buffering all of the 
`TaxiFare` objects in the windows until the windows are triggered, which is 
less efficient than computing the sum of the tips incrementally, using a 
`reduce` or `agggregate` function.
 
 The [Scala 
solution](src/solution/scala/org/apache/flink/training/solutions/hourlytips/scala/HourlyTipsSolution.scala)
 uses a `reduce` function
 
@@ -95,8 +95,8 @@ Now, how to find the maximum within each hour? The reference 
solutions both do t
 
 ```java
 DataStream<Tuple3<Long, Long, Float>> hourlyMax = hourlyTips
-       .windowAll(TumblingEventTimeWindows.of(Time.hours(1)))
-       .maxBy(2);
+    .windowAll(TumblingEventTimeWindows.of(Time.hours(1)))
+    .maxBy(2);
 ```
 
 which works just fine, producing this stream of results:
@@ -114,8 +114,8 @@ But, what if we were to do this, instead?
 
 ```java
 DataStream<Tuple3<Long, Long, Float>> hourlyMax = hourlyTips
-       .keyBy(t -> t.f0)
-       .maxBy(2);
+    .keyBy(t -> t.f0)
+    .maxBy(2);
 ```
 
 This says to group the stream of `hourlyTips` by timestamp, and within each 
timestamp, find the maximum of the sum of the tips.
diff --git 
a/hourly-tips/src/main/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsExercise.java
 
b/hourly-tips/src/main/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsExercise.java
index b453c65..fddbc80 100644
--- 
a/hourly-tips/src/main/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsExercise.java
+++ 
b/hourly-tips/src/main/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsExercise.java
@@ -28,32 +28,30 @@ import 
org.apache.flink.training.exercises.common.utils.MissingSolutionException
 /**
  * The "Hourly Tips" exercise of the Flink training in the docs.
  *
- * <p>The task of the exercise is to first calculate the total tips collected 
by each driver, hour by hour, and
- * then from that stream, find the highest tip total in each hour.
- *
+ * <p>The task of the exercise is to first calculate the total tips collected 
by each driver, hour
+ * by hour, and then from that stream, find the highest tip total in each hour.
  */
 public class HourlyTipsExercise extends ExerciseBase {
 
-       /**
-        * Main method.
-        *
-        * @throws Exception which occurs during job execution.
-        */
-       public static void main(String[] args) throws Exception {
-
-               // set up streaming execution environment
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               env.setParallelism(ExerciseBase.parallelism);
+    /**
+     * Main method.
+     *
+     * @throws Exception which occurs during job execution.
+     */
+    public static void main(String[] args) throws Exception {
 
-               // start the data generator
-               DataStream<TaxiFare> fares = env.addSource(fareSourceOrTest(new 
TaxiFareGenerator()));
+        // set up streaming execution environment
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(ExerciseBase.parallelism);
 
-               throw new MissingSolutionException();
+        // start the data generator
+        DataStream<TaxiFare> fares = env.addSource(fareSourceOrTest(new 
TaxiFareGenerator()));
 
-//             printOrTest(hourlyMax);
+        throw new MissingSolutionException();
 
-               // execute the transformation pipeline
-//             env.execute("Hourly Tips (java)");
-       }
+        //             printOrTest(hourlyMax);
 
+        // execute the transformation pipeline
+        //             env.execute("Hourly Tips (java)");
+    }
 }
diff --git 
a/hourly-tips/src/solution/java/org/apache/flink/training/solutions/hourlytips/HourlyTipsSolution.java
 
b/hourly-tips/src/solution/java/org/apache/flink/training/solutions/hourlytips/HourlyTipsSolution.java
index a6c6279..9c9ee87 100644
--- 
a/hourly-tips/src/solution/java/org/apache/flink/training/solutions/hourlytips/HourlyTipsSolution.java
+++ 
b/hourly-tips/src/solution/java/org/apache/flink/training/solutions/hourlytips/HourlyTipsSolution.java
@@ -33,62 +33,64 @@ import org.apache.flink.util.Collector;
 /**
  * Java reference implementation for the "Hourly Tips" exercise of the Flink 
training in the docs.
  *
- * <p>The task of the exercise is to first calculate the total tips collected 
by each driver, hour by hour, and
- * then from that stream, find the highest tip total in each hour.
- *
+ * <p>The task of the exercise is to first calculate the total tips collected 
by each driver, hour
+ * by hour, and then from that stream, find the highest tip total in each hour.
  */
 public class HourlyTipsSolution extends ExerciseBase {
 
-       /**
-        * Main method.
-        *
-        * @throws Exception which occurs during job execution.
-        */
-       public static void main(String[] args) throws Exception {
+    /**
+     * Main method.
+     *
+     * @throws Exception which occurs during job execution.
+     */
+    public static void main(String[] args) throws Exception {
 
-               // set up streaming execution environment
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               env.setParallelism(ExerciseBase.parallelism);
+        // set up streaming execution environment
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(ExerciseBase.parallelism);
 
-               // start the data generator
-               DataStream<TaxiFare> fares = env.addSource(fareSourceOrTest(new 
TaxiFareGenerator()));
+        // start the data generator
+        DataStream<TaxiFare> fares = env.addSource(fareSourceOrTest(new 
TaxiFareGenerator()));
 
-               // compute tips per hour for each driver
-               DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
-                               .keyBy((TaxiFare fare) -> fare.driverId)
-                               
.window(TumblingEventTimeWindows.of(Time.hours(1)))
-                               .process(new AddTips());
+        // compute tips per hour for each driver
+        DataStream<Tuple3<Long, Long, Float>> hourlyTips =
+                fares.keyBy((TaxiFare fare) -> fare.driverId)
+                        .window(TumblingEventTimeWindows.of(Time.hours(1)))
+                        .process(new AddTips());
 
-               DataStream<Tuple3<Long, Long, Float>> hourlyMax = hourlyTips
-                               
.windowAll(TumblingEventTimeWindows.of(Time.hours(1)))
-                               .maxBy(2);
+        DataStream<Tuple3<Long, Long, Float>> hourlyMax =
+                
hourlyTips.windowAll(TumblingEventTimeWindows.of(Time.hours(1))).maxBy(2);
 
-//             You should explore how this alternative behaves. In what ways 
is the same as,
-//             and different from, the solution above (using a windowAll)?
+        //             You should explore how this alternative behaves. In 
what ways is the same as,
+        //             and different from, the solution above (using a 
windowAll)?
 
-//             DataStream<Tuple3<Long, Long, Float>> hourlyMax = hourlyTips
-//                     .keyBy(t -> t.f0)
-//                     .maxBy(2);
+        //             DataStream<Tuple3<Long, Long, Float>> hourlyMax = 
hourlyTips
+        //                     .keyBy(t -> t.f0)
+        //                     .maxBy(2);
 
-               printOrTest(hourlyMax);
+        printOrTest(hourlyMax);
 
-               // execute the transformation pipeline
-               env.execute("Hourly Tips (java)");
-       }
+        // execute the transformation pipeline
+        env.execute("Hourly Tips (java)");
+    }
 
-       /*
-        * Wraps the pre-aggregated result into a tuple along with the window's 
timestamp and key.
-        */
-       public static class AddTips extends ProcessWindowFunction<
-                       TaxiFare, Tuple3<Long, Long, Float>, Long, TimeWindow> {
+    /*
+     * Wraps the pre-aggregated result into a tuple along with the window's 
timestamp and key.
+     */
+    public static class AddTips
+            extends ProcessWindowFunction<TaxiFare, Tuple3<Long, Long, Float>, 
Long, TimeWindow> {
 
-               @Override
-               public void process(Long key, Context context, 
Iterable<TaxiFare> fares, Collector<Tuple3<Long, Long, Float>> out) {
-                       float sumOfTips = 0F;
-                       for (TaxiFare f : fares) {
-                               sumOfTips += f.tip;
-                       }
-                       out.collect(Tuple3.of(context.window().getEnd(), key, 
sumOfTips));
-               }
-       }
+        @Override
+        public void process(
+                Long key,
+                Context context,
+                Iterable<TaxiFare> fares,
+                Collector<Tuple3<Long, Long, Float>> out) {
+            float sumOfTips = 0F;
+            for (TaxiFare f : fares) {
+                sumOfTips += f.tip;
+            }
+            out.collect(Tuple3.of(context.window().getEnd(), key, sumOfTips));
+        }
+    }
 }
diff --git 
a/hourly-tips/src/test/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsTest.java
 
b/hourly-tips/src/test/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsTest.java
index 2924a21..24262a6 100644
--- 
a/hourly-tips/src/test/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsTest.java
+++ 
b/hourly-tips/src/test/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsTest.java
@@ -34,76 +34,65 @@ import static org.junit.Assert.assertEquals;
 
 public class HourlyTipsTest extends TaxiRideTestBase<Tuple3<Long, Long, 
Float>> {
 
-       static final Testable JAVA_EXERCISE = () -> HourlyTipsExercise.main(new 
String[]{});
+    static final Testable JAVA_EXERCISE = () -> HourlyTipsExercise.main(new 
String[] {});
 
-       @Test
-       public void testOneDriverOneTip() throws Exception {
-               TaxiFare one = testFare(1, t(0), 1.0F);
+    @Test
+    public void testOneDriverOneTip() throws Exception {
+        TaxiFare one = testFare(1, t(0), 1.0F);
 
-               TestFareSource source = new TestFareSource(
-                               one
-               );
+        TestFareSource source = new TestFareSource(one);
 
-               Tuple3<Long, Long, Float> max = Tuple3.of(t(60).toEpochMilli(), 
1L, 1.0F);
+        Tuple3<Long, Long, Float> max = Tuple3.of(t(60).toEpochMilli(), 1L, 
1.0F);
 
-               List<Tuple3<Long, Long, Float>> expected = 
Collections.singletonList(max);
+        List<Tuple3<Long, Long, Float>> expected = 
Collections.singletonList(max);
 
-               assertEquals(expected, results(source));
-       }
+        assertEquals(expected, results(source));
+    }
 
-       @Test
-       public void testTipsAreSummedByHour() throws Exception {
-               TaxiFare oneIn1 = testFare(1, t(0), 1.0F);
-               TaxiFare fiveIn1 = testFare(1, t(15), 5.0F);
-               TaxiFare tenIn2 = testFare(1, t(90), 10.0F);
+    @Test
+    public void testTipsAreSummedByHour() throws Exception {
+        TaxiFare oneIn1 = testFare(1, t(0), 1.0F);
+        TaxiFare fiveIn1 = testFare(1, t(15), 5.0F);
+        TaxiFare tenIn2 = testFare(1, t(90), 10.0F);
 
-               TestFareSource source = new TestFareSource(
-                               oneIn1,
-                               fiveIn1,
-                               tenIn2
-               );
+        TestFareSource source = new TestFareSource(oneIn1, fiveIn1, tenIn2);
 
-               Tuple3<Long, Long, Float> hour1 = 
Tuple3.of(t(60).toEpochMilli(), 1L, 6.0F);
-               Tuple3<Long, Long, Float> hour2 = 
Tuple3.of(t(120).toEpochMilli(), 1L, 10.0F);
+        Tuple3<Long, Long, Float> hour1 = Tuple3.of(t(60).toEpochMilli(), 1L, 
6.0F);
+        Tuple3<Long, Long, Float> hour2 = Tuple3.of(t(120).toEpochMilli(), 1L, 
10.0F);
 
-               List<Tuple3<Long, Long, Float>> expected = Arrays.asList(hour1, 
hour2);
+        List<Tuple3<Long, Long, Float>> expected = Arrays.asList(hour1, hour2);
 
-               assertEquals(expected, results(source));
-       }
+        assertEquals(expected, results(source));
+    }
 
-       @Test
-       public void testMaxAcrossDrivers() throws Exception {
-               TaxiFare oneFor1In1 = testFare(1, t(0), 1.0F);
-               TaxiFare fiveFor1In1 = testFare(1, t(15), 5.0F);
-               TaxiFare tenFor1In2 = testFare(1, t(90), 10.0F);
-               TaxiFare twentyFor2In2 = testFare(2, t(90), 20.0F);
+    @Test
+    public void testMaxAcrossDrivers() throws Exception {
+        TaxiFare oneFor1In1 = testFare(1, t(0), 1.0F);
+        TaxiFare fiveFor1In1 = testFare(1, t(15), 5.0F);
+        TaxiFare tenFor1In2 = testFare(1, t(90), 10.0F);
+        TaxiFare twentyFor2In2 = testFare(2, t(90), 20.0F);
 
-               TestFareSource source = new TestFareSource(
-                               oneFor1In1,
-                               fiveFor1In1,
-                               tenFor1In2,
-                               twentyFor2In2
-               );
+        TestFareSource source =
+                new TestFareSource(oneFor1In1, fiveFor1In1, tenFor1In2, 
twentyFor2In2);
 
-               Tuple3<Long, Long, Float> hour1 = 
Tuple3.of(t(60).toEpochMilli(), 1L, 6.0F);
-               Tuple3<Long, Long, Float> hour2 = 
Tuple3.of(t(120).toEpochMilli(), 2L, 20.0F);
+        Tuple3<Long, Long, Float> hour1 = Tuple3.of(t(60).toEpochMilli(), 1L, 
6.0F);
+        Tuple3<Long, Long, Float> hour2 = Tuple3.of(t(120).toEpochMilli(), 2L, 
20.0F);
 
-               List<Tuple3<Long, Long, Float>> expected = Arrays.asList(hour1, 
hour2);
+        List<Tuple3<Long, Long, Float>> expected = Arrays.asList(hour1, hour2);
 
-               assertEquals(expected, results(source));
-       }
+        assertEquals(expected, results(source));
+    }
 
-       private Instant t(int minutes) {
-               return Instant.parse("2020-01-01T12:00:00.00Z").plusSeconds(60 
* minutes);
-       }
+    private Instant t(int minutes) {
+        return Instant.parse("2020-01-01T12:00:00.00Z").plusSeconds(60 * 
minutes);
+    }
 
-       private TaxiFare testFare(long driverId, Instant startTime, float tip) {
-               return new TaxiFare(0, 0, driverId, startTime, "", tip, 0F, 0F);
-       }
-
-       protected List<Tuple3<Long, Long, Float>> results(TestFareSource 
source) throws Exception {
-               Testable javaSolution = () -> HourlyTipsSolution.main(new 
String[]{});
-               return runApp(source, new TestSink<>(), JAVA_EXERCISE, 
javaSolution);
-       }
+    private TaxiFare testFare(long driverId, Instant startTime, float tip) {
+        return new TaxiFare(0, 0, driverId, startTime, "", tip, 0F, 0F);
+    }
 
+    protected List<Tuple3<Long, Long, Float>> results(TestFareSource source) 
throws Exception {
+        Testable javaSolution = () -> HourlyTipsSolution.main(new String[] {});
+        return runApp(source, new TestSink<>(), JAVA_EXERCISE, javaSolution);
+    }
 }
diff --git a/long-ride-alerts/DISCUSSION.md b/long-ride-alerts/DISCUSSION.md
index f931ffa..571094b 100644
--- a/long-ride-alerts/DISCUSSION.md
+++ b/long-ride-alerts/DISCUSSION.md
@@ -25,7 +25,7 @@ It would be interesting to test that the solution does not 
leak state.
 
 A good way to write unit tests for a `KeyedProcessFunction` to check for state 
retention, etc., is to
 use the test harnesses described in the
-[documentation on 
testing](https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#unit-testing-stateful-or-timely-udfs--custom-operators).
 
+[documentation on 
testing](https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#unit-testing-stateful-or-timely-udfs--custom-operators).
 
 In fact, the reference solutions will leak state in the case where a START 
event is missing. They also
 leak in the case where the alert is generated, but then the END event does 
eventually arrive (after `onTimer()`
diff --git a/long-ride-alerts/README.md b/long-ride-alerts/README.md
index a5d2ac7..3366fbb 100644
--- a/long-ride-alerts/README.md
+++ b/long-ride-alerts/README.md
@@ -22,7 +22,7 @@ under the License.
 The goal of the "Long Ride Alerts" exercise is to provide a real-time warning 
whenever a taxi ride
 started two hours ago, and is still ongoing.
 
-This should be done using the event time timestamps and watermarks that are 
provided in the data stream. 
+This should be done using the event time timestamps and watermarks that are 
provided in the data stream.
 
 The stream is out-of-order, and it is possible that the END event for a ride 
will be processed before
 its START event. But in such cases, we never care to create an alert, since we 
do know that the ride
@@ -62,10 +62,10 @@ The resulting stream should be printed to standard out.
 <details>
 <summary><strong>Overall approach</strong></summary>
 
-This exercise revolves around using a `ProcessFunction` to manage some keyed 
state and event time timers, 
-and doing so in a way that works even when the END event for a given `rideId` 
arrives before the START (which can happen). 
+This exercise revolves around using a `ProcessFunction` to manage some keyed 
state and event time timers,
+and doing so in a way that works even when the END event for a given `rideId` 
arrives before the START (which can happen).
 The challenge is figuring out what state to keep, and when to set and clear 
that state.
-You will want to use event time timers that fire two hours after an incoming 
START event, and in the `onTimer()` method, 
+You will want to use event time timers that fire two hours after an incoming 
START event, and in the `onTimer()` method,
 collect START events to the output only if a matching END event hasn't yet 
arrived.
 </details>
 
diff --git 
a/long-ride-alerts/src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java
 
b/long-ride-alerts/src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java
index e42ff37..4fa7c9a 100644
--- 
a/long-ride-alerts/src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java
+++ 
b/long-ride-alerts/src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java
@@ -34,47 +34,46 @@ import org.apache.flink.util.Collector;
  *
  * <p>The goal for this exercise is to emit START events for taxi rides that 
have not been matched
  * by an END event during the first 2 hours of the ride.
- *
  */
 public class LongRidesExercise extends ExerciseBase {
 
-       /**
-        * Main method.
-        *
-        * @throws Exception which occurs during job execution.
-        */
-       public static void main(String[] args) throws Exception {
+    /**
+     * Main method.
+     *
+     * @throws Exception which occurs during job execution.
+     */
+    public static void main(String[] args) throws Exception {
 
-               // set up streaming execution environment
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               env.setParallelism(ExerciseBase.parallelism);
+        // set up streaming execution environment
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(ExerciseBase.parallelism);
 
-               // start the data generator
-               DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new 
TaxiRideGenerator()));
+        // start the data generator
+        DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new 
TaxiRideGenerator()));
 
-               DataStream<TaxiRide> longRides = rides
-                               .keyBy((TaxiRide ride) -> ride.rideId)
-                               .process(new MatchFunction());
+        DataStream<TaxiRide> longRides =
+                rides.keyBy((TaxiRide ride) -> ride.rideId).process(new 
MatchFunction());
 
-               printOrTest(longRides);
+        printOrTest(longRides);
 
-               env.execute("Long Taxi Rides");
-       }
+        env.execute("Long Taxi Rides");
+    }
 
-       public static class MatchFunction extends KeyedProcessFunction<Long, 
TaxiRide, TaxiRide> {
+    public static class MatchFunction extends KeyedProcessFunction<Long, 
TaxiRide, TaxiRide> {
 
-               @Override
-               public void open(Configuration config) throws Exception {
-                       throw new MissingSolutionException();
-               }
+        @Override
+        public void open(Configuration config) throws Exception {
+            throw new MissingSolutionException();
+        }
 
-               @Override
-               public void processElement(TaxiRide ride, Context context, 
Collector<TaxiRide> out) throws Exception {
-                       TimerService timerService = context.timerService();
-               }
+        @Override
+        public void processElement(TaxiRide ride, Context context, 
Collector<TaxiRide> out)
+                throws Exception {
+            TimerService timerService = context.timerService();
+        }
 
-               @Override
-               public void onTimer(long timestamp, OnTimerContext context, 
Collector<TaxiRide> out) throws Exception {
-               }
-       }
+        @Override
+        public void onTimer(long timestamp, OnTimerContext context, 
Collector<TaxiRide> out)
+                throws Exception {}
+    }
 }
diff --git 
a/long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java
 
b/long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java
index a846261..5e7536d 100644
--- 
a/long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java
+++ 
b/long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java
@@ -34,75 +34,75 @@ import org.apache.flink.util.Collector;
  *
  * <p>The goal for this exercise is to emit START events for taxi rides that 
have not been matched
  * by an END event during the first 2 hours of the ride.
- *
  */
 public class LongRidesSolution extends ExerciseBase {
 
-       /**
-        * Main method.
-        *
-        * @throws Exception which occurs during job execution.
-        */
-       public static void main(String[] args) throws Exception {
-
-               // set up streaming execution environment
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               env.setParallelism(ExerciseBase.parallelism);
-
-               // start the data generator
-               DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new 
TaxiRideGenerator()));
-
-               DataStream<TaxiRide> longRides = rides
-                               .keyBy((TaxiRide ride) -> ride.rideId)
-                               .process(new MatchFunction());
-
-               printOrTest(longRides);
-
-               env.execute("Long Taxi Rides");
-       }
-
-       private static class MatchFunction extends KeyedProcessFunction<Long, 
TaxiRide, TaxiRide> {
-
-               private ValueState<TaxiRide> rideState;
-
-               @Override
-               public void open(Configuration config) {
-                       ValueStateDescriptor<TaxiRide> stateDescriptor =
-                                       new ValueStateDescriptor<>("ride 
event", TaxiRide.class);
-                       rideState = 
getRuntimeContext().getState(stateDescriptor);
-               }
-
-               @Override
-               public void processElement(TaxiRide ride, Context context, 
Collector<TaxiRide> out) throws Exception {
-                       TaxiRide previousRideEvent = rideState.value();
-
-                       if (previousRideEvent == null) {
-                               rideState.update(ride);
-                               if (ride.isStart) {
-                                       
context.timerService().registerEventTimeTimer(getTimerTime(ride));
-                               }
-                       } else {
-                               if (!ride.isStart) {
-                                       // it's an END event, so event saved 
was the START event and has a timer
-                                       // the timer hasn't fired yet, and we 
can safely kill the timer
-                                       
context.timerService().deleteEventTimeTimer(getTimerTime(previousRideEvent));
-                               }
-                               // both events have now been seen, we can clear 
the state
-                               rideState.clear();
-                       }
-               }
-
-               @Override
-               public void onTimer(long timestamp, OnTimerContext context, 
Collector<TaxiRide> out) throws Exception {
-
-                       // if we get here, we know that the ride started two 
hours ago, and the END hasn't been processed
-                       out.collect(rideState.value());
-                       rideState.clear();
-               }
-
-               private long getTimerTime(TaxiRide ride) {
-                       return ride.startTime.plusSeconds(120 * 
60).toEpochMilli();
-               }
-       }
-
+    /**
+     * Main method.
+     *
+     * @throws Exception which occurs during job execution.
+     */
+    public static void main(String[] args) throws Exception {
+
+        // set up streaming execution environment
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(ExerciseBase.parallelism);
+
+        // start the data generator
+        DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new 
TaxiRideGenerator()));
+
+        DataStream<TaxiRide> longRides =
+                rides.keyBy((TaxiRide ride) -> ride.rideId).process(new 
MatchFunction());
+
+        printOrTest(longRides);
+
+        env.execute("Long Taxi Rides");
+    }
+
+    private static class MatchFunction extends KeyedProcessFunction<Long, 
TaxiRide, TaxiRide> {
+
+        private ValueState<TaxiRide> rideState;
+
+        @Override
+        public void open(Configuration config) {
+            ValueStateDescriptor<TaxiRide> stateDescriptor =
+                    new ValueStateDescriptor<>("ride event", TaxiRide.class);
+            rideState = getRuntimeContext().getState(stateDescriptor);
+        }
+
+        @Override
+        public void processElement(TaxiRide ride, Context context, 
Collector<TaxiRide> out)
+                throws Exception {
+            TaxiRide previousRideEvent = rideState.value();
+
+            if (previousRideEvent == null) {
+                rideState.update(ride);
+                if (ride.isStart) {
+                    
context.timerService().registerEventTimeTimer(getTimerTime(ride));
+                }
+            } else {
+                if (!ride.isStart) {
+                    // it's an END event, so event saved was the START event 
and has a timer
+                    // the timer hasn't fired yet, and we can safely kill the 
timer
+                    
context.timerService().deleteEventTimeTimer(getTimerTime(previousRideEvent));
+                }
+                // both events have now been seen, we can clear the state
+                rideState.clear();
+            }
+        }
+
+        @Override
+        public void onTimer(long timestamp, OnTimerContext context, 
Collector<TaxiRide> out)
+                throws Exception {
+
+            // if we get here, we know that the ride started two hours ago, 
and the END hasn't been
+            // processed
+            out.collect(rideState.value());
+            rideState.clear();
+        }
+
+        private long getTimerTime(TaxiRide ride) {
+            return ride.startTime.plusSeconds(120 * 60).toEpochMilli();
+        }
+    }
 }
diff --git 
a/long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTest.java
 
b/long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTest.java
index 7d5a24e..d563e90 100644
--- 
a/long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTest.java
+++ 
b/long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTest.java
@@ -32,87 +32,99 @@ import static org.junit.Assert.assertEquals;
 
 public class LongRidesTest extends TaxiRideTestBase<TaxiRide> {
 
-       static final Testable JAVA_EXERCISE = () -> LongRidesExercise.main(new 
String[]{});
-
-       private static final Instant BEGINNING = 
Instant.parse("2020-01-01T12:00:00.00Z");
-
-       @Test
-       public void shortRide() throws Exception {
-               Instant oneMinLater = BEGINNING.plusSeconds(60);
-               TaxiRide rideStarted = startRide(1, BEGINNING);
-               TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
-               Long markOneMinLater = oneMinLater.toEpochMilli();
-
-               TestRideSource source = new TestRideSource(rideStarted, 
endedOneMinLater, markOneMinLater);
-               assert(results(source).isEmpty());
-       }
-
-       @Test
-       public void outOfOrder() throws Exception {
-               Instant oneMinLater = BEGINNING.plusSeconds(60);
-               TaxiRide rideStarted = startRide(1, BEGINNING);
-               TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
-               Long markOneMinLater = oneMinLater.toEpochMilli();
-
-               TestRideSource source = new TestRideSource(endedOneMinLater, 
rideStarted, markOneMinLater);
-               assert(results(source).isEmpty());
-       }
-
-       @Test
-       public void noStartShort() throws Exception {
-               Instant oneMinLater = BEGINNING.plusSeconds(60);
-               TaxiRide rideStarted = startRide(1, BEGINNING);
-               TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
-               Long markOneMinLater = oneMinLater.toEpochMilli();
-
-               TestRideSource source = new TestRideSource(endedOneMinLater, 
markOneMinLater);
-               assert(results(source).isEmpty());
-       }
-
-       @Test
-       public void noEnd() throws Exception {
-               TaxiRide rideStarted = startRide(1, BEGINNING);
-               Long markThreeHoursLater = BEGINNING.plusSeconds(180 * 
60).toEpochMilli();
-
-               TestRideSource source = new TestRideSource(rideStarted, 
markThreeHoursLater);
-               assertEquals(Collections.singletonList(rideStarted), 
results(source));
-       }
-
-       @Test
-       public void longRide() throws Exception {
-               TaxiRide rideStarted = startRide(1, BEGINNING);
-               Long mark2HoursLater = BEGINNING.plusSeconds(120 * 
60).toEpochMilli();
-               TaxiRide rideEnded3HoursLater = endRide(rideStarted, 
BEGINNING.plusSeconds(180 * 60));
-
-               TestRideSource source = new TestRideSource(rideStarted, 
mark2HoursLater, rideEnded3HoursLater);
-               assertEquals(Collections.singletonList(rideStarted), 
results(source));
-       }
-
-       @Test
-       public void startIsDelayedMoreThanTwoHours() throws Exception {
-               TaxiRide rideStarted = startRide(1, BEGINNING);
-               TaxiRide rideEndedAfter1Hour = endRide(rideStarted, 
BEGINNING.plusSeconds(60 * 60));
-               Long mark2HoursAfterEnd = BEGINNING.plusSeconds(180 * 
60).toEpochMilli();
-
-               TestRideSource source = new TestRideSource(rideEndedAfter1Hour, 
mark2HoursAfterEnd, rideStarted);
-               assert(results(source).isEmpty());
-       }
-
-       private TaxiRide testRide(long rideId, Boolean isStart, Instant 
startTime, Instant endTime) {
-               return new TaxiRide(rideId, isStart, startTime, endTime, 
-73.9947F, 40.750626F, -73.9947F, 40.750626F, (short) 1, 0, 0);
-       }
-
-       private TaxiRide startRide(long rideId, Instant startTime) {
-               return testRide(rideId, true, startTime, Instant.EPOCH);
-       }
-
-       private TaxiRide endRide(TaxiRide started, Instant endTime) {
-               return testRide(started.rideId, false, started.startTime, 
endTime);
-       }
-
-       protected List<TaxiRide> results(TestRideSource source) throws 
Exception {
-               Testable javaSolution = () -> LongRidesSolution.main(new 
String[]{});
-               return runApp(source, new TestSink<>(), JAVA_EXERCISE, 
javaSolution);
-       }
-
+    static final Testable JAVA_EXERCISE = () -> LongRidesExercise.main(new 
String[] {});
+
+    private static final Instant BEGINNING = 
Instant.parse("2020-01-01T12:00:00.00Z");
+
+    @Test
+    public void shortRide() throws Exception {
+        Instant oneMinLater = BEGINNING.plusSeconds(60);
+        TaxiRide rideStarted = startRide(1, BEGINNING);
+        TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
+        Long markOneMinLater = oneMinLater.toEpochMilli();
+
+        TestRideSource source = new TestRideSource(rideStarted, 
endedOneMinLater, markOneMinLater);
+        assert (results(source).isEmpty());
+    }
+
+    @Test
+    public void outOfOrder() throws Exception {
+        Instant oneMinLater = BEGINNING.plusSeconds(60);
+        TaxiRide rideStarted = startRide(1, BEGINNING);
+        TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
+        Long markOneMinLater = oneMinLater.toEpochMilli();
+
+        TestRideSource source = new TestRideSource(endedOneMinLater, 
rideStarted, markOneMinLater);
+        assert (results(source).isEmpty());
+    }
+
+    @Test
+    public void noStartShort() throws Exception {
+        Instant oneMinLater = BEGINNING.plusSeconds(60);
+        TaxiRide rideStarted = startRide(1, BEGINNING);
+        TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
+        Long markOneMinLater = oneMinLater.toEpochMilli();
+
+        TestRideSource source = new TestRideSource(endedOneMinLater, 
markOneMinLater);
+        assert (results(source).isEmpty());
+    }
+
+    @Test
+    public void noEnd() throws Exception {
+        TaxiRide rideStarted = startRide(1, BEGINNING);
+        Long markThreeHoursLater = BEGINNING.plusSeconds(180 * 
60).toEpochMilli();
+
+        TestRideSource source = new TestRideSource(rideStarted, 
markThreeHoursLater);
+        assertEquals(Collections.singletonList(rideStarted), results(source));
+    }
+
+    @Test
+    public void longRide() throws Exception {
+        TaxiRide rideStarted = startRide(1, BEGINNING);
+        Long mark2HoursLater = BEGINNING.plusSeconds(120 * 60).toEpochMilli();
+        TaxiRide rideEnded3HoursLater = endRide(rideStarted, 
BEGINNING.plusSeconds(180 * 60));
+
+        TestRideSource source =
+                new TestRideSource(rideStarted, mark2HoursLater, 
rideEnded3HoursLater);
+        assertEquals(Collections.singletonList(rideStarted), results(source));
+    }
+
+    @Test
+    public void startIsDelayedMoreThanTwoHours() throws Exception {
+        TaxiRide rideStarted = startRide(1, BEGINNING);
+        TaxiRide rideEndedAfter1Hour = endRide(rideStarted, 
BEGINNING.plusSeconds(60 * 60));
+        Long mark2HoursAfterEnd = BEGINNING.plusSeconds(180 * 
60).toEpochMilli();
+
+        TestRideSource source =
+                new TestRideSource(rideEndedAfter1Hour, mark2HoursAfterEnd, 
rideStarted);
+        assert (results(source).isEmpty());
+    }
+
+    private TaxiRide testRide(long rideId, Boolean isStart, Instant startTime, 
Instant endTime) {
+        return new TaxiRide(
+                rideId,
+                isStart,
+                startTime,
+                endTime,
+                -73.9947F,
+                40.750626F,
+                -73.9947F,
+                40.750626F,
+                (short) 1,
+                0,
+                0);
+    }
+
+    private TaxiRide startRide(long rideId, Instant startTime) {
+        return testRide(rideId, true, startTime, Instant.EPOCH);
+    }
+
+    private TaxiRide endRide(TaxiRide started, Instant endTime) {
+        return testRide(started.rideId, false, started.startTime, endTime);
+    }
+
+    protected List<TaxiRide> results(TestRideSource source) throws Exception {
+        Testable javaSolution = () -> LongRidesSolution.main(new String[] {});
+        return runApp(source, new TestSink<>(), JAVA_EXERCISE, javaSolution);
+    }
 }
diff --git 
a/ride-cleansing/src/main/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingExercise.java
 
b/ride-cleansing/src/main/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingExercise.java
index 20467b4..24d6648 100644
--- 
a/ride-cleansing/src/main/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingExercise.java
+++ 
b/ride-cleansing/src/main/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingExercise.java
@@ -29,43 +29,42 @@ import 
org.apache.flink.training.exercises.common.utils.MissingSolutionException
 /**
  * The "Ride Cleansing" exercise from the Flink training in the docs.
  *
- * <p>The task of the exercise is to filter a data stream of taxi ride records 
to keep only rides that
- * start and end within New York City. The resulting stream should be printed.
- *
+ * <p>The task of the exercise is to filter a data stream of taxi ride records 
to keep only rides
+ * that start and end within New York City. The resulting stream should be 
printed.
  */
 public class RideCleansingExercise extends ExerciseBase {
 
-       /**
-        * Main method.
-        *
-        * @throws Exception which occurs during job execution.
-        */
-       public static void main(String[] args) throws Exception {
-
-               // set up streaming execution environment
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               env.setParallelism(ExerciseBase.parallelism);
+    /**
+     * Main method.
+     *
+     * @throws Exception which occurs during job execution.
+     */
+    public static void main(String[] args) throws Exception {
 
-               // start the data generator
-               DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new 
TaxiRideGenerator()));
+        // set up streaming execution environment
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(ExerciseBase.parallelism);
 
-               DataStream<TaxiRide> filteredRides = rides
-                               // filter out rides that do not start or stop 
in NYC
-                               .filter(new NYCFilter());
+        // start the data generator
+        DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new 
TaxiRideGenerator()));
 
-               // print the filtered stream
-               printOrTest(filteredRides);
+        DataStream<TaxiRide> filteredRides =
+                rides
+                        // filter out rides that do not start or stop in NYC
+                        .filter(new NYCFilter());
 
-               // run the cleansing pipeline
-               env.execute("Taxi Ride Cleansing");
-       }
+        // print the filtered stream
+        printOrTest(filteredRides);
 
-       private static class NYCFilter implements FilterFunction<TaxiRide> {
+        // run the cleansing pipeline
+        env.execute("Taxi Ride Cleansing");
+    }
 
-               @Override
-               public boolean filter(TaxiRide taxiRide) throws Exception {
-                       throw new MissingSolutionException();
-               }
-       }
+    private static class NYCFilter implements FilterFunction<TaxiRide> {
 
+        @Override
+        public boolean filter(TaxiRide taxiRide) throws Exception {
+            throw new MissingSolutionException();
+        }
+    }
 }
diff --git 
a/ride-cleansing/src/solution/java/org/apache/flink/training/solutions/ridecleansing/RideCleansingSolution.java
 
b/ride-cleansing/src/solution/java/org/apache/flink/training/solutions/ridecleansing/RideCleansingSolution.java
index 1d7af4c..690e6c7 100644
--- 
a/ride-cleansing/src/solution/java/org/apache/flink/training/solutions/ridecleansing/RideCleansingSolution.java
+++ 
b/ride-cleansing/src/solution/java/org/apache/flink/training/solutions/ridecleansing/RideCleansingSolution.java
@@ -29,42 +29,42 @@ import 
org.apache.flink.training.exercises.common.utils.GeoUtils;
 /**
  * Solution to the "Ride Cleansing" exercise of the Flink training in the docs.
  *
- * <p>The task of the exercise is to filter a data stream of taxi ride records 
to keep only rides that
- * start and end within New York City. The resulting stream should be printed.
- *
+ * <p>The task of the exercise is to filter a data stream of taxi ride records 
to keep only rides
+ * that start and end within New York City. The resulting stream should be 
printed.
  */
 public class RideCleansingSolution extends ExerciseBase {
 
-       /**
-        * Main method.
-        *
-        * @throws Exception which occurs during job execution.
-        */
-       public static void main(String[] args) throws Exception {
+    /**
+     * Main method.
+     *
+     * @throws Exception which occurs during job execution.
+     */
+    public static void main(String[] args) throws Exception {
 
-               // set up streaming execution environment
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               env.setParallelism(ExerciseBase.parallelism);
+        // set up streaming execution environment
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(ExerciseBase.parallelism);
 
-               // start the data generator
-               DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new 
TaxiRideGenerator()));
+        // start the data generator
+        DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new 
TaxiRideGenerator()));
 
-               DataStream<TaxiRide> filteredRides = rides
-                               // keep only those rides and both start and end 
in NYC
-                               .filter(new NYCFilter());
+        DataStream<TaxiRide> filteredRides =
+                rides
+                        // keep only those rides and both start and end in NYC
+                        .filter(new NYCFilter());
 
-               // print the filtered stream
-               printOrTest(filteredRides);
+        // print the filtered stream
+        printOrTest(filteredRides);
 
-               // run the cleansing pipeline
-               env.execute("Taxi Ride Cleansing");
-       }
+        // run the cleansing pipeline
+        env.execute("Taxi Ride Cleansing");
+    }
 
-       public static class NYCFilter implements FilterFunction<TaxiRide> {
-               @Override
-               public boolean filter(TaxiRide taxiRide) {
-                       return GeoUtils.isInNYC(taxiRide.startLon, 
taxiRide.startLat) &&
-                                       GeoUtils.isInNYC(taxiRide.endLon, 
taxiRide.endLat);
-               }
-       }
+    public static class NYCFilter implements FilterFunction<TaxiRide> {
+        @Override
+        public boolean filter(TaxiRide taxiRide) {
+            return GeoUtils.isInNYC(taxiRide.startLon, taxiRide.startLat)
+                    && GeoUtils.isInNYC(taxiRide.endLon, taxiRide.endLat);
+        }
+    }
 }
diff --git 
a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingTest.java
 
b/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingTest.java
index e19f4f3..e3dda36 100644
--- 
a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingTest.java
+++ 
b/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingTest.java
@@ -32,36 +32,45 @@ import static org.junit.Assert.assertEquals;
 
 public class RideCleansingTest extends TaxiRideTestBase<TaxiRide> {
 
-       static final Testable JAVA_EXERCISE = () -> 
RideCleansingExercise.main(new String[]{});
-
-       @Test
-       public void testInNYC() throws Exception {
-               TaxiRide atPennStation = testRide(-73.9947F, 40.750626F, 
-73.9947F, 40.750626F);
-
-               TestRideSource source = new TestRideSource(atPennStation);
-
-               assertEquals(Collections.singletonList(atPennStation), 
results(source));
-       }
-
-       @Test
-       public void testNotInNYC() throws Exception {
-               TaxiRide toThePole = testRide(-73.9947F, 40.750626F, 0, 90);
-               TaxiRide fromThePole = testRide(0, 90, -73.9947F, 40.750626F);
-               TaxiRide atNorthPole = testRide(0, 90, 0, 90);
-
-               TestRideSource source = new TestRideSource(toThePole, 
fromThePole, atNorthPole);
-
-               assertEquals(Collections.emptyList(), results(source));
-       }
-
-       private TaxiRide testRide(float startLon, float startLat, float endLon, 
float endLat) {
-               return new TaxiRide(1L, true, Instant.EPOCH, Instant.EPOCH,
-                               startLon, startLat, endLon, endLat, (short) 1, 
0, 0);
-       }
-
-       protected List<?> results(TestRideSource source) throws Exception {
-               Testable javaSolution = () -> RideCleansingSolution.main(new 
String[]{});
-               return runApp(source, new TestSink<>(), JAVA_EXERCISE, 
javaSolution);
-       }
-
+    static final Testable JAVA_EXERCISE = () -> RideCleansingExercise.main(new 
String[] {});
+
+    @Test
+    public void testInNYC() throws Exception {
+        TaxiRide atPennStation = testRide(-73.9947F, 40.750626F, -73.9947F, 
40.750626F);
+
+        TestRideSource source = new TestRideSource(atPennStation);
+
+        assertEquals(Collections.singletonList(atPennStation), 
results(source));
+    }
+
+    @Test
+    public void testNotInNYC() throws Exception {
+        TaxiRide toThePole = testRide(-73.9947F, 40.750626F, 0, 90);
+        TaxiRide fromThePole = testRide(0, 90, -73.9947F, 40.750626F);
+        TaxiRide atNorthPole = testRide(0, 90, 0, 90);
+
+        TestRideSource source = new TestRideSource(toThePole, fromThePole, 
atNorthPole);
+
+        assertEquals(Collections.emptyList(), results(source));
+    }
+
+    private TaxiRide testRide(float startLon, float startLat, float endLon, 
float endLat) {
+        return new TaxiRide(
+                1L,
+                true,
+                Instant.EPOCH,
+                Instant.EPOCH,
+                startLon,
+                startLat,
+                endLon,
+                endLat,
+                (short) 1,
+                0,
+                0);
+    }
+
+    protected List<?> results(TestRideSource source) throws Exception {
+        Testable javaSolution = () -> RideCleansingSolution.main(new String[] 
{});
+        return runApp(source, new TestSink<>(), JAVA_EXERCISE, javaSolution);
+    }
 }
diff --git 
a/rides-and-fares/src/main/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresExercise.java
 
b/rides-and-fares/src/main/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresExercise.java
index 64175d2..7cdd9c0 100644
--- 
a/rides-and-fares/src/main/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresExercise.java
+++ 
b/rides-and-fares/src/main/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresExercise.java
@@ -35,52 +35,51 @@ import org.apache.flink.util.Collector;
  * The "Stateful Enrichment" exercise of the Flink training in the docs.
  *
  * <p>The goal for this exercise is to enrich TaxiRides with fare information.
- *
  */
 public class RidesAndFaresExercise extends ExerciseBase {
 
-       /**
-        * Main method.
-        *
-        * @throws Exception which occurs during job execution.
-        */
-       public static void main(String[] args) throws Exception {
+    /**
+     * Main method.
+     *
+     * @throws Exception which occurs during job execution.
+     */
+    public static void main(String[] args) throws Exception {
 
-               // set up streaming execution environment
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               env.setParallelism(ExerciseBase.parallelism);
+        // set up streaming execution environment
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(ExerciseBase.parallelism);
 
-               DataStream<TaxiRide> rides = env
-                               .addSource(rideSourceOrTest(new 
TaxiRideGenerator()))
-                               .filter((TaxiRide ride) -> ride.isStart)
-                               .keyBy((TaxiRide ride) -> ride.rideId);
+        DataStream<TaxiRide> rides =
+                env.addSource(rideSourceOrTest(new TaxiRideGenerator()))
+                        .filter((TaxiRide ride) -> ride.isStart)
+                        .keyBy((TaxiRide ride) -> ride.rideId);
 
-               DataStream<TaxiFare> fares = env
-                               .addSource(fareSourceOrTest(new 
TaxiFareGenerator()))
-                               .keyBy((TaxiFare fare) -> fare.rideId);
+        DataStream<TaxiFare> fares =
+                env.addSource(fareSourceOrTest(new TaxiFareGenerator()))
+                        .keyBy((TaxiFare fare) -> fare.rideId);
 
-               DataStream<Tuple2<TaxiRide, TaxiFare>> enrichedRides = rides
-                               .connect(fares)
-                               .flatMap(new EnrichmentFunction());
+        DataStream<Tuple2<TaxiRide, TaxiFare>> enrichedRides =
+                rides.connect(fares).flatMap(new EnrichmentFunction());
 
-               printOrTest(enrichedRides);
+        printOrTest(enrichedRides);
 
-               env.execute("Join Rides with Fares (java RichCoFlatMap)");
-       }
+        env.execute("Join Rides with Fares (java RichCoFlatMap)");
+    }
 
-       public static class EnrichmentFunction extends 
RichCoFlatMapFunction<TaxiRide, TaxiFare, Tuple2<TaxiRide, TaxiFare>> {
+    public static class EnrichmentFunction
+            extends RichCoFlatMapFunction<TaxiRide, TaxiFare, Tuple2<TaxiRide, 
TaxiFare>> {
 
-               @Override
-               public void open(Configuration config) throws Exception {
-                       throw new MissingSolutionException();
-               }
+        @Override
+        public void open(Configuration config) throws Exception {
+            throw new MissingSolutionException();
+        }
 
-               @Override
-               public void flatMap1(TaxiRide ride, Collector<Tuple2<TaxiRide, 
TaxiFare>> out) throws Exception {
-               }
+        @Override
+        public void flatMap1(TaxiRide ride, Collector<Tuple2<TaxiRide, 
TaxiFare>> out)
+                throws Exception {}
 
-               @Override
-               public void flatMap2(TaxiFare fare, Collector<Tuple2<TaxiRide, 
TaxiFare>> out) throws Exception {
-               }
-       }
+        @Override
+        public void flatMap2(TaxiFare fare, Collector<Tuple2<TaxiRide, 
TaxiFare>> out)
+                throws Exception {}
+    }
 }
diff --git 
a/rides-and-fares/src/solution/java/org/apache/flink/training/solutions/ridesandfares/RidesAndFaresSolution.java
 
b/rides-and-fares/src/solution/java/org/apache/flink/training/solutions/ridesandfares/RidesAndFaresSolution.java
index bcddc7b..9b8e096 100644
--- 
a/rides-and-fares/src/solution/java/org/apache/flink/training/solutions/ridesandfares/RidesAndFaresSolution.java
+++ 
b/rides-and-fares/src/solution/java/org/apache/flink/training/solutions/ridesandfares/RidesAndFaresSolution.java
@@ -34,86 +34,94 @@ import 
org.apache.flink.training.exercises.common.utils.ExerciseBase;
 import org.apache.flink.util.Collector;
 
 /**
- * Java reference implementation for the "Stateful Enrichment" exercise of the 
Flink training in the docs.
+ * Java reference implementation for the "Stateful Enrichment" exercise of the 
Flink training in the
+ * docs.
  *
  * <p>The goal for this exercise is to enrich TaxiRides with fare information.
- *
  */
 public class RidesAndFaresSolution extends ExerciseBase {
 
-       /**
-        * Main method.
-        *
-        * @throws Exception which occurs during job execution.
-        */
-       public static void main(String[] args) throws Exception {
-
-               // Set up streaming execution environment, including Web UI and 
REST endpoint.
-               // Checkpointing isn't needed for the RidesAndFares exercise; 
this setup is for
-               // using the State Processor API.
-
-               Configuration conf = new Configuration();
-               conf.setString("state.backend", "filesystem");
-               conf.setString("state.savepoints.dir", 
"file:///tmp/savepoints");
-               conf.setString("state.checkpoints.dir", 
"file:///tmp/checkpoints");
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
-               env.setParallelism(ExerciseBase.parallelism);
-
-               env.enableCheckpointing(10000L);
-               CheckpointConfig config = env.getCheckpointConfig();
-               
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
-
-               DataStream<TaxiRide> rides = env
-                               .addSource(rideSourceOrTest(new 
TaxiRideGenerator()))
-                               .filter((TaxiRide ride) -> ride.isStart)
-                               .keyBy((TaxiRide ride) -> ride.rideId);
-
-               DataStream<TaxiFare> fares = env
-                               .addSource(fareSourceOrTest(new 
TaxiFareGenerator()))
-                               .keyBy((TaxiFare fare) -> fare.rideId);
-
-               // Set a UID on the stateful flatmap operator so we can read 
its state using the State Processor API.
-               DataStream<Tuple2<TaxiRide, TaxiFare>> enrichedRides = rides
-                               .connect(fares)
-                               .flatMap(new EnrichmentFunction())
-                               .uid("enrichment");
-
-               printOrTest(enrichedRides);
-
-               env.execute("Join Rides with Fares (java RichCoFlatMap)");
-       }
-
-       public static class EnrichmentFunction extends 
RichCoFlatMapFunction<TaxiRide, TaxiFare, Tuple2<TaxiRide, TaxiFare>> {
-               // keyed, managed state
-               private ValueState<TaxiRide> rideState;
-               private ValueState<TaxiFare> fareState;
-
-               @Override
-               public void open(Configuration config) {
-                       rideState = getRuntimeContext().getState(new 
ValueStateDescriptor<>("saved ride", TaxiRide.class));
-                       fareState = getRuntimeContext().getState(new 
ValueStateDescriptor<>("saved fare", TaxiFare.class));
-               }
-
-               @Override
-               public void flatMap1(TaxiRide ride, Collector<Tuple2<TaxiRide, 
TaxiFare>> out) throws Exception {
-                       TaxiFare fare = fareState.value();
-                       if (fare != null) {
-                               fareState.clear();
-                               out.collect(Tuple2.of(ride, fare));
-                       } else {
-                               rideState.update(ride);
-                       }
-               }
-
-               @Override
-               public void flatMap2(TaxiFare fare, Collector<Tuple2<TaxiRide, 
TaxiFare>> out) throws Exception {
-                       TaxiRide ride = rideState.value();
-                       if (ride != null) {
-                               rideState.clear();
-                               out.collect(Tuple2.of(ride, fare));
-                       } else {
-                               fareState.update(fare);
-                       }
-               }
-       }
+    /**
+     * Main method.
+     *
+     * @throws Exception which occurs during job execution.
+     */
+    public static void main(String[] args) throws Exception {
+
+        // Set up streaming execution environment, including Web UI and REST 
endpoint.
+        // Checkpointing isn't needed for the RidesAndFares exercise; this 
setup is for
+        // using the State Processor API.
+
+        Configuration conf = new Configuration();
+        conf.setString("state.backend", "filesystem");
+        conf.setString("state.savepoints.dir", "file:///tmp/savepoints");
+        conf.setString("state.checkpoints.dir", "file:///tmp/checkpoints");
+        StreamExecutionEnvironment env =
+                
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
+        env.setParallelism(ExerciseBase.parallelism);
+
+        env.enableCheckpointing(10000L);
+        CheckpointConfig config = env.getCheckpointConfig();
+        config.enableExternalizedCheckpoints(
+                
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+
+        DataStream<TaxiRide> rides =
+                env.addSource(rideSourceOrTest(new TaxiRideGenerator()))
+                        .filter((TaxiRide ride) -> ride.isStart)
+                        .keyBy((TaxiRide ride) -> ride.rideId);
+
+        DataStream<TaxiFare> fares =
+                env.addSource(fareSourceOrTest(new TaxiFareGenerator()))
+                        .keyBy((TaxiFare fare) -> fare.rideId);
+
+        // Set a UID on the stateful flatmap operator so we can read its state 
using the State
+        // Processor API.
+        DataStream<Tuple2<TaxiRide, TaxiFare>> enrichedRides =
+                rides.connect(fares).flatMap(new 
EnrichmentFunction()).uid("enrichment");
+
+        printOrTest(enrichedRides);
+
+        env.execute("Join Rides with Fares (java RichCoFlatMap)");
+    }
+
+    public static class EnrichmentFunction
+            extends RichCoFlatMapFunction<TaxiRide, TaxiFare, Tuple2<TaxiRide, 
TaxiFare>> {
+        // keyed, managed state
+        private ValueState<TaxiRide> rideState;
+        private ValueState<TaxiFare> fareState;
+
+        @Override
+        public void open(Configuration config) {
+            rideState =
+                    getRuntimeContext()
+                            .getState(new ValueStateDescriptor<>("saved ride", 
TaxiRide.class));
+            fareState =
+                    getRuntimeContext()
+                            .getState(new ValueStateDescriptor<>("saved fare", 
TaxiFare.class));
+        }
+
+        @Override
+        public void flatMap1(TaxiRide ride, Collector<Tuple2<TaxiRide, 
TaxiFare>> out)
+                throws Exception {
+            TaxiFare fare = fareState.value();
+            if (fare != null) {
+                fareState.clear();
+                out.collect(Tuple2.of(ride, fare));
+            } else {
+                rideState.update(ride);
+            }
+        }
+
+        @Override
+        public void flatMap2(TaxiFare fare, Collector<Tuple2<TaxiRide, 
TaxiFare>> out)
+                throws Exception {
+            TaxiRide ride = rideState.value();
+            if (ride != null) {
+                rideState.clear();
+                out.collect(Tuple2.of(ride, fare));
+            } else {
+                fareState.update(fare);
+            }
+        }
+    }
 }
diff --git 
a/rides-and-fares/src/test/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresTest.java
 
b/rides-and-fares/src/test/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresTest.java
index 76b4407..a670f95 100644
--- 
a/rides-and-fares/src/test/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresTest.java
+++ 
b/rides-and-fares/src/test/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresTest.java
@@ -35,49 +35,52 @@ import static org.junit.Assert.assertThat;
 
 public class RidesAndFaresTest extends TaxiRideTestBase<Tuple2<TaxiRide, 
TaxiFare>> {
 
-       static final Testable JAVA_EXERCISE = () -> 
RidesAndFaresExercise.main(new String[]{});
-
-       final TaxiRide ride1 = testRide(1);
-       final TaxiRide ride2 = testRide(2);
-       final TaxiFare fare1 = testFare(1);
-       final TaxiFare fare2 = testFare(2);
-
-       @Test
-       public void testInOrder() throws Exception {
-               TestRideSource rides = new TestRideSource(ride1, ride2);
-               TestFareSource fares = new TestFareSource(fare1, fare2);
-
-               List<Tuple2<TaxiRide, TaxiFare>> expected = Arrays.asList(
-                               Tuple2.of(ride1, fare1),
-                               Tuple2.of(ride2, fare2));
-
-               assertThat("Join results don't match", results(rides, fares), 
containsInAnyOrder(expected.toArray()));
-       }
-
-       @Test
-       public void testOutOfOrder() throws Exception {
-               TestRideSource rides = new TestRideSource(ride1, ride2);
-               TestFareSource fares = new TestFareSource(fare2, fare1);
-
-               List<Tuple2<TaxiRide, TaxiFare>> expected = Arrays.asList(
-                               Tuple2.of(ride1, fare1),
-                               Tuple2.of(ride2, fare2));
-
-               assertThat("Join results don't match", results(rides, fares), 
containsInAnyOrder(expected.toArray()));
-       }
-
-       private TaxiRide testRide(long rideId) {
-               return new TaxiRide(rideId, true, Instant.EPOCH, Instant.EPOCH,
-                               0F, 0F, 0F, 0F, (short) 1, 0, rideId);
-       }
-
-       private TaxiFare testFare(long rideId) {
-               return new TaxiFare(rideId, 0, rideId, Instant.EPOCH, "", 0F, 
0F, 0F);
-       }
-
-       protected List<?> results(TestRideSource rides, TestFareSource fares) 
throws Exception {
-               Testable javaSolution = () -> RidesAndFaresSolution.main(new 
String[]{});
-               return runApp(rides, fares, new TestSink<>(), JAVA_EXERCISE, 
javaSolution);
-       }
-
+    static final Testable JAVA_EXERCISE = () -> RidesAndFaresExercise.main(new 
String[] {});
+
+    final TaxiRide ride1 = testRide(1);
+    final TaxiRide ride2 = testRide(2);
+    final TaxiFare fare1 = testFare(1);
+    final TaxiFare fare2 = testFare(2);
+
+    @Test
+    public void testInOrder() throws Exception {
+        TestRideSource rides = new TestRideSource(ride1, ride2);
+        TestFareSource fares = new TestFareSource(fare1, fare2);
+
+        List<Tuple2<TaxiRide, TaxiFare>> expected =
+                Arrays.asList(Tuple2.of(ride1, fare1), Tuple2.of(ride2, 
fare2));
+
+        assertThat(
+                "Join results don't match",
+                results(rides, fares),
+                containsInAnyOrder(expected.toArray()));
+    }
+
+    @Test
+    public void testOutOfOrder() throws Exception {
+        TestRideSource rides = new TestRideSource(ride1, ride2);
+        TestFareSource fares = new TestFareSource(fare2, fare1);
+
+        List<Tuple2<TaxiRide, TaxiFare>> expected =
+                Arrays.asList(Tuple2.of(ride1, fare1), Tuple2.of(ride2, 
fare2));
+
+        assertThat(
+                "Join results don't match",
+                results(rides, fares),
+                containsInAnyOrder(expected.toArray()));
+    }
+
+    private TaxiRide testRide(long rideId) {
+        return new TaxiRide(
+                rideId, true, Instant.EPOCH, Instant.EPOCH, 0F, 0F, 0F, 0F, 
(short) 1, 0, rideId);
+    }
+
+    private TaxiFare testFare(long rideId) {
+        return new TaxiFare(rideId, 0, rideId, Instant.EPOCH, "", 0F, 0F, 0F);
+    }
+
+    protected List<?> results(TestRideSource rides, TestFareSource fares) 
throws Exception {
+        Testable javaSolution = () -> RidesAndFaresSolution.main(new String[] 
{});
+        return runApp(rides, fares, new TestSink<>(), JAVA_EXERCISE, 
javaSolution);
+    }
 }

Reply via email to