This is an automated email from the ASF dual-hosted git repository. nkruber pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-training.git
commit c4baefd1830abe3ba9bcf200be0f5d00095cae12 Author: Rufus Refactor <[email protected]> AuthorDate: Tue Jul 13 21:45:34 2021 +0200 [FLINK-23338] Format code with Spotless/google-java-format --- README.md | 2 +- .../examples/ridecount/RideCountExample.java | 60 +-- .../exercises/common/datatypes/TaxiFare.java | 154 ++++---- .../exercises/common/datatypes/TaxiRide.java | 297 +++++++------- .../common/sources/TaxiFareGenerator.java | 35 +- .../common/sources/TaxiRideGenerator.java | 109 +++--- .../exercises/common/utils/DataGenerator.java | 304 +++++++-------- .../exercises/common/utils/ExerciseBase.java | 98 ++--- .../training/exercises/common/utils/GeoUtils.java | 433 ++++++++++----------- .../common/utils/MissingSolutionException.java | 10 +- .../exercises/testing/TaxiRideTestBase.java | 291 +++++++------- .../training/exercises/testing/TestSource.java | 44 +-- hourly-tips/DISCUSSION.md | 34 +- .../exercises/hourlytips/HourlyTipsExercise.java | 38 +- .../solutions/hourlytips/HourlyTipsSolution.java | 92 ++--- .../exercises/hourlytips/HourlyTipsTest.java | 97 ++--- long-ride-alerts/DISCUSSION.md | 2 +- long-ride-alerts/README.md | 8 +- .../exercises/longrides/LongRidesExercise.java | 61 ++- .../solutions/longrides/LongRidesSolution.java | 136 +++---- .../exercises/longrides/LongRidesTest.java | 178 +++++---- .../ridecleansing/RideCleansingExercise.java | 57 ++- .../ridecleansing/RideCleansingSolution.java | 58 +-- .../exercises/ridecleansing/RideCleansingTest.java | 73 ++-- .../ridesandfares/RidesAndFaresExercise.java | 69 ++-- .../ridesandfares/RidesAndFaresSolution.java | 162 ++++---- .../exercises/ridesandfares/RidesAndFaresTest.java | 93 ++--- 27 files changed, 1489 insertions(+), 1506 deletions(-) diff --git a/README.md b/README.md index 834032a..37c1d26 100644 --- a/README.md +++ b/README.md @@ -111,7 +111,7 @@ Once that’s done you should be able to open [`RideCleansingTest`](ride-cleansi ## Using the Taxi Data Streams -These exercises use data [generators](common/src/main/java/org/apache/flink/training/exercises/common/sources) that produce simulated event streams +These exercises use data [generators](common/src/main/java/org/apache/flink/training/exercises/common/sources) that produce simulated event streams inspired by those shared by the [New York City Taxi & Limousine Commission](http://www.nyc.gov/html/tlc/html/home/home.shtml) in their public [data set](https://uofi.app.box.com/NYCtaxidata) about taxi rides in New York City. diff --git a/common/src/main/java/org/apache/flink/training/examples/ridecount/RideCountExample.java b/common/src/main/java/org/apache/flink/training/examples/ridecount/RideCountExample.java index 74e56fe..36b7b89 100644 --- a/common/src/main/java/org/apache/flink/training/examples/ridecount/RideCountExample.java +++ b/common/src/main/java/org/apache/flink/training/examples/ridecount/RideCountExample.java @@ -29,44 +29,46 @@ import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator; /** * Example that counts the rides for each driver. * - * <p>Note that this is implicitly keeping state for each driver. - * This sort of simple, non-windowed aggregation on an unbounded set of keys will use an unbounded amount of state. - * When this is an issue, look at the SQL/Table API, or ProcessFunction, or state TTL, all of which provide + * <p>Note that this is implicitly keeping state for each driver. This sort of simple, non-windowed + * aggregation on an unbounded set of keys will use an unbounded amount of state. When this is an + * issue, look at the SQL/Table API, or ProcessFunction, or state TTL, all of which provide * mechanisms for expiring state for stale keys. */ public class RideCountExample { - /** - * Main method. - * - * @throws Exception which occurs during job execution. - */ - public static void main(String[] args) throws Exception { + /** + * Main method. + * + * @throws Exception which occurs during job execution. + */ + public static void main(String[] args) throws Exception { - // set up streaming execution environment - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + // set up streaming execution environment + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - // start the data generator - DataStream<TaxiRide> rides = env.addSource(new TaxiRideGenerator()); + // start the data generator + DataStream<TaxiRide> rides = env.addSource(new TaxiRideGenerator()); - // map each ride to a tuple of (driverId, 1) - DataStream<Tuple2<Long, Long>> tuples = rides.map(new MapFunction<TaxiRide, Tuple2<Long, Long>>() { - @Override - public Tuple2<Long, Long> map(TaxiRide ride) { - return Tuple2.of(ride.driverId, 1L); - } - }); + // map each ride to a tuple of (driverId, 1) + DataStream<Tuple2<Long, Long>> tuples = + rides.map( + new MapFunction<TaxiRide, Tuple2<Long, Long>>() { + @Override + public Tuple2<Long, Long> map(TaxiRide ride) { + return Tuple2.of(ride.driverId, 1L); + } + }); - // partition the stream by the driverId - KeyedStream<Tuple2<Long, Long>, Long> keyedByDriverId = tuples.keyBy(t -> t.f0); + // partition the stream by the driverId + KeyedStream<Tuple2<Long, Long>, Long> keyedByDriverId = tuples.keyBy(t -> t.f0); - // count the rides for each driver - DataStream<Tuple2<Long, Long>> rideCounts = keyedByDriverId.sum(1); + // count the rides for each driver + DataStream<Tuple2<Long, Long>> rideCounts = keyedByDriverId.sum(1); - // we could, in fact, print out any or all of these streams - rideCounts.print(); + // we could, in fact, print out any or all of these streams + rideCounts.print(); - // run the cleansing pipeline - env.execute("Ride Count"); - } + // run the cleansing pipeline + env.execute("Ride Count"); + } } diff --git a/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiFare.java b/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiFare.java index 013f2d3..aa74669 100644 --- a/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiFare.java +++ b/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiFare.java @@ -26,95 +26,93 @@ import java.time.Instant; /** * A TaxiFare has payment information about a taxi ride. * - * <p>It has these fields in common with the TaxiRides - * - the rideId - * - the taxiId - * - the driverId - * - the startTime + * <p>It has these fields in common with the TaxiRides - the rideId - the taxiId - the driverId - + * the startTime * - * <p>It also includes - * - the paymentType - * - the tip - * - the tolls - * - the totalFare + * <p>It also includes - the paymentType - the tip - the tolls - the totalFare */ public class TaxiFare implements Serializable { - /** - * Creates a TaxiFare with now as the start time. - */ - public TaxiFare() { - this.startTime = Instant.now(); - } + /** Creates a TaxiFare with now as the start time. */ + public TaxiFare() { + this.startTime = Instant.now(); + } - /** - * Invents a TaxiFare. - */ - public TaxiFare(long rideId) { - DataGenerator g = new DataGenerator(rideId); + /** Invents a TaxiFare. */ + public TaxiFare(long rideId) { + DataGenerator g = new DataGenerator(rideId); - this.rideId = rideId; - this.taxiId = g.taxiId(); - this.driverId = g.driverId(); - this.startTime = g.startTime(); - this.paymentType = g.paymentType(); - this.tip = g.tip(); - this.tolls = g.tolls(); - this.totalFare = g.totalFare(); - } + this.rideId = rideId; + this.taxiId = g.taxiId(); + this.driverId = g.driverId(); + this.startTime = g.startTime(); + this.paymentType = g.paymentType(); + this.tip = g.tip(); + this.tolls = g.tolls(); + this.totalFare = g.totalFare(); + } - /** - * Creates a TaxiFare with the given parameters. - */ - public TaxiFare(long rideId, long taxiId, long driverId, Instant startTime, String paymentType, float tip, float tolls, float totalFare) { - this.rideId = rideId; - this.taxiId = taxiId; - this.driverId = driverId; - this.startTime = startTime; - this.paymentType = paymentType; - this.tip = tip; - this.tolls = tolls; - this.totalFare = totalFare; - } + /** Creates a TaxiFare with the given parameters. */ + public TaxiFare( + long rideId, + long taxiId, + long driverId, + Instant startTime, + String paymentType, + float tip, + float tolls, + float totalFare) { + this.rideId = rideId; + this.taxiId = taxiId; + this.driverId = driverId; + this.startTime = startTime; + this.paymentType = paymentType; + this.tip = tip; + this.tolls = tolls; + this.totalFare = totalFare; + } - public long rideId; - public long taxiId; - public long driverId; - public Instant startTime; - public String paymentType; - public float tip; - public float tolls; - public float totalFare; + public long rideId; + public long taxiId; + public long driverId; + public Instant startTime; + public String paymentType; + public float tip; + public float tolls; + public float totalFare; - @Override - public String toString() { + @Override + public String toString() { - return rideId + "," + - taxiId + "," + - driverId + "," + - startTime.toString() + "," + - paymentType + "," + - tip + "," + - tolls + "," + - totalFare; - } + return rideId + + "," + + taxiId + + "," + + driverId + + "," + + startTime.toString() + + "," + + paymentType + + "," + + tip + + "," + + tolls + + "," + + totalFare; + } - @Override - public boolean equals(Object other) { - return other instanceof TaxiFare && - this.rideId == ((TaxiFare) other).rideId; - } + @Override + public boolean equals(Object other) { + return other instanceof TaxiFare && this.rideId == ((TaxiFare) other).rideId; + } - @Override - public int hashCode() { - return (int) this.rideId; - } - - /** - * Gets the fare's start time. - */ - public long getEventTime() { - return startTime.toEpochMilli(); - } + @Override + public int hashCode() { + return (int) this.rideId; + } + /** Gets the fare's start time. */ + public long getEventTime() { + return startTime.toEpochMilli(); + } } diff --git a/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiRide.java b/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiRide.java index 3bec5e7..378b9a0 100644 --- a/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiRide.java +++ b/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiRide.java @@ -30,156 +30,155 @@ import java.time.Instant; * A TaxiRide is a taxi ride event. There are two types of events, a taxi ride start event and a * taxi ride end event. The isStart flag specifies the type of the event. * - * <p>A TaxiRide consists of - * - the rideId of the event which is identical for start and end record - * - the type of the event (start or end) - * - the time of the event - * - the longitude of the start location - * - the latitude of the start location - * - the longitude of the end location - * - the latitude of the end location - * - the passengerCnt of the ride - * - the taxiId - * - the driverId + * <p>A TaxiRide consists of - the rideId of the event which is identical for start and end record - + * the type of the event (start or end) - the time of the event - the longitude of the start + * location - the latitude of the start location - the longitude of the end location - the latitude + * of the end location - the passengerCnt of the ride - the taxiId - the driverId */ public class TaxiRide implements Comparable<TaxiRide>, Serializable { - /** - * Creates a new TaxiRide with now as start and end time. - */ - public TaxiRide() { - this.startTime = Instant.now(); - this.endTime = Instant.now(); - } - - /** - * Invents a TaxiRide. - */ - public TaxiRide(long rideId, boolean isStart) { - DataGenerator g = new DataGenerator(rideId); - - this.rideId = rideId; - this.isStart = isStart; - this.startTime = g.startTime(); - this.endTime = isStart ? Instant.ofEpochMilli(0) : g.endTime(); - this.startLon = g.startLon(); - this.startLat = g.startLat(); - this.endLon = g.endLon(); - this.endLat = g.endLat(); - this.passengerCnt = g.passengerCnt(); - this.taxiId = g.taxiId(); - this.driverId = g.driverId(); - } - - /** - * Creates a TaxiRide with the given parameters. - */ - public TaxiRide(long rideId, boolean isStart, Instant startTime, Instant endTime, - float startLon, float startLat, float endLon, float endLat, - short passengerCnt, long taxiId, long driverId) { - this.rideId = rideId; - this.isStart = isStart; - this.startTime = startTime; - this.endTime = endTime; - this.startLon = startLon; - this.startLat = startLat; - this.endLon = endLon; - this.endLat = endLat; - this.passengerCnt = passengerCnt; - this.taxiId = taxiId; - this.driverId = driverId; - } - - public long rideId; - public boolean isStart; - public Instant startTime; - public Instant endTime; - public float startLon; - public float startLat; - public float endLon; - public float endLat; - public short passengerCnt; - public long taxiId; - public long driverId; - - @Override - public String toString() { - - return rideId + "," + - (isStart ? "START" : "END") + "," + - startTime.toString() + "," + - endTime.toString() + "," + - startLon + "," + - startLat + "," + - endLon + "," + - endLat + "," + - passengerCnt + "," + - taxiId + "," + - driverId; - } - - /** - * Compares this TaxiRide with the given one. - * - * <ul> - * <li>sort by timestamp,</li> - * <li>putting START events before END events if they have the same timestamp</li> - * </ul> - */ - public int compareTo(@Nullable TaxiRide other) { - if (other == null) { - return 1; - } - int compareTimes = Long.compare(this.getEventTime(), other.getEventTime()); - if (compareTimes == 0) { - if (this.isStart == other.isStart) { - return 0; - } - else { - if (this.isStart) { - return -1; - } - else { - return 1; - } - } - } - else { - return compareTimes; - } - } - - @Override - public boolean equals(Object other) { - return other instanceof TaxiRide && - this.rideId == ((TaxiRide) other).rideId; - } - - @Override - public int hashCode() { - return (int) this.rideId; - } - - /** - * Gets the ride's time stamp (start or end time depending on {@link #isStart}). - */ - public long getEventTime() { - if (isStart) { - return startTime.toEpochMilli(); - } - else { - return endTime.toEpochMilli(); - } - } - - /** - * Gets the distance from the ride location to the given one. - */ - public double getEuclideanDistance(double longitude, double latitude) { - if (this.isStart) { - return GeoUtils.getEuclideanDistance((float) longitude, (float) latitude, this.startLon, this.startLat); - } else { - return GeoUtils.getEuclideanDistance((float) longitude, (float) latitude, this.endLon, this.endLat); - } - } + /** Creates a new TaxiRide with now as start and end time. */ + public TaxiRide() { + this.startTime = Instant.now(); + this.endTime = Instant.now(); + } + + /** Invents a TaxiRide. */ + public TaxiRide(long rideId, boolean isStart) { + DataGenerator g = new DataGenerator(rideId); + + this.rideId = rideId; + this.isStart = isStart; + this.startTime = g.startTime(); + this.endTime = isStart ? Instant.ofEpochMilli(0) : g.endTime(); + this.startLon = g.startLon(); + this.startLat = g.startLat(); + this.endLon = g.endLon(); + this.endLat = g.endLat(); + this.passengerCnt = g.passengerCnt(); + this.taxiId = g.taxiId(); + this.driverId = g.driverId(); + } + + /** Creates a TaxiRide with the given parameters. */ + public TaxiRide( + long rideId, + boolean isStart, + Instant startTime, + Instant endTime, + float startLon, + float startLat, + float endLon, + float endLat, + short passengerCnt, + long taxiId, + long driverId) { + this.rideId = rideId; + this.isStart = isStart; + this.startTime = startTime; + this.endTime = endTime; + this.startLon = startLon; + this.startLat = startLat; + this.endLon = endLon; + this.endLat = endLat; + this.passengerCnt = passengerCnt; + this.taxiId = taxiId; + this.driverId = driverId; + } + + public long rideId; + public boolean isStart; + public Instant startTime; + public Instant endTime; + public float startLon; + public float startLat; + public float endLon; + public float endLat; + public short passengerCnt; + public long taxiId; + public long driverId; + + @Override + public String toString() { + + return rideId + + "," + + (isStart ? "START" : "END") + + "," + + startTime.toString() + + "," + + endTime.toString() + + "," + + startLon + + "," + + startLat + + "," + + endLon + + "," + + endLat + + "," + + passengerCnt + + "," + + taxiId + + "," + + driverId; + } + + /** + * Compares this TaxiRide with the given one. + * + * <ul> + * <li>sort by timestamp, + * <li>putting START events before END events if they have the same timestamp + * </ul> + */ + public int compareTo(@Nullable TaxiRide other) { + if (other == null) { + return 1; + } + int compareTimes = Long.compare(this.getEventTime(), other.getEventTime()); + if (compareTimes == 0) { + if (this.isStart == other.isStart) { + return 0; + } else { + if (this.isStart) { + return -1; + } else { + return 1; + } + } + } else { + return compareTimes; + } + } + + @Override + public boolean equals(Object other) { + return other instanceof TaxiRide && this.rideId == ((TaxiRide) other).rideId; + } + + @Override + public int hashCode() { + return (int) this.rideId; + } + + /** Gets the ride's time stamp (start or end time depending on {@link #isStart}). */ + public long getEventTime() { + if (isStart) { + return startTime.toEpochMilli(); + } else { + return endTime.toEpochMilli(); + } + } + + /** Gets the distance from the ride location to the given one. */ + public double getEuclideanDistance(double longitude, double latitude) { + if (this.isStart) { + return GeoUtils.getEuclideanDistance( + (float) longitude, (float) latitude, this.startLon, this.startLat); + } else { + return GeoUtils.getEuclideanDistance( + (float) longitude, (float) latitude, this.endLon, this.endLat); + } + } } diff --git a/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiFareGenerator.java b/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiFareGenerator.java index c3ca076..17686fa 100644 --- a/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiFareGenerator.java +++ b/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiFareGenerator.java @@ -27,31 +27,30 @@ import org.apache.flink.training.exercises.common.datatypes.TaxiFare; * timestamps. * * <p>The stream is generated in order, and it includes Watermarks. - * */ public class TaxiFareGenerator implements SourceFunction<TaxiFare> { - private volatile boolean running = true; + private volatile boolean running = true; - @Override - public void run(SourceContext<TaxiFare> ctx) throws Exception { + @Override + public void run(SourceContext<TaxiFare> ctx) throws Exception { - long id = 1; + long id = 1; - while (running) { - TaxiFare fare = new TaxiFare(id); - id += 1; + while (running) { + TaxiFare fare = new TaxiFare(id); + id += 1; - ctx.collectWithTimestamp(fare, fare.getEventTime()); - ctx.emitWatermark(new Watermark(fare.getEventTime())); + ctx.collectWithTimestamp(fare, fare.getEventTime()); + ctx.emitWatermark(new Watermark(fare.getEventTime())); - // match our event production rate to that of the TaxiRideGenerator - Thread.sleep(TaxiRideGenerator.SLEEP_MILLIS_PER_EVENT); - } - } + // match our event production rate to that of the TaxiRideGenerator + Thread.sleep(TaxiRideGenerator.SLEEP_MILLIS_PER_EVENT); + } + } - @Override - public void cancel() { - running = false; - } + @Override + public void cancel() { + running = false; + } } diff --git a/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiRideGenerator.java b/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiRideGenerator.java index 8ee5d01..40f498a 100644 --- a/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiRideGenerator.java +++ b/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiRideGenerator.java @@ -32,61 +32,62 @@ import java.util.Random; * timestamps. * * <p>The stream is produced out-of-order, and includes Watermarks (with no late events). - * */ public class TaxiRideGenerator implements SourceFunction<TaxiRide> { - public static final int SLEEP_MILLIS_PER_EVENT = 10; - private static final int BATCH_SIZE = 5; - private volatile boolean running = true; - - @Override - public void run(SourceContext<TaxiRide> ctx) throws Exception { - - PriorityQueue<TaxiRide> endEventQ = new PriorityQueue<>(100); - long id = 0; - long maxStartTime = 0; - - while (running) { - - // generate a batch of START events - List<TaxiRide> startEvents = new ArrayList<TaxiRide>(BATCH_SIZE); - for (int i = 1; i <= BATCH_SIZE; i++) { - TaxiRide ride = new TaxiRide(id + i, true); - startEvents.add(ride); - // the start times may be in order, but let's not assume that - maxStartTime = Math.max(maxStartTime, ride.startTime.toEpochMilli()); - } - - // enqueue the corresponding END events - for (int i = 1; i <= BATCH_SIZE; i++) { - endEventQ.add(new TaxiRide(id + i, false)); - } - - // release the END events coming before the end of this new batch - // (this allows a few END events to precede their matching START event) - while (endEventQ.peek().getEventTime() <= maxStartTime) { - TaxiRide ride = endEventQ.poll(); - ctx.collectWithTimestamp(ride, ride.getEventTime()); - } - - // then emit the new START events (out-of-order) - java.util.Collections.shuffle(startEvents, new Random(id)); - startEvents.iterator().forEachRemaining(r -> ctx.collectWithTimestamp(r, r.getEventTime())); - - // produce a Watermark - ctx.emitWatermark(new Watermark(maxStartTime)); - - // prepare for the next batch - id += BATCH_SIZE; - - // don't go too fast - Thread.sleep(BATCH_SIZE * SLEEP_MILLIS_PER_EVENT); - } - } - - @Override - public void cancel() { - running = false; - } + public static final int SLEEP_MILLIS_PER_EVENT = 10; + private static final int BATCH_SIZE = 5; + private volatile boolean running = true; + + @Override + public void run(SourceContext<TaxiRide> ctx) throws Exception { + + PriorityQueue<TaxiRide> endEventQ = new PriorityQueue<>(100); + long id = 0; + long maxStartTime = 0; + + while (running) { + + // generate a batch of START events + List<TaxiRide> startEvents = new ArrayList<TaxiRide>(BATCH_SIZE); + for (int i = 1; i <= BATCH_SIZE; i++) { + TaxiRide ride = new TaxiRide(id + i, true); + startEvents.add(ride); + // the start times may be in order, but let's not assume that + maxStartTime = Math.max(maxStartTime, ride.startTime.toEpochMilli()); + } + + // enqueue the corresponding END events + for (int i = 1; i <= BATCH_SIZE; i++) { + endEventQ.add(new TaxiRide(id + i, false)); + } + + // release the END events coming before the end of this new batch + // (this allows a few END events to precede their matching START event) + while (endEventQ.peek().getEventTime() <= maxStartTime) { + TaxiRide ride = endEventQ.poll(); + ctx.collectWithTimestamp(ride, ride.getEventTime()); + } + + // then emit the new START events (out-of-order) + java.util.Collections.shuffle(startEvents, new Random(id)); + startEvents + .iterator() + .forEachRemaining(r -> ctx.collectWithTimestamp(r, r.getEventTime())); + + // produce a Watermark + ctx.emitWatermark(new Watermark(maxStartTime)); + + // prepare for the next batch + id += BATCH_SIZE; + + // don't go too fast + Thread.sleep(BATCH_SIZE * SLEEP_MILLIS_PER_EVENT); + } + } + + @Override + public void cancel() { + running = false; + } } diff --git a/common/src/main/java/org/apache/flink/training/exercises/common/utils/DataGenerator.java b/common/src/main/java/org/apache/flink/training/exercises/common/utils/DataGenerator.java index f40c3c4..24439bd 100644 --- a/common/src/main/java/org/apache/flink/training/exercises/common/utils/DataGenerator.java +++ b/common/src/main/java/org/apache/flink/training/exercises/common/utils/DataGenerator.java @@ -30,167 +30,145 @@ import java.util.Random; */ public class DataGenerator { - private static final int SECONDS_BETWEEN_RIDES = 20; - private static final int NUMBER_OF_DRIVERS = 200; - private static final Instant beginTime = Instant.parse("2020-01-01T12:00:00.00Z"); - - private transient long rideId; - - /** - * Creates a DataGenerator for the specified rideId. - */ - public DataGenerator(long rideId) { - this.rideId = rideId; - } - - /** - * Deterministically generates and returns the startTime for this ride. - */ - public Instant startTime() { - return beginTime.plusSeconds(SECONDS_BETWEEN_RIDES * rideId); - } - - /** - * Deterministically generates and returns the endTime for this ride. - */ - public Instant endTime() { - return startTime().plusSeconds(60 * rideDurationMinutes()); - } - - /** - * Deterministically generates and returns the driverId for this ride. - * The HourlyTips exercise is more interesting if aren't too many drivers. - */ - public long driverId() { - Random rnd = new Random(rideId); - return 2013000000 + rnd.nextInt(NUMBER_OF_DRIVERS); - } - - /** - * Deterministically generates and returns the taxiId for this ride. - */ - public long taxiId() { - return driverId(); - } - - /** - * Deterministically generates and returns the startLat for this ride. - * - * <p>The locations are used in the RideCleansing exercise. - * We want some rides to be outside of NYC. - */ - public float startLat() { - return aFloat((float) (GeoUtils.LAT_SOUTH - 0.1), (float) (GeoUtils.LAT_NORTH + 0.1F)); - } - - /** - * Deterministically generates and returns the startLon for this ride. - */ - public float startLon() { - return aFloat((float) (GeoUtils.LON_WEST - 0.1), (float) (GeoUtils.LON_EAST + 0.1F)); - } - - /** - * Deterministically generates and returns the endLat for this ride. - */ - public float endLat() { - return bFloat((float) (GeoUtils.LAT_SOUTH - 0.1), (float) (GeoUtils.LAT_NORTH + 0.1F)); - } - - /** - * Deterministically generates and returns the endLon for this ride. - */ - public float endLon() { - return bFloat((float) (GeoUtils.LON_WEST - 0.1), (float) (GeoUtils.LON_EAST + 0.1F)); - } - - /** - * Deterministically generates and returns the passengerCnt for this ride. - */ - public short passengerCnt() { - return (short) aLong(1L, 4L); - } - - /** - * Deterministically generates and returns the paymentType for this ride. - */ - public String paymentType() { - return (rideId % 2 == 0) ? "CARD" : "CASH"; - } - - /** - * Deterministically generates and returns the tip for this ride. - * - * <p>The HourlyTips exercise is more interesting if there's some significant variation in tipping. - */ - public float tip() { - return aLong(0L, 60L, 10F, 15F); - } - - /** - * Deterministically generates and returns the tolls for this ride. - */ - public float tolls() { - return (rideId % 10 == 0) ? aLong(0L, 5L) : 0L; - } - - /** - * Deterministically generates and returns the totalFare for this ride. - */ - public float totalFare() { - return (float) (3.0 + (1.0 * rideDurationMinutes()) + tip() + tolls()); - } - - /** - * The LongRides exercise needs to have some rides with a duration > 2 hours, but not too many. - */ - private long rideDurationMinutes() { - return aLong(0L, 600, 20, 40); - } - - // ------------------------------------- - - private long aLong(long min, long max) { - float mean = (min + max) / 2.0F; - float stddev = (max - min) / 8F; - - return aLong(min, max, mean, stddev); - } - - // the rideId is used as the seed to guarantee deterministic results - private long aLong(long min, long max, float mean, float stddev) { - Random rnd = new Random(rideId); - long value; - do { - value = (long) Math.round((stddev * rnd.nextGaussian()) + mean); - } while ((value < min) || (value > max)); - return value; - } - - // ------------------------------------- - - private float aFloat(float min, float max) { - float mean = (min + max) / 2.0F; - float stddev = (max - min) / 8F; - - return aFloat(rideId, min, max, mean, stddev); - } - - private float bFloat(float min, float max) { - float mean = (min + max) / 2.0F; - float stddev = (max - min) / 8F; - - return aFloat(rideId + 42, min, max, mean, stddev); - } - - // the rideId is used as the seed to guarantee deterministic results - private float aFloat(long seed, float min, float max, float mean, float stddev) { - Random rnd = new Random(seed); - float value; - do { - value = (float) (stddev * rnd.nextGaussian()) + mean; - } while ((value < min) || (value > max)); - return value; - } - + private static final int SECONDS_BETWEEN_RIDES = 20; + private static final int NUMBER_OF_DRIVERS = 200; + private static final Instant beginTime = Instant.parse("2020-01-01T12:00:00.00Z"); + + private transient long rideId; + + /** Creates a DataGenerator for the specified rideId. */ + public DataGenerator(long rideId) { + this.rideId = rideId; + } + + /** Deterministically generates and returns the startTime for this ride. */ + public Instant startTime() { + return beginTime.plusSeconds(SECONDS_BETWEEN_RIDES * rideId); + } + + /** Deterministically generates and returns the endTime for this ride. */ + public Instant endTime() { + return startTime().plusSeconds(60 * rideDurationMinutes()); + } + + /** + * Deterministically generates and returns the driverId for this ride. The HourlyTips exercise + * is more interesting if aren't too many drivers. + */ + public long driverId() { + Random rnd = new Random(rideId); + return 2013000000 + rnd.nextInt(NUMBER_OF_DRIVERS); + } + + /** Deterministically generates and returns the taxiId for this ride. */ + public long taxiId() { + return driverId(); + } + + /** + * Deterministically generates and returns the startLat for this ride. + * + * <p>The locations are used in the RideCleansing exercise. We want some rides to be outside of + * NYC. + */ + public float startLat() { + return aFloat((float) (GeoUtils.LAT_SOUTH - 0.1), (float) (GeoUtils.LAT_NORTH + 0.1F)); + } + + /** Deterministically generates and returns the startLon for this ride. */ + public float startLon() { + return aFloat((float) (GeoUtils.LON_WEST - 0.1), (float) (GeoUtils.LON_EAST + 0.1F)); + } + + /** Deterministically generates and returns the endLat for this ride. */ + public float endLat() { + return bFloat((float) (GeoUtils.LAT_SOUTH - 0.1), (float) (GeoUtils.LAT_NORTH + 0.1F)); + } + + /** Deterministically generates and returns the endLon for this ride. */ + public float endLon() { + return bFloat((float) (GeoUtils.LON_WEST - 0.1), (float) (GeoUtils.LON_EAST + 0.1F)); + } + + /** Deterministically generates and returns the passengerCnt for this ride. */ + public short passengerCnt() { + return (short) aLong(1L, 4L); + } + + /** Deterministically generates and returns the paymentType for this ride. */ + public String paymentType() { + return (rideId % 2 == 0) ? "CARD" : "CASH"; + } + + /** + * Deterministically generates and returns the tip for this ride. + * + * <p>The HourlyTips exercise is more interesting if there's some significant variation in + * tipping. + */ + public float tip() { + return aLong(0L, 60L, 10F, 15F); + } + + /** Deterministically generates and returns the tolls for this ride. */ + public float tolls() { + return (rideId % 10 == 0) ? aLong(0L, 5L) : 0L; + } + + /** Deterministically generates and returns the totalFare for this ride. */ + public float totalFare() { + return (float) (3.0 + (1.0 * rideDurationMinutes()) + tip() + tolls()); + } + + /** + * The LongRides exercise needs to have some rides with a duration > 2 hours, but not too many. + */ + private long rideDurationMinutes() { + return aLong(0L, 600, 20, 40); + } + + // ------------------------------------- + + private long aLong(long min, long max) { + float mean = (min + max) / 2.0F; + float stddev = (max - min) / 8F; + + return aLong(min, max, mean, stddev); + } + + // the rideId is used as the seed to guarantee deterministic results + private long aLong(long min, long max, float mean, float stddev) { + Random rnd = new Random(rideId); + long value; + do { + value = (long) Math.round((stddev * rnd.nextGaussian()) + mean); + } while ((value < min) || (value > max)); + return value; + } + + // ------------------------------------- + + private float aFloat(float min, float max) { + float mean = (min + max) / 2.0F; + float stddev = (max - min) / 8F; + + return aFloat(rideId, min, max, mean, stddev); + } + + private float bFloat(float min, float max) { + float mean = (min + max) / 2.0F; + float stddev = (max - min) / 8F; + + return aFloat(rideId + 42, min, max, mean, stddev); + } + + // the rideId is used as the seed to guarantee deterministic results + private float aFloat(long seed, float min, float max, float mean, float stddev) { + Random rnd = new Random(seed); + float value; + do { + value = (float) (stddev * rnd.nextGaussian()) + mean; + } while ((value < min) || (value > max)); + return value; + } } diff --git a/common/src/main/java/org/apache/flink/training/exercises/common/utils/ExerciseBase.java b/common/src/main/java/org/apache/flink/training/exercises/common/utils/ExerciseBase.java index 172b300..c59a7a7 100644 --- a/common/src/main/java/org/apache/flink/training/exercises/common/utils/ExerciseBase.java +++ b/common/src/main/java/org/apache/flink/training/exercises/common/utils/ExerciseBase.java @@ -23,65 +23,53 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.training.exercises.common.datatypes.TaxiFare; import org.apache.flink.training.exercises.common.datatypes.TaxiRide; -/** - * Base for all exercises with a few helper methods. - */ +/** Base for all exercises with a few helper methods. */ public class ExerciseBase { - public static SourceFunction<TaxiRide> rides = null; - public static SourceFunction<TaxiFare> fares = null; - public static SourceFunction<String> strings = null; - public static SinkFunction out = null; - public static int parallelism = 4; + public static SourceFunction<TaxiRide> rides = null; + public static SourceFunction<TaxiFare> fares = null; + public static SourceFunction<String> strings = null; + public static SinkFunction out = null; + public static int parallelism = 4; - /** - * Retrieves a test source during unit tests and the given one during normal execution. - */ - public static SourceFunction<TaxiRide> rideSourceOrTest(SourceFunction<TaxiRide> source) { - if (rides == null) { - return source; - } - return rides; - } + /** Retrieves a test source during unit tests and the given one during normal execution. */ + public static SourceFunction<TaxiRide> rideSourceOrTest(SourceFunction<TaxiRide> source) { + if (rides == null) { + return source; + } + return rides; + } - /** - * Retrieves a test source during unit tests and the given one during normal execution. - */ - public static SourceFunction<TaxiFare> fareSourceOrTest(SourceFunction<TaxiFare> source) { - if (fares == null) { - return source; - } - return fares; - } + /** Retrieves a test source during unit tests and the given one during normal execution. */ + public static SourceFunction<TaxiFare> fareSourceOrTest(SourceFunction<TaxiFare> source) { + if (fares == null) { + return source; + } + return fares; + } - /** - * Retrieves a test source during unit tests and the given one during normal execution. - */ - public static SourceFunction<String> stringSourceOrTest(SourceFunction<String> source) { - if (strings == null) { - return source; - } - return strings; - } + /** Retrieves a test source during unit tests and the given one during normal execution. */ + public static SourceFunction<String> stringSourceOrTest(SourceFunction<String> source) { + if (strings == null) { + return source; + } + return strings; + } - /** - * Prints the given data stream during normal execution and collects outputs during tests. - */ - public static void printOrTest(org.apache.flink.streaming.api.datastream.DataStream<?> ds) { - if (out == null) { - ds.print(); - } else { - ds.addSink(out); - } - } + /** Prints the given data stream during normal execution and collects outputs during tests. */ + public static void printOrTest(org.apache.flink.streaming.api.datastream.DataStream<?> ds) { + if (out == null) { + ds.print(); + } else { + ds.addSink(out); + } + } - /** - * Prints the given data stream during normal execution and collects outputs during tests. - */ - public static void printOrTest(org.apache.flink.streaming.api.scala.DataStream<?> ds) { - if (out == null) { - ds.print(); - } else { - ds.addSink(out); - } - } + /** Prints the given data stream during normal execution and collects outputs during tests. */ + public static void printOrTest(org.apache.flink.streaming.api.scala.DataStream<?> ds) { + if (out == null) { + ds.print(); + } else { + ds.addSink(out); + } + } } diff --git a/common/src/main/java/org/apache/flink/training/exercises/common/utils/GeoUtils.java b/common/src/main/java/org/apache/flink/training/exercises/common/utils/GeoUtils.java index e638ab3..68e93f3 100644 --- a/common/src/main/java/org/apache/flink/training/exercises/common/utils/GeoUtils.java +++ b/common/src/main/java/org/apache/flink/training/exercises/common/utils/GeoUtils.java @@ -22,231 +22,214 @@ import java.util.ArrayList; import java.util.List; import java.util.Random; -/** - * GeoUtils provides utility methods to deal with locations for the data streaming exercises. - */ +/** GeoUtils provides utility methods to deal with locations for the data streaming exercises. */ public class GeoUtils { - // geo boundaries of the area of NYC - public static final double LON_EAST = -73.7; - public static final double LON_WEST = -74.05; - public static final double LAT_NORTH = 41.0; - public static final double LAT_SOUTH = 40.5; - - // area width and height - public static final double LON_WIDTH = 74.05 - 73.7; - public static final double LAT_HEIGHT = 41.0 - 40.5; - - // delta step to create artificial grid overlay of NYC - public static final double DELTA_LON = 0.0014; - public static final double DELTA_LAT = 0.00125; - - // ( |LON_WEST| - |LON_EAST| ) / DELTA_LAT - public static final int NUMBER_OF_GRID_X = 250; - // ( LAT_NORTH - LAT_SOUTH ) / DELTA_LON - public static final int NUMBER_OF_GRID_Y = 400; - - public static final float DEG_LEN = 110.25f; - - /** - * Checks if a location specified by longitude and latitude values is - * within the geo boundaries of New York City. - * - * @param lon longitude of the location to check - * @param lat latitude of the location to check - * - * @return true if the location is within NYC boundaries, otherwise false. - */ - public static boolean isInNYC(float lon, float lat) { - - return !(lon > LON_EAST || lon < LON_WEST) && - !(lat > LAT_NORTH || lat < LAT_SOUTH); - } - - /** - * Maps a location specified by latitude and longitude values to a cell of a - * grid covering the area of NYC. - * The grid cells are roughly 100 x 100 m and sequentially number from north-west - * to south-east starting by zero. - * - * @param lon longitude of the location to map - * @param lat latitude of the location to map - * - * @return id of mapped grid cell. - */ - public static int mapToGridCell(float lon, float lat) { - int xIndex = (int) Math.floor((Math.abs(LON_WEST) - Math.abs(lon)) / DELTA_LON); - int yIndex = (int) Math.floor((LAT_NORTH - lat) / DELTA_LAT); - - return xIndex + (yIndex * NUMBER_OF_GRID_X); - } - - /** - * Maps the direct path between two locations specified by longitude and latitude to a list of - * cells of a grid covering the area of NYC. - * The grid cells are roughly 100 x 100 m and sequentially number from north-west - * to south-east starting by zero. - * - * @param lon1 longitude of the first location - * @param lat1 latitude of the first location - * @param lon2 longitude of the second location - * @param lat2 latitude of the second location - * - * @return A list of cell ids - */ - public static List<Integer> mapToGridCellsOnWay(float lon1, float lat1, float lon2, float lat2) { - - int x1 = (int) Math.floor((Math.abs(LON_WEST) - Math.abs(lon1)) / DELTA_LON); - int y1 = (int) Math.floor((LAT_NORTH - lat1) / DELTA_LAT); - - int x2 = (int) Math.floor((Math.abs(LON_WEST) - Math.abs(lon2)) / DELTA_LON); - int y2 = (int) Math.floor((LAT_NORTH - lat2) / DELTA_LAT); - - int startX, startY, endX, endY; - if (x1 <= x2) { - startX = x1; - startY = y1; - endX = x2; - endY = y2; - } - else { - startX = x2; - startY = y2; - endX = x1; - endY = y1; - } - - double slope = (endY - startY) / ((endX - startX) + 0.00000001); - - int curX = startX; - int curY = startY; - - ArrayList<Integer> cellIds = new ArrayList<>(64); - cellIds.add(curX + (curY * NUMBER_OF_GRID_X)); - - while (curX < endX || curY != endY) { - - if (slope > 0) { - double y = (curX - startX + 0.5) * slope + startY - 0.5; - - if (y > curY - 0.05 && y < curY + 0.05) { - curX++; - curY++; - } - else if (y < curY) { - curX++; - } - else { - curY++; - } - } - else { - double y = (curX - startX + 0.5) * slope + startY + 0.5; - - if (y > curY - 0.05 && y < curY + 0.05) { - curX++; - curY--; - } - if (y > curY) { - curX++; - } - else { - curY--; - } - - } - - cellIds.add(curX + (curY * NUMBER_OF_GRID_X)); - } - - return cellIds; - } - - /** - * Returns the longitude of the center of a grid cell. - * - * @param gridCellId The grid cell. - * - * @return The longitude value of the cell's center. - */ - public static float getGridCellCenterLon(int gridCellId) { - - int xIndex = gridCellId % NUMBER_OF_GRID_X; - - return (float) (Math.abs(LON_WEST) - (xIndex * DELTA_LON) - (DELTA_LON / 2)) * -1.0f; - } - - /** - * Returns the latitude of the center of a grid cell. - * - * @param gridCellId The grid cell. - * - * @return The latitude value of the cell's center. - */ - public static float getGridCellCenterLat(int gridCellId) { - - int xIndex = gridCellId % NUMBER_OF_GRID_X; - int yIndex = (gridCellId - xIndex) / NUMBER_OF_GRID_X; - - return (float) (LAT_NORTH - (yIndex * DELTA_LAT) - (DELTA_LAT / 2)); - - } - - /** - * Returns a random longitude within the NYC area. - * - * @param rand A random number generator. - * @return A random longitude value within the NYC area. - */ - public static float getRandomNYCLon(Random rand) { - return (float) (LON_EAST - (LON_WIDTH * rand.nextFloat())); - } - - /** - * Returns a random latitude within the NYC area. - * - * @param rand A random number generator. - * @return A random latitude value within the NYC area. - */ - public static float getRandomNYCLat(Random rand) { - return (float) (LAT_SOUTH + (LAT_HEIGHT * rand.nextFloat())); - } - - /** - * Returns the Euclidean distance between two locations specified as lon/lat pairs. - * - * @param lon1 Longitude of first location - * @param lat1 Latitude of first location - * @param lon2 Longitude of second location - * @param lat2 Latitude of second location - * @return The Euclidean distance between the specified locations. - */ - public static double getEuclideanDistance(float lon1, float lat1, float lon2, float lat2) { - double x = lat1 - lat2; - double y = (lon1 - lon2) * Math.cos(lat2); - return (DEG_LEN * Math.sqrt(x * x + y * y)); - } - - /** - * Returns the angle in degrees between the vector from the start to the destination - * and the x-axis on which the start is located. - * - * <p>The angle describes in which direction the destination is located from the start, i.e., - * 0° -> East, 90° -> South, 180° -> West, 270° -> North - * - * @param startLon longitude of start location - * @param startLat latitude of start location - * @param destLon longitude of destination - * @param destLat latitude of destination - * @return The direction from start to destination location - */ - public static int getDirectionAngle( - float startLon, float startLat, float destLon, float destLat) { - - double x = destLat - startLat; - double y = (destLon - startLon) * Math.cos(startLat); - - return (int) Math.toDegrees(Math.atan2(x, y)) + 179; - } - + // geo boundaries of the area of NYC + public static final double LON_EAST = -73.7; + public static final double LON_WEST = -74.05; + public static final double LAT_NORTH = 41.0; + public static final double LAT_SOUTH = 40.5; + + // area width and height + public static final double LON_WIDTH = 74.05 - 73.7; + public static final double LAT_HEIGHT = 41.0 - 40.5; + + // delta step to create artificial grid overlay of NYC + public static final double DELTA_LON = 0.0014; + public static final double DELTA_LAT = 0.00125; + + // ( |LON_WEST| - |LON_EAST| ) / DELTA_LAT + public static final int NUMBER_OF_GRID_X = 250; + // ( LAT_NORTH - LAT_SOUTH ) / DELTA_LON + public static final int NUMBER_OF_GRID_Y = 400; + + public static final float DEG_LEN = 110.25f; + + /** + * Checks if a location specified by longitude and latitude values is within the geo boundaries + * of New York City. + * + * @param lon longitude of the location to check + * @param lat latitude of the location to check + * @return true if the location is within NYC boundaries, otherwise false. + */ + public static boolean isInNYC(float lon, float lat) { + + return !(lon > LON_EAST || lon < LON_WEST) && !(lat > LAT_NORTH || lat < LAT_SOUTH); + } + + /** + * Maps a location specified by latitude and longitude values to a cell of a grid covering the + * area of NYC. The grid cells are roughly 100 x 100 m and sequentially number from north-west + * to south-east starting by zero. + * + * @param lon longitude of the location to map + * @param lat latitude of the location to map + * @return id of mapped grid cell. + */ + public static int mapToGridCell(float lon, float lat) { + int xIndex = (int) Math.floor((Math.abs(LON_WEST) - Math.abs(lon)) / DELTA_LON); + int yIndex = (int) Math.floor((LAT_NORTH - lat) / DELTA_LAT); + + return xIndex + (yIndex * NUMBER_OF_GRID_X); + } + + /** + * Maps the direct path between two locations specified by longitude and latitude to a list of + * cells of a grid covering the area of NYC. The grid cells are roughly 100 x 100 m and + * sequentially number from north-west to south-east starting by zero. + * + * @param lon1 longitude of the first location + * @param lat1 latitude of the first location + * @param lon2 longitude of the second location + * @param lat2 latitude of the second location + * @return A list of cell ids + */ + public static List<Integer> mapToGridCellsOnWay( + float lon1, float lat1, float lon2, float lat2) { + + int x1 = (int) Math.floor((Math.abs(LON_WEST) - Math.abs(lon1)) / DELTA_LON); + int y1 = (int) Math.floor((LAT_NORTH - lat1) / DELTA_LAT); + + int x2 = (int) Math.floor((Math.abs(LON_WEST) - Math.abs(lon2)) / DELTA_LON); + int y2 = (int) Math.floor((LAT_NORTH - lat2) / DELTA_LAT); + + int startX, startY, endX, endY; + if (x1 <= x2) { + startX = x1; + startY = y1; + endX = x2; + endY = y2; + } else { + startX = x2; + startY = y2; + endX = x1; + endY = y1; + } + + double slope = (endY - startY) / ((endX - startX) + 0.00000001); + + int curX = startX; + int curY = startY; + + ArrayList<Integer> cellIds = new ArrayList<>(64); + cellIds.add(curX + (curY * NUMBER_OF_GRID_X)); + + while (curX < endX || curY != endY) { + + if (slope > 0) { + double y = (curX - startX + 0.5) * slope + startY - 0.5; + + if (y > curY - 0.05 && y < curY + 0.05) { + curX++; + curY++; + } else if (y < curY) { + curX++; + } else { + curY++; + } + } else { + double y = (curX - startX + 0.5) * slope + startY + 0.5; + + if (y > curY - 0.05 && y < curY + 0.05) { + curX++; + curY--; + } + if (y > curY) { + curX++; + } else { + curY--; + } + } + + cellIds.add(curX + (curY * NUMBER_OF_GRID_X)); + } + + return cellIds; + } + + /** + * Returns the longitude of the center of a grid cell. + * + * @param gridCellId The grid cell. + * @return The longitude value of the cell's center. + */ + public static float getGridCellCenterLon(int gridCellId) { + + int xIndex = gridCellId % NUMBER_OF_GRID_X; + + return (float) (Math.abs(LON_WEST) - (xIndex * DELTA_LON) - (DELTA_LON / 2)) * -1.0f; + } + + /** + * Returns the latitude of the center of a grid cell. + * + * @param gridCellId The grid cell. + * @return The latitude value of the cell's center. + */ + public static float getGridCellCenterLat(int gridCellId) { + + int xIndex = gridCellId % NUMBER_OF_GRID_X; + int yIndex = (gridCellId - xIndex) / NUMBER_OF_GRID_X; + + return (float) (LAT_NORTH - (yIndex * DELTA_LAT) - (DELTA_LAT / 2)); + } + + /** + * Returns a random longitude within the NYC area. + * + * @param rand A random number generator. + * @return A random longitude value within the NYC area. + */ + public static float getRandomNYCLon(Random rand) { + return (float) (LON_EAST - (LON_WIDTH * rand.nextFloat())); + } + + /** + * Returns a random latitude within the NYC area. + * + * @param rand A random number generator. + * @return A random latitude value within the NYC area. + */ + public static float getRandomNYCLat(Random rand) { + return (float) (LAT_SOUTH + (LAT_HEIGHT * rand.nextFloat())); + } + + /** + * Returns the Euclidean distance between two locations specified as lon/lat pairs. + * + * @param lon1 Longitude of first location + * @param lat1 Latitude of first location + * @param lon2 Longitude of second location + * @param lat2 Latitude of second location + * @return The Euclidean distance between the specified locations. + */ + public static double getEuclideanDistance(float lon1, float lat1, float lon2, float lat2) { + double x = lat1 - lat2; + double y = (lon1 - lon2) * Math.cos(lat2); + return (DEG_LEN * Math.sqrt(x * x + y * y)); + } + + /** + * Returns the angle in degrees between the vector from the start to the destination and the + * x-axis on which the start is located. + * + * <p>The angle describes in which direction the destination is located from the start, i.e., 0° + * -> East, 90° -> South, 180° -> West, 270° -> North + * + * @param startLon longitude of start location + * @param startLat latitude of start location + * @param destLon longitude of destination + * @param destLat latitude of destination + * @return The direction from start to destination location + */ + public static int getDirectionAngle( + float startLon, float startLat, float destLon, float destLat) { + + double x = destLat - startLat; + double y = (destLon - startLon) * Math.cos(startLat); + + return (int) Math.toDegrees(Math.atan2(x, y)) + 179; + } } diff --git a/common/src/main/java/org/apache/flink/training/exercises/common/utils/MissingSolutionException.java b/common/src/main/java/org/apache/flink/training/exercises/common/utils/MissingSolutionException.java index 4f6c95f..8585ae0 100644 --- a/common/src/main/java/org/apache/flink/training/exercises/common/utils/MissingSolutionException.java +++ b/common/src/main/java/org/apache/flink/training/exercises/common/utils/MissingSolutionException.java @@ -18,12 +18,8 @@ package org.apache.flink.training.exercises.common.utils; -/** - * Exception denoting a missing solution (results in tests verifying the solution instead). - */ +/** Exception denoting a missing solution (results in tests verifying the solution instead). */ public class MissingSolutionException extends Exception { - /** - * Create new exception. - */ - public MissingSolutionException() {} + /** Create new exception. */ + public MissingSolutionException() {} } diff --git a/common/src/test/java/org/apache/flink/training/exercises/testing/TaxiRideTestBase.java b/common/src/test/java/org/apache/flink/training/exercises/testing/TaxiRideTestBase.java index f4f5596..0cadcce 100644 --- a/common/src/test/java/org/apache/flink/training/exercises/testing/TaxiRideTestBase.java +++ b/common/src/test/java/org/apache/flink/training/exercises/testing/TaxiRideTestBase.java @@ -30,139 +30,160 @@ import java.util.ArrayList; import java.util.List; public abstract class TaxiRideTestBase<OUT> { - public static class TestRideSource extends TestSource<TaxiRide> implements ResultTypeQueryable<TaxiRide> { - public TestRideSource(Object ... eventsOrWatermarks) { - this.testStream = eventsOrWatermarks; - } - - @Override - long getTimestamp(TaxiRide ride) { - return ride.getEventTime(); - } - - @Override - public TypeInformation<TaxiRide> getProducedType() { - return TypeInformation.of(TaxiRide.class); - } - } - - public static class TestFareSource extends TestSource<TaxiFare> implements ResultTypeQueryable<TaxiFare> { - public TestFareSource(Object ... eventsOrWatermarks) { - this.testStream = eventsOrWatermarks; - } - - @Override - long getTimestamp(TaxiFare fare) { - return fare.getEventTime(); - } - - @Override - public TypeInformation<TaxiFare> getProducedType() { - return TypeInformation.of(TaxiFare.class); - } - } - - public static class TestStringSource extends TestSource<String> implements ResultTypeQueryable<String> { - public TestStringSource(Object ... eventsOrWatermarks) { - this.testStream = eventsOrWatermarks; - } - - @Override - long getTimestamp(String s) { - return 0L; - } - - @Override - public TypeInformation<String> getProducedType() { - return TypeInformation.of(String.class); - } - } - - public static class TestSink<OUT> implements SinkFunction<OUT> { - - // must be static - public static final List VALUES = new ArrayList<>(); - - @Override - public void invoke(OUT value, Context context) { - VALUES.add(value); - } - } - - public interface Testable { - void main() throws Exception; - } - - protected List<OUT> runApp(TestRideSource source, TestSink<OUT> sink, Testable exercise, Testable solution) throws Exception { - ExerciseBase.rides = source; - - return execute(sink, exercise, solution); - } - - protected List<OUT> runApp(TestFareSource source, TestSink<OUT> sink, Testable exercise, Testable solution) throws Exception { - ExerciseBase.fares = source; - - return execute(sink, exercise, solution); - } - - protected List<OUT> runApp(TestRideSource rides, TestFareSource fares, TestSink<OUT> sink, Testable exercise, Testable solution) throws Exception { - ExerciseBase.rides = rides; - ExerciseBase.fares = fares; - - return execute(sink, exercise, solution); - } - - protected List<OUT> runApp(TestRideSource rides, TestSink<OUT> sink, Testable solution) throws Exception { - ExerciseBase.rides = rides; - - return execute(sink, solution); - } - - protected List<OUT> runApp(TestRideSource rides, TestStringSource strings, TestSink<OUT> sink, Testable exercise, Testable solution) throws Exception { - ExerciseBase.rides = rides; - ExerciseBase.strings = strings; - - return execute(sink, exercise, solution); - } - - private List<OUT> execute(TestSink<OUT> sink, Testable exercise, Testable solution) throws Exception { - sink.VALUES.clear(); - - ExerciseBase.out = sink; - ExerciseBase.parallelism = 1; - - try { - exercise.main(); - } catch (Exception e) { - if (ultimateCauseIsMissingSolution(e)) { - sink.VALUES.clear(); - solution.main(); - } else { - throw e; - } - } - - return sink.VALUES; - } - - private List<OUT> execute(TestSink<OUT> sink, Testable solution) throws Exception { - sink.VALUES.clear(); - - ExerciseBase.out = sink; - ExerciseBase.parallelism = 1; - - solution.main(); - - return sink.VALUES; - } - - private boolean ultimateCauseIsMissingSolution(Throwable e) { - if (e instanceof MissingSolutionException) { - return true; - } else if (e.getCause() != null) { - return ultimateCauseIsMissingSolution(e.getCause()); - } else { - return false; - } - } + public static class TestRideSource extends TestSource<TaxiRide> + implements ResultTypeQueryable<TaxiRide> { + public TestRideSource(Object... eventsOrWatermarks) { + this.testStream = eventsOrWatermarks; + } + + @Override + long getTimestamp(TaxiRide ride) { + return ride.getEventTime(); + } + + @Override + public TypeInformation<TaxiRide> getProducedType() { + return TypeInformation.of(TaxiRide.class); + } + } + + public static class TestFareSource extends TestSource<TaxiFare> + implements ResultTypeQueryable<TaxiFare> { + public TestFareSource(Object... eventsOrWatermarks) { + this.testStream = eventsOrWatermarks; + } + + @Override + long getTimestamp(TaxiFare fare) { + return fare.getEventTime(); + } + + @Override + public TypeInformation<TaxiFare> getProducedType() { + return TypeInformation.of(TaxiFare.class); + } + } + + public static class TestStringSource extends TestSource<String> + implements ResultTypeQueryable<String> { + public TestStringSource(Object... eventsOrWatermarks) { + this.testStream = eventsOrWatermarks; + } + + @Override + long getTimestamp(String s) { + return 0L; + } + + @Override + public TypeInformation<String> getProducedType() { + return TypeInformation.of(String.class); + } + } + + public static class TestSink<OUT> implements SinkFunction<OUT> { + + // must be static + public static final List VALUES = new ArrayList<>(); + + @Override + public void invoke(OUT value, Context context) { + VALUES.add(value); + } + } + + public interface Testable { + void main() throws Exception; + } + + protected List<OUT> runApp( + TestRideSource source, TestSink<OUT> sink, Testable exercise, Testable solution) + throws Exception { + ExerciseBase.rides = source; + + return execute(sink, exercise, solution); + } + + protected List<OUT> runApp( + TestFareSource source, TestSink<OUT> sink, Testable exercise, Testable solution) + throws Exception { + ExerciseBase.fares = source; + + return execute(sink, exercise, solution); + } + + protected List<OUT> runApp( + TestRideSource rides, + TestFareSource fares, + TestSink<OUT> sink, + Testable exercise, + Testable solution) + throws Exception { + ExerciseBase.rides = rides; + ExerciseBase.fares = fares; + + return execute(sink, exercise, solution); + } + + protected List<OUT> runApp(TestRideSource rides, TestSink<OUT> sink, Testable solution) + throws Exception { + ExerciseBase.rides = rides; + + return execute(sink, solution); + } + + protected List<OUT> runApp( + TestRideSource rides, + TestStringSource strings, + TestSink<OUT> sink, + Testable exercise, + Testable solution) + throws Exception { + ExerciseBase.rides = rides; + ExerciseBase.strings = strings; + + return execute(sink, exercise, solution); + } + + private List<OUT> execute(TestSink<OUT> sink, Testable exercise, Testable solution) + throws Exception { + sink.VALUES.clear(); + + ExerciseBase.out = sink; + ExerciseBase.parallelism = 1; + + try { + exercise.main(); + } catch (Exception e) { + if (ultimateCauseIsMissingSolution(e)) { + sink.VALUES.clear(); + solution.main(); + } else { + throw e; + } + } + + return sink.VALUES; + } + + private List<OUT> execute(TestSink<OUT> sink, Testable solution) throws Exception { + sink.VALUES.clear(); + + ExerciseBase.out = sink; + ExerciseBase.parallelism = 1; + + solution.main(); + + return sink.VALUES; + } + + private boolean ultimateCauseIsMissingSolution(Throwable e) { + if (e instanceof MissingSolutionException) { + return true; + } else if (e.getCause() != null) { + return ultimateCauseIsMissingSolution(e.getCause()); + } else { + return false; + } + } } diff --git a/common/src/test/java/org/apache/flink/training/exercises/testing/TestSource.java b/common/src/test/java/org/apache/flink/training/exercises/testing/TestSource.java index 84ba53e..593112e 100644 --- a/common/src/test/java/org/apache/flink/training/exercises/testing/TestSource.java +++ b/common/src/test/java/org/apache/flink/training/exercises/testing/TestSource.java @@ -22,29 +22,29 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.watermark.Watermark; public abstract class TestSource<T> implements SourceFunction<T> { - private volatile boolean running = true; - // T or watermark (Long) - protected Object[] testStream; + private volatile boolean running = true; + // T or watermark (Long) + protected Object[] testStream; - @Override - public void run(SourceContext<T> ctx) { - for (int i = 0; (i < testStream.length) && running; i++) { - if (testStream[i] instanceof Long) { - Long ts = (Long) testStream[i]; - ctx.emitWatermark(new Watermark(ts)); - } else { - //noinspection unchecked - T element = (T) testStream[i]; - ctx.collectWithTimestamp(element, getTimestamp(element)); - } - } - // test sources are finite, so they have a Long.MAX_VALUE watermark when they finishes - } + @Override + public void run(SourceContext<T> ctx) { + for (int i = 0; (i < testStream.length) && running; i++) { + if (testStream[i] instanceof Long) { + Long ts = (Long) testStream[i]; + ctx.emitWatermark(new Watermark(ts)); + } else { + //noinspection unchecked + T element = (T) testStream[i]; + ctx.collectWithTimestamp(element, getTimestamp(element)); + } + } + // test sources are finite, so they have a Long.MAX_VALUE watermark when they finishes + } - abstract long getTimestamp(T element); + abstract long getTimestamp(T element); - @Override - public void cancel() { - running = false; - } + @Override + public void cancel() { + running = false; + } } diff --git a/hourly-tips/DISCUSSION.md b/hourly-tips/DISCUSSION.md index 888a015..56a55d4 100644 --- a/hourly-tips/DISCUSSION.md +++ b/hourly-tips/DISCUSSION.md @@ -25,28 +25,28 @@ The Java and Scala reference solutions illustrate two different approaches, thou ```java DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares - .keyBy((TaxiFare fare) -> fare.driverId) - .window(TumblingEventTimeWindows.of(Time.hours(1))) - .process(new AddTips()); + .keyBy((TaxiFare fare) -> fare.driverId) + .window(TumblingEventTimeWindows.of(Time.hours(1))) + .process(new AddTips()); ``` where a `ProcessWindowFunction` does all the heavy lifting: ```java public static class AddTips extends ProcessWindowFunction< - TaxiFare, Tuple3<Long, Long, Float>, Long, TimeWindow> { - @Override - public void process(Long key, Context context, Iterable<TaxiFare> fares, Collector<Tuple3<Long, Long, Float>> out) throws Exception { - Float sumOfTips = 0F; - for (TaxiFare f : fares) { - sumOfTips += f.tip; - } - out.collect(Tuple3.of(context.window().getEnd(), key, sumOfTips)); - } + TaxiFare, Tuple3<Long, Long, Float>, Long, TimeWindow> { + @Override + public void process(Long key, Context context, Iterable<TaxiFare> fares, Collector<Tuple3<Long, Long, Float>> out) throws Exception { + Float sumOfTips = 0F; + for (TaxiFare f : fares) { + sumOfTips += f.tip; + } + out.collect(Tuple3.of(context.window().getEnd(), key, sumOfTips)); + } } ``` -This is straightforward, but has the drawback that it is buffering all of the `TaxiFare` objects in the windows until the windows are triggered, which is less efficient than computing the sum of the tips incrementally, using a `reduce` or `agggregate` function. +This is straightforward, but has the drawback that it is buffering all of the `TaxiFare` objects in the windows until the windows are triggered, which is less efficient than computing the sum of the tips incrementally, using a `reduce` or `agggregate` function. The [Scala solution](src/solution/scala/org/apache/flink/training/solutions/hourlytips/scala/HourlyTipsSolution.scala) uses a `reduce` function @@ -95,8 +95,8 @@ Now, how to find the maximum within each hour? The reference solutions both do t ```java DataStream<Tuple3<Long, Long, Float>> hourlyMax = hourlyTips - .windowAll(TumblingEventTimeWindows.of(Time.hours(1))) - .maxBy(2); + .windowAll(TumblingEventTimeWindows.of(Time.hours(1))) + .maxBy(2); ``` which works just fine, producing this stream of results: @@ -114,8 +114,8 @@ But, what if we were to do this, instead? ```java DataStream<Tuple3<Long, Long, Float>> hourlyMax = hourlyTips - .keyBy(t -> t.f0) - .maxBy(2); + .keyBy(t -> t.f0) + .maxBy(2); ``` This says to group the stream of `hourlyTips` by timestamp, and within each timestamp, find the maximum of the sum of the tips. diff --git a/hourly-tips/src/main/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsExercise.java b/hourly-tips/src/main/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsExercise.java index b453c65..fddbc80 100644 --- a/hourly-tips/src/main/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsExercise.java +++ b/hourly-tips/src/main/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsExercise.java @@ -28,32 +28,30 @@ import org.apache.flink.training.exercises.common.utils.MissingSolutionException /** * The "Hourly Tips" exercise of the Flink training in the docs. * - * <p>The task of the exercise is to first calculate the total tips collected by each driver, hour by hour, and - * then from that stream, find the highest tip total in each hour. - * + * <p>The task of the exercise is to first calculate the total tips collected by each driver, hour + * by hour, and then from that stream, find the highest tip total in each hour. */ public class HourlyTipsExercise extends ExerciseBase { - /** - * Main method. - * - * @throws Exception which occurs during job execution. - */ - public static void main(String[] args) throws Exception { - - // set up streaming execution environment - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(ExerciseBase.parallelism); + /** + * Main method. + * + * @throws Exception which occurs during job execution. + */ + public static void main(String[] args) throws Exception { - // start the data generator - DataStream<TaxiFare> fares = env.addSource(fareSourceOrTest(new TaxiFareGenerator())); + // set up streaming execution environment + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(ExerciseBase.parallelism); - throw new MissingSolutionException(); + // start the data generator + DataStream<TaxiFare> fares = env.addSource(fareSourceOrTest(new TaxiFareGenerator())); -// printOrTest(hourlyMax); + throw new MissingSolutionException(); - // execute the transformation pipeline -// env.execute("Hourly Tips (java)"); - } + // printOrTest(hourlyMax); + // execute the transformation pipeline + // env.execute("Hourly Tips (java)"); + } } diff --git a/hourly-tips/src/solution/java/org/apache/flink/training/solutions/hourlytips/HourlyTipsSolution.java b/hourly-tips/src/solution/java/org/apache/flink/training/solutions/hourlytips/HourlyTipsSolution.java index a6c6279..9c9ee87 100644 --- a/hourly-tips/src/solution/java/org/apache/flink/training/solutions/hourlytips/HourlyTipsSolution.java +++ b/hourly-tips/src/solution/java/org/apache/flink/training/solutions/hourlytips/HourlyTipsSolution.java @@ -33,62 +33,64 @@ import org.apache.flink.util.Collector; /** * Java reference implementation for the "Hourly Tips" exercise of the Flink training in the docs. * - * <p>The task of the exercise is to first calculate the total tips collected by each driver, hour by hour, and - * then from that stream, find the highest tip total in each hour. - * + * <p>The task of the exercise is to first calculate the total tips collected by each driver, hour + * by hour, and then from that stream, find the highest tip total in each hour. */ public class HourlyTipsSolution extends ExerciseBase { - /** - * Main method. - * - * @throws Exception which occurs during job execution. - */ - public static void main(String[] args) throws Exception { + /** + * Main method. + * + * @throws Exception which occurs during job execution. + */ + public static void main(String[] args) throws Exception { - // set up streaming execution environment - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(ExerciseBase.parallelism); + // set up streaming execution environment + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(ExerciseBase.parallelism); - // start the data generator - DataStream<TaxiFare> fares = env.addSource(fareSourceOrTest(new TaxiFareGenerator())); + // start the data generator + DataStream<TaxiFare> fares = env.addSource(fareSourceOrTest(new TaxiFareGenerator())); - // compute tips per hour for each driver - DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares - .keyBy((TaxiFare fare) -> fare.driverId) - .window(TumblingEventTimeWindows.of(Time.hours(1))) - .process(new AddTips()); + // compute tips per hour for each driver + DataStream<Tuple3<Long, Long, Float>> hourlyTips = + fares.keyBy((TaxiFare fare) -> fare.driverId) + .window(TumblingEventTimeWindows.of(Time.hours(1))) + .process(new AddTips()); - DataStream<Tuple3<Long, Long, Float>> hourlyMax = hourlyTips - .windowAll(TumblingEventTimeWindows.of(Time.hours(1))) - .maxBy(2); + DataStream<Tuple3<Long, Long, Float>> hourlyMax = + hourlyTips.windowAll(TumblingEventTimeWindows.of(Time.hours(1))).maxBy(2); -// You should explore how this alternative behaves. In what ways is the same as, -// and different from, the solution above (using a windowAll)? + // You should explore how this alternative behaves. In what ways is the same as, + // and different from, the solution above (using a windowAll)? -// DataStream<Tuple3<Long, Long, Float>> hourlyMax = hourlyTips -// .keyBy(t -> t.f0) -// .maxBy(2); + // DataStream<Tuple3<Long, Long, Float>> hourlyMax = hourlyTips + // .keyBy(t -> t.f0) + // .maxBy(2); - printOrTest(hourlyMax); + printOrTest(hourlyMax); - // execute the transformation pipeline - env.execute("Hourly Tips (java)"); - } + // execute the transformation pipeline + env.execute("Hourly Tips (java)"); + } - /* - * Wraps the pre-aggregated result into a tuple along with the window's timestamp and key. - */ - public static class AddTips extends ProcessWindowFunction< - TaxiFare, Tuple3<Long, Long, Float>, Long, TimeWindow> { + /* + * Wraps the pre-aggregated result into a tuple along with the window's timestamp and key. + */ + public static class AddTips + extends ProcessWindowFunction<TaxiFare, Tuple3<Long, Long, Float>, Long, TimeWindow> { - @Override - public void process(Long key, Context context, Iterable<TaxiFare> fares, Collector<Tuple3<Long, Long, Float>> out) { - float sumOfTips = 0F; - for (TaxiFare f : fares) { - sumOfTips += f.tip; - } - out.collect(Tuple3.of(context.window().getEnd(), key, sumOfTips)); - } - } + @Override + public void process( + Long key, + Context context, + Iterable<TaxiFare> fares, + Collector<Tuple3<Long, Long, Float>> out) { + float sumOfTips = 0F; + for (TaxiFare f : fares) { + sumOfTips += f.tip; + } + out.collect(Tuple3.of(context.window().getEnd(), key, sumOfTips)); + } + } } diff --git a/hourly-tips/src/test/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsTest.java b/hourly-tips/src/test/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsTest.java index 2924a21..24262a6 100644 --- a/hourly-tips/src/test/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsTest.java +++ b/hourly-tips/src/test/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsTest.java @@ -34,76 +34,65 @@ import static org.junit.Assert.assertEquals; public class HourlyTipsTest extends TaxiRideTestBase<Tuple3<Long, Long, Float>> { - static final Testable JAVA_EXERCISE = () -> HourlyTipsExercise.main(new String[]{}); + static final Testable JAVA_EXERCISE = () -> HourlyTipsExercise.main(new String[] {}); - @Test - public void testOneDriverOneTip() throws Exception { - TaxiFare one = testFare(1, t(0), 1.0F); + @Test + public void testOneDriverOneTip() throws Exception { + TaxiFare one = testFare(1, t(0), 1.0F); - TestFareSource source = new TestFareSource( - one - ); + TestFareSource source = new TestFareSource(one); - Tuple3<Long, Long, Float> max = Tuple3.of(t(60).toEpochMilli(), 1L, 1.0F); + Tuple3<Long, Long, Float> max = Tuple3.of(t(60).toEpochMilli(), 1L, 1.0F); - List<Tuple3<Long, Long, Float>> expected = Collections.singletonList(max); + List<Tuple3<Long, Long, Float>> expected = Collections.singletonList(max); - assertEquals(expected, results(source)); - } + assertEquals(expected, results(source)); + } - @Test - public void testTipsAreSummedByHour() throws Exception { - TaxiFare oneIn1 = testFare(1, t(0), 1.0F); - TaxiFare fiveIn1 = testFare(1, t(15), 5.0F); - TaxiFare tenIn2 = testFare(1, t(90), 10.0F); + @Test + public void testTipsAreSummedByHour() throws Exception { + TaxiFare oneIn1 = testFare(1, t(0), 1.0F); + TaxiFare fiveIn1 = testFare(1, t(15), 5.0F); + TaxiFare tenIn2 = testFare(1, t(90), 10.0F); - TestFareSource source = new TestFareSource( - oneIn1, - fiveIn1, - tenIn2 - ); + TestFareSource source = new TestFareSource(oneIn1, fiveIn1, tenIn2); - Tuple3<Long, Long, Float> hour1 = Tuple3.of(t(60).toEpochMilli(), 1L, 6.0F); - Tuple3<Long, Long, Float> hour2 = Tuple3.of(t(120).toEpochMilli(), 1L, 10.0F); + Tuple3<Long, Long, Float> hour1 = Tuple3.of(t(60).toEpochMilli(), 1L, 6.0F); + Tuple3<Long, Long, Float> hour2 = Tuple3.of(t(120).toEpochMilli(), 1L, 10.0F); - List<Tuple3<Long, Long, Float>> expected = Arrays.asList(hour1, hour2); + List<Tuple3<Long, Long, Float>> expected = Arrays.asList(hour1, hour2); - assertEquals(expected, results(source)); - } + assertEquals(expected, results(source)); + } - @Test - public void testMaxAcrossDrivers() throws Exception { - TaxiFare oneFor1In1 = testFare(1, t(0), 1.0F); - TaxiFare fiveFor1In1 = testFare(1, t(15), 5.0F); - TaxiFare tenFor1In2 = testFare(1, t(90), 10.0F); - TaxiFare twentyFor2In2 = testFare(2, t(90), 20.0F); + @Test + public void testMaxAcrossDrivers() throws Exception { + TaxiFare oneFor1In1 = testFare(1, t(0), 1.0F); + TaxiFare fiveFor1In1 = testFare(1, t(15), 5.0F); + TaxiFare tenFor1In2 = testFare(1, t(90), 10.0F); + TaxiFare twentyFor2In2 = testFare(2, t(90), 20.0F); - TestFareSource source = new TestFareSource( - oneFor1In1, - fiveFor1In1, - tenFor1In2, - twentyFor2In2 - ); + TestFareSource source = + new TestFareSource(oneFor1In1, fiveFor1In1, tenFor1In2, twentyFor2In2); - Tuple3<Long, Long, Float> hour1 = Tuple3.of(t(60).toEpochMilli(), 1L, 6.0F); - Tuple3<Long, Long, Float> hour2 = Tuple3.of(t(120).toEpochMilli(), 2L, 20.0F); + Tuple3<Long, Long, Float> hour1 = Tuple3.of(t(60).toEpochMilli(), 1L, 6.0F); + Tuple3<Long, Long, Float> hour2 = Tuple3.of(t(120).toEpochMilli(), 2L, 20.0F); - List<Tuple3<Long, Long, Float>> expected = Arrays.asList(hour1, hour2); + List<Tuple3<Long, Long, Float>> expected = Arrays.asList(hour1, hour2); - assertEquals(expected, results(source)); - } + assertEquals(expected, results(source)); + } - private Instant t(int minutes) { - return Instant.parse("2020-01-01T12:00:00.00Z").plusSeconds(60 * minutes); - } + private Instant t(int minutes) { + return Instant.parse("2020-01-01T12:00:00.00Z").plusSeconds(60 * minutes); + } - private TaxiFare testFare(long driverId, Instant startTime, float tip) { - return new TaxiFare(0, 0, driverId, startTime, "", tip, 0F, 0F); - } - - protected List<Tuple3<Long, Long, Float>> results(TestFareSource source) throws Exception { - Testable javaSolution = () -> HourlyTipsSolution.main(new String[]{}); - return runApp(source, new TestSink<>(), JAVA_EXERCISE, javaSolution); - } + private TaxiFare testFare(long driverId, Instant startTime, float tip) { + return new TaxiFare(0, 0, driverId, startTime, "", tip, 0F, 0F); + } + protected List<Tuple3<Long, Long, Float>> results(TestFareSource source) throws Exception { + Testable javaSolution = () -> HourlyTipsSolution.main(new String[] {}); + return runApp(source, new TestSink<>(), JAVA_EXERCISE, javaSolution); + } } diff --git a/long-ride-alerts/DISCUSSION.md b/long-ride-alerts/DISCUSSION.md index f931ffa..571094b 100644 --- a/long-ride-alerts/DISCUSSION.md +++ b/long-ride-alerts/DISCUSSION.md @@ -25,7 +25,7 @@ It would be interesting to test that the solution does not leak state. A good way to write unit tests for a `KeyedProcessFunction` to check for state retention, etc., is to use the test harnesses described in the -[documentation on testing](https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#unit-testing-stateful-or-timely-udfs--custom-operators). +[documentation on testing](https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#unit-testing-stateful-or-timely-udfs--custom-operators). In fact, the reference solutions will leak state in the case where a START event is missing. They also leak in the case where the alert is generated, but then the END event does eventually arrive (after `onTimer()` diff --git a/long-ride-alerts/README.md b/long-ride-alerts/README.md index a5d2ac7..3366fbb 100644 --- a/long-ride-alerts/README.md +++ b/long-ride-alerts/README.md @@ -22,7 +22,7 @@ under the License. The goal of the "Long Ride Alerts" exercise is to provide a real-time warning whenever a taxi ride started two hours ago, and is still ongoing. -This should be done using the event time timestamps and watermarks that are provided in the data stream. +This should be done using the event time timestamps and watermarks that are provided in the data stream. The stream is out-of-order, and it is possible that the END event for a ride will be processed before its START event. But in such cases, we never care to create an alert, since we do know that the ride @@ -62,10 +62,10 @@ The resulting stream should be printed to standard out. <details> <summary><strong>Overall approach</strong></summary> -This exercise revolves around using a `ProcessFunction` to manage some keyed state and event time timers, -and doing so in a way that works even when the END event for a given `rideId` arrives before the START (which can happen). +This exercise revolves around using a `ProcessFunction` to manage some keyed state and event time timers, +and doing so in a way that works even when the END event for a given `rideId` arrives before the START (which can happen). The challenge is figuring out what state to keep, and when to set and clear that state. -You will want to use event time timers that fire two hours after an incoming START event, and in the `onTimer()` method, +You will want to use event time timers that fire two hours after an incoming START event, and in the `onTimer()` method, collect START events to the output only if a matching END event hasn't yet arrived. </details> diff --git a/long-ride-alerts/src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java b/long-ride-alerts/src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java index e42ff37..4fa7c9a 100644 --- a/long-ride-alerts/src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java +++ b/long-ride-alerts/src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java @@ -34,47 +34,46 @@ import org.apache.flink.util.Collector; * * <p>The goal for this exercise is to emit START events for taxi rides that have not been matched * by an END event during the first 2 hours of the ride. - * */ public class LongRidesExercise extends ExerciseBase { - /** - * Main method. - * - * @throws Exception which occurs during job execution. - */ - public static void main(String[] args) throws Exception { + /** + * Main method. + * + * @throws Exception which occurs during job execution. + */ + public static void main(String[] args) throws Exception { - // set up streaming execution environment - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(ExerciseBase.parallelism); + // set up streaming execution environment + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(ExerciseBase.parallelism); - // start the data generator - DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new TaxiRideGenerator())); + // start the data generator + DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new TaxiRideGenerator())); - DataStream<TaxiRide> longRides = rides - .keyBy((TaxiRide ride) -> ride.rideId) - .process(new MatchFunction()); + DataStream<TaxiRide> longRides = + rides.keyBy((TaxiRide ride) -> ride.rideId).process(new MatchFunction()); - printOrTest(longRides); + printOrTest(longRides); - env.execute("Long Taxi Rides"); - } + env.execute("Long Taxi Rides"); + } - public static class MatchFunction extends KeyedProcessFunction<Long, TaxiRide, TaxiRide> { + public static class MatchFunction extends KeyedProcessFunction<Long, TaxiRide, TaxiRide> { - @Override - public void open(Configuration config) throws Exception { - throw new MissingSolutionException(); - } + @Override + public void open(Configuration config) throws Exception { + throw new MissingSolutionException(); + } - @Override - public void processElement(TaxiRide ride, Context context, Collector<TaxiRide> out) throws Exception { - TimerService timerService = context.timerService(); - } + @Override + public void processElement(TaxiRide ride, Context context, Collector<TaxiRide> out) + throws Exception { + TimerService timerService = context.timerService(); + } - @Override - public void onTimer(long timestamp, OnTimerContext context, Collector<TaxiRide> out) throws Exception { - } - } + @Override + public void onTimer(long timestamp, OnTimerContext context, Collector<TaxiRide> out) + throws Exception {} + } } diff --git a/long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java b/long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java index a846261..5e7536d 100644 --- a/long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java +++ b/long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java @@ -34,75 +34,75 @@ import org.apache.flink.util.Collector; * * <p>The goal for this exercise is to emit START events for taxi rides that have not been matched * by an END event during the first 2 hours of the ride. - * */ public class LongRidesSolution extends ExerciseBase { - /** - * Main method. - * - * @throws Exception which occurs during job execution. - */ - public static void main(String[] args) throws Exception { - - // set up streaming execution environment - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(ExerciseBase.parallelism); - - // start the data generator - DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new TaxiRideGenerator())); - - DataStream<TaxiRide> longRides = rides - .keyBy((TaxiRide ride) -> ride.rideId) - .process(new MatchFunction()); - - printOrTest(longRides); - - env.execute("Long Taxi Rides"); - } - - private static class MatchFunction extends KeyedProcessFunction<Long, TaxiRide, TaxiRide> { - - private ValueState<TaxiRide> rideState; - - @Override - public void open(Configuration config) { - ValueStateDescriptor<TaxiRide> stateDescriptor = - new ValueStateDescriptor<>("ride event", TaxiRide.class); - rideState = getRuntimeContext().getState(stateDescriptor); - } - - @Override - public void processElement(TaxiRide ride, Context context, Collector<TaxiRide> out) throws Exception { - TaxiRide previousRideEvent = rideState.value(); - - if (previousRideEvent == null) { - rideState.update(ride); - if (ride.isStart) { - context.timerService().registerEventTimeTimer(getTimerTime(ride)); - } - } else { - if (!ride.isStart) { - // it's an END event, so event saved was the START event and has a timer - // the timer hasn't fired yet, and we can safely kill the timer - context.timerService().deleteEventTimeTimer(getTimerTime(previousRideEvent)); - } - // both events have now been seen, we can clear the state - rideState.clear(); - } - } - - @Override - public void onTimer(long timestamp, OnTimerContext context, Collector<TaxiRide> out) throws Exception { - - // if we get here, we know that the ride started two hours ago, and the END hasn't been processed - out.collect(rideState.value()); - rideState.clear(); - } - - private long getTimerTime(TaxiRide ride) { - return ride.startTime.plusSeconds(120 * 60).toEpochMilli(); - } - } - + /** + * Main method. + * + * @throws Exception which occurs during job execution. + */ + public static void main(String[] args) throws Exception { + + // set up streaming execution environment + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(ExerciseBase.parallelism); + + // start the data generator + DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new TaxiRideGenerator())); + + DataStream<TaxiRide> longRides = + rides.keyBy((TaxiRide ride) -> ride.rideId).process(new MatchFunction()); + + printOrTest(longRides); + + env.execute("Long Taxi Rides"); + } + + private static class MatchFunction extends KeyedProcessFunction<Long, TaxiRide, TaxiRide> { + + private ValueState<TaxiRide> rideState; + + @Override + public void open(Configuration config) { + ValueStateDescriptor<TaxiRide> stateDescriptor = + new ValueStateDescriptor<>("ride event", TaxiRide.class); + rideState = getRuntimeContext().getState(stateDescriptor); + } + + @Override + public void processElement(TaxiRide ride, Context context, Collector<TaxiRide> out) + throws Exception { + TaxiRide previousRideEvent = rideState.value(); + + if (previousRideEvent == null) { + rideState.update(ride); + if (ride.isStart) { + context.timerService().registerEventTimeTimer(getTimerTime(ride)); + } + } else { + if (!ride.isStart) { + // it's an END event, so event saved was the START event and has a timer + // the timer hasn't fired yet, and we can safely kill the timer + context.timerService().deleteEventTimeTimer(getTimerTime(previousRideEvent)); + } + // both events have now been seen, we can clear the state + rideState.clear(); + } + } + + @Override + public void onTimer(long timestamp, OnTimerContext context, Collector<TaxiRide> out) + throws Exception { + + // if we get here, we know that the ride started two hours ago, and the END hasn't been + // processed + out.collect(rideState.value()); + rideState.clear(); + } + + private long getTimerTime(TaxiRide ride) { + return ride.startTime.plusSeconds(120 * 60).toEpochMilli(); + } + } } diff --git a/long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTest.java b/long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTest.java index 7d5a24e..d563e90 100644 --- a/long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTest.java +++ b/long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTest.java @@ -32,87 +32,99 @@ import static org.junit.Assert.assertEquals; public class LongRidesTest extends TaxiRideTestBase<TaxiRide> { - static final Testable JAVA_EXERCISE = () -> LongRidesExercise.main(new String[]{}); - - private static final Instant BEGINNING = Instant.parse("2020-01-01T12:00:00.00Z"); - - @Test - public void shortRide() throws Exception { - Instant oneMinLater = BEGINNING.plusSeconds(60); - TaxiRide rideStarted = startRide(1, BEGINNING); - TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater); - Long markOneMinLater = oneMinLater.toEpochMilli(); - - TestRideSource source = new TestRideSource(rideStarted, endedOneMinLater, markOneMinLater); - assert(results(source).isEmpty()); - } - - @Test - public void outOfOrder() throws Exception { - Instant oneMinLater = BEGINNING.plusSeconds(60); - TaxiRide rideStarted = startRide(1, BEGINNING); - TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater); - Long markOneMinLater = oneMinLater.toEpochMilli(); - - TestRideSource source = new TestRideSource(endedOneMinLater, rideStarted, markOneMinLater); - assert(results(source).isEmpty()); - } - - @Test - public void noStartShort() throws Exception { - Instant oneMinLater = BEGINNING.plusSeconds(60); - TaxiRide rideStarted = startRide(1, BEGINNING); - TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater); - Long markOneMinLater = oneMinLater.toEpochMilli(); - - TestRideSource source = new TestRideSource(endedOneMinLater, markOneMinLater); - assert(results(source).isEmpty()); - } - - @Test - public void noEnd() throws Exception { - TaxiRide rideStarted = startRide(1, BEGINNING); - Long markThreeHoursLater = BEGINNING.plusSeconds(180 * 60).toEpochMilli(); - - TestRideSource source = new TestRideSource(rideStarted, markThreeHoursLater); - assertEquals(Collections.singletonList(rideStarted), results(source)); - } - - @Test - public void longRide() throws Exception { - TaxiRide rideStarted = startRide(1, BEGINNING); - Long mark2HoursLater = BEGINNING.plusSeconds(120 * 60).toEpochMilli(); - TaxiRide rideEnded3HoursLater = endRide(rideStarted, BEGINNING.plusSeconds(180 * 60)); - - TestRideSource source = new TestRideSource(rideStarted, mark2HoursLater, rideEnded3HoursLater); - assertEquals(Collections.singletonList(rideStarted), results(source)); - } - - @Test - public void startIsDelayedMoreThanTwoHours() throws Exception { - TaxiRide rideStarted = startRide(1, BEGINNING); - TaxiRide rideEndedAfter1Hour = endRide(rideStarted, BEGINNING.plusSeconds(60 * 60)); - Long mark2HoursAfterEnd = BEGINNING.plusSeconds(180 * 60).toEpochMilli(); - - TestRideSource source = new TestRideSource(rideEndedAfter1Hour, mark2HoursAfterEnd, rideStarted); - assert(results(source).isEmpty()); - } - - private TaxiRide testRide(long rideId, Boolean isStart, Instant startTime, Instant endTime) { - return new TaxiRide(rideId, isStart, startTime, endTime, -73.9947F, 40.750626F, -73.9947F, 40.750626F, (short) 1, 0, 0); - } - - private TaxiRide startRide(long rideId, Instant startTime) { - return testRide(rideId, true, startTime, Instant.EPOCH); - } - - private TaxiRide endRide(TaxiRide started, Instant endTime) { - return testRide(started.rideId, false, started.startTime, endTime); - } - - protected List<TaxiRide> results(TestRideSource source) throws Exception { - Testable javaSolution = () -> LongRidesSolution.main(new String[]{}); - return runApp(source, new TestSink<>(), JAVA_EXERCISE, javaSolution); - } - + static final Testable JAVA_EXERCISE = () -> LongRidesExercise.main(new String[] {}); + + private static final Instant BEGINNING = Instant.parse("2020-01-01T12:00:00.00Z"); + + @Test + public void shortRide() throws Exception { + Instant oneMinLater = BEGINNING.plusSeconds(60); + TaxiRide rideStarted = startRide(1, BEGINNING); + TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater); + Long markOneMinLater = oneMinLater.toEpochMilli(); + + TestRideSource source = new TestRideSource(rideStarted, endedOneMinLater, markOneMinLater); + assert (results(source).isEmpty()); + } + + @Test + public void outOfOrder() throws Exception { + Instant oneMinLater = BEGINNING.plusSeconds(60); + TaxiRide rideStarted = startRide(1, BEGINNING); + TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater); + Long markOneMinLater = oneMinLater.toEpochMilli(); + + TestRideSource source = new TestRideSource(endedOneMinLater, rideStarted, markOneMinLater); + assert (results(source).isEmpty()); + } + + @Test + public void noStartShort() throws Exception { + Instant oneMinLater = BEGINNING.plusSeconds(60); + TaxiRide rideStarted = startRide(1, BEGINNING); + TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater); + Long markOneMinLater = oneMinLater.toEpochMilli(); + + TestRideSource source = new TestRideSource(endedOneMinLater, markOneMinLater); + assert (results(source).isEmpty()); + } + + @Test + public void noEnd() throws Exception { + TaxiRide rideStarted = startRide(1, BEGINNING); + Long markThreeHoursLater = BEGINNING.plusSeconds(180 * 60).toEpochMilli(); + + TestRideSource source = new TestRideSource(rideStarted, markThreeHoursLater); + assertEquals(Collections.singletonList(rideStarted), results(source)); + } + + @Test + public void longRide() throws Exception { + TaxiRide rideStarted = startRide(1, BEGINNING); + Long mark2HoursLater = BEGINNING.plusSeconds(120 * 60).toEpochMilli(); + TaxiRide rideEnded3HoursLater = endRide(rideStarted, BEGINNING.plusSeconds(180 * 60)); + + TestRideSource source = + new TestRideSource(rideStarted, mark2HoursLater, rideEnded3HoursLater); + assertEquals(Collections.singletonList(rideStarted), results(source)); + } + + @Test + public void startIsDelayedMoreThanTwoHours() throws Exception { + TaxiRide rideStarted = startRide(1, BEGINNING); + TaxiRide rideEndedAfter1Hour = endRide(rideStarted, BEGINNING.plusSeconds(60 * 60)); + Long mark2HoursAfterEnd = BEGINNING.plusSeconds(180 * 60).toEpochMilli(); + + TestRideSource source = + new TestRideSource(rideEndedAfter1Hour, mark2HoursAfterEnd, rideStarted); + assert (results(source).isEmpty()); + } + + private TaxiRide testRide(long rideId, Boolean isStart, Instant startTime, Instant endTime) { + return new TaxiRide( + rideId, + isStart, + startTime, + endTime, + -73.9947F, + 40.750626F, + -73.9947F, + 40.750626F, + (short) 1, + 0, + 0); + } + + private TaxiRide startRide(long rideId, Instant startTime) { + return testRide(rideId, true, startTime, Instant.EPOCH); + } + + private TaxiRide endRide(TaxiRide started, Instant endTime) { + return testRide(started.rideId, false, started.startTime, endTime); + } + + protected List<TaxiRide> results(TestRideSource source) throws Exception { + Testable javaSolution = () -> LongRidesSolution.main(new String[] {}); + return runApp(source, new TestSink<>(), JAVA_EXERCISE, javaSolution); + } } diff --git a/ride-cleansing/src/main/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingExercise.java b/ride-cleansing/src/main/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingExercise.java index 20467b4..24d6648 100644 --- a/ride-cleansing/src/main/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingExercise.java +++ b/ride-cleansing/src/main/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingExercise.java @@ -29,43 +29,42 @@ import org.apache.flink.training.exercises.common.utils.MissingSolutionException /** * The "Ride Cleansing" exercise from the Flink training in the docs. * - * <p>The task of the exercise is to filter a data stream of taxi ride records to keep only rides that - * start and end within New York City. The resulting stream should be printed. - * + * <p>The task of the exercise is to filter a data stream of taxi ride records to keep only rides + * that start and end within New York City. The resulting stream should be printed. */ public class RideCleansingExercise extends ExerciseBase { - /** - * Main method. - * - * @throws Exception which occurs during job execution. - */ - public static void main(String[] args) throws Exception { - - // set up streaming execution environment - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(ExerciseBase.parallelism); + /** + * Main method. + * + * @throws Exception which occurs during job execution. + */ + public static void main(String[] args) throws Exception { - // start the data generator - DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new TaxiRideGenerator())); + // set up streaming execution environment + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(ExerciseBase.parallelism); - DataStream<TaxiRide> filteredRides = rides - // filter out rides that do not start or stop in NYC - .filter(new NYCFilter()); + // start the data generator + DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new TaxiRideGenerator())); - // print the filtered stream - printOrTest(filteredRides); + DataStream<TaxiRide> filteredRides = + rides + // filter out rides that do not start or stop in NYC + .filter(new NYCFilter()); - // run the cleansing pipeline - env.execute("Taxi Ride Cleansing"); - } + // print the filtered stream + printOrTest(filteredRides); - private static class NYCFilter implements FilterFunction<TaxiRide> { + // run the cleansing pipeline + env.execute("Taxi Ride Cleansing"); + } - @Override - public boolean filter(TaxiRide taxiRide) throws Exception { - throw new MissingSolutionException(); - } - } + private static class NYCFilter implements FilterFunction<TaxiRide> { + @Override + public boolean filter(TaxiRide taxiRide) throws Exception { + throw new MissingSolutionException(); + } + } } diff --git a/ride-cleansing/src/solution/java/org/apache/flink/training/solutions/ridecleansing/RideCleansingSolution.java b/ride-cleansing/src/solution/java/org/apache/flink/training/solutions/ridecleansing/RideCleansingSolution.java index 1d7af4c..690e6c7 100644 --- a/ride-cleansing/src/solution/java/org/apache/flink/training/solutions/ridecleansing/RideCleansingSolution.java +++ b/ride-cleansing/src/solution/java/org/apache/flink/training/solutions/ridecleansing/RideCleansingSolution.java @@ -29,42 +29,42 @@ import org.apache.flink.training.exercises.common.utils.GeoUtils; /** * Solution to the "Ride Cleansing" exercise of the Flink training in the docs. * - * <p>The task of the exercise is to filter a data stream of taxi ride records to keep only rides that - * start and end within New York City. The resulting stream should be printed. - * + * <p>The task of the exercise is to filter a data stream of taxi ride records to keep only rides + * that start and end within New York City. The resulting stream should be printed. */ public class RideCleansingSolution extends ExerciseBase { - /** - * Main method. - * - * @throws Exception which occurs during job execution. - */ - public static void main(String[] args) throws Exception { + /** + * Main method. + * + * @throws Exception which occurs during job execution. + */ + public static void main(String[] args) throws Exception { - // set up streaming execution environment - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(ExerciseBase.parallelism); + // set up streaming execution environment + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(ExerciseBase.parallelism); - // start the data generator - DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new TaxiRideGenerator())); + // start the data generator + DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new TaxiRideGenerator())); - DataStream<TaxiRide> filteredRides = rides - // keep only those rides and both start and end in NYC - .filter(new NYCFilter()); + DataStream<TaxiRide> filteredRides = + rides + // keep only those rides and both start and end in NYC + .filter(new NYCFilter()); - // print the filtered stream - printOrTest(filteredRides); + // print the filtered stream + printOrTest(filteredRides); - // run the cleansing pipeline - env.execute("Taxi Ride Cleansing"); - } + // run the cleansing pipeline + env.execute("Taxi Ride Cleansing"); + } - public static class NYCFilter implements FilterFunction<TaxiRide> { - @Override - public boolean filter(TaxiRide taxiRide) { - return GeoUtils.isInNYC(taxiRide.startLon, taxiRide.startLat) && - GeoUtils.isInNYC(taxiRide.endLon, taxiRide.endLat); - } - } + public static class NYCFilter implements FilterFunction<TaxiRide> { + @Override + public boolean filter(TaxiRide taxiRide) { + return GeoUtils.isInNYC(taxiRide.startLon, taxiRide.startLat) + && GeoUtils.isInNYC(taxiRide.endLon, taxiRide.endLat); + } + } } diff --git a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingTest.java b/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingTest.java index e19f4f3..e3dda36 100644 --- a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingTest.java +++ b/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingTest.java @@ -32,36 +32,45 @@ import static org.junit.Assert.assertEquals; public class RideCleansingTest extends TaxiRideTestBase<TaxiRide> { - static final Testable JAVA_EXERCISE = () -> RideCleansingExercise.main(new String[]{}); - - @Test - public void testInNYC() throws Exception { - TaxiRide atPennStation = testRide(-73.9947F, 40.750626F, -73.9947F, 40.750626F); - - TestRideSource source = new TestRideSource(atPennStation); - - assertEquals(Collections.singletonList(atPennStation), results(source)); - } - - @Test - public void testNotInNYC() throws Exception { - TaxiRide toThePole = testRide(-73.9947F, 40.750626F, 0, 90); - TaxiRide fromThePole = testRide(0, 90, -73.9947F, 40.750626F); - TaxiRide atNorthPole = testRide(0, 90, 0, 90); - - TestRideSource source = new TestRideSource(toThePole, fromThePole, atNorthPole); - - assertEquals(Collections.emptyList(), results(source)); - } - - private TaxiRide testRide(float startLon, float startLat, float endLon, float endLat) { - return new TaxiRide(1L, true, Instant.EPOCH, Instant.EPOCH, - startLon, startLat, endLon, endLat, (short) 1, 0, 0); - } - - protected List<?> results(TestRideSource source) throws Exception { - Testable javaSolution = () -> RideCleansingSolution.main(new String[]{}); - return runApp(source, new TestSink<>(), JAVA_EXERCISE, javaSolution); - } - + static final Testable JAVA_EXERCISE = () -> RideCleansingExercise.main(new String[] {}); + + @Test + public void testInNYC() throws Exception { + TaxiRide atPennStation = testRide(-73.9947F, 40.750626F, -73.9947F, 40.750626F); + + TestRideSource source = new TestRideSource(atPennStation); + + assertEquals(Collections.singletonList(atPennStation), results(source)); + } + + @Test + public void testNotInNYC() throws Exception { + TaxiRide toThePole = testRide(-73.9947F, 40.750626F, 0, 90); + TaxiRide fromThePole = testRide(0, 90, -73.9947F, 40.750626F); + TaxiRide atNorthPole = testRide(0, 90, 0, 90); + + TestRideSource source = new TestRideSource(toThePole, fromThePole, atNorthPole); + + assertEquals(Collections.emptyList(), results(source)); + } + + private TaxiRide testRide(float startLon, float startLat, float endLon, float endLat) { + return new TaxiRide( + 1L, + true, + Instant.EPOCH, + Instant.EPOCH, + startLon, + startLat, + endLon, + endLat, + (short) 1, + 0, + 0); + } + + protected List<?> results(TestRideSource source) throws Exception { + Testable javaSolution = () -> RideCleansingSolution.main(new String[] {}); + return runApp(source, new TestSink<>(), JAVA_EXERCISE, javaSolution); + } } diff --git a/rides-and-fares/src/main/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresExercise.java b/rides-and-fares/src/main/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresExercise.java index 64175d2..7cdd9c0 100644 --- a/rides-and-fares/src/main/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresExercise.java +++ b/rides-and-fares/src/main/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresExercise.java @@ -35,52 +35,51 @@ import org.apache.flink.util.Collector; * The "Stateful Enrichment" exercise of the Flink training in the docs. * * <p>The goal for this exercise is to enrich TaxiRides with fare information. - * */ public class RidesAndFaresExercise extends ExerciseBase { - /** - * Main method. - * - * @throws Exception which occurs during job execution. - */ - public static void main(String[] args) throws Exception { + /** + * Main method. + * + * @throws Exception which occurs during job execution. + */ + public static void main(String[] args) throws Exception { - // set up streaming execution environment - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(ExerciseBase.parallelism); + // set up streaming execution environment + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(ExerciseBase.parallelism); - DataStream<TaxiRide> rides = env - .addSource(rideSourceOrTest(new TaxiRideGenerator())) - .filter((TaxiRide ride) -> ride.isStart) - .keyBy((TaxiRide ride) -> ride.rideId); + DataStream<TaxiRide> rides = + env.addSource(rideSourceOrTest(new TaxiRideGenerator())) + .filter((TaxiRide ride) -> ride.isStart) + .keyBy((TaxiRide ride) -> ride.rideId); - DataStream<TaxiFare> fares = env - .addSource(fareSourceOrTest(new TaxiFareGenerator())) - .keyBy((TaxiFare fare) -> fare.rideId); + DataStream<TaxiFare> fares = + env.addSource(fareSourceOrTest(new TaxiFareGenerator())) + .keyBy((TaxiFare fare) -> fare.rideId); - DataStream<Tuple2<TaxiRide, TaxiFare>> enrichedRides = rides - .connect(fares) - .flatMap(new EnrichmentFunction()); + DataStream<Tuple2<TaxiRide, TaxiFare>> enrichedRides = + rides.connect(fares).flatMap(new EnrichmentFunction()); - printOrTest(enrichedRides); + printOrTest(enrichedRides); - env.execute("Join Rides with Fares (java RichCoFlatMap)"); - } + env.execute("Join Rides with Fares (java RichCoFlatMap)"); + } - public static class EnrichmentFunction extends RichCoFlatMapFunction<TaxiRide, TaxiFare, Tuple2<TaxiRide, TaxiFare>> { + public static class EnrichmentFunction + extends RichCoFlatMapFunction<TaxiRide, TaxiFare, Tuple2<TaxiRide, TaxiFare>> { - @Override - public void open(Configuration config) throws Exception { - throw new MissingSolutionException(); - } + @Override + public void open(Configuration config) throws Exception { + throw new MissingSolutionException(); + } - @Override - public void flatMap1(TaxiRide ride, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception { - } + @Override + public void flatMap1(TaxiRide ride, Collector<Tuple2<TaxiRide, TaxiFare>> out) + throws Exception {} - @Override - public void flatMap2(TaxiFare fare, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception { - } - } + @Override + public void flatMap2(TaxiFare fare, Collector<Tuple2<TaxiRide, TaxiFare>> out) + throws Exception {} + } } diff --git a/rides-and-fares/src/solution/java/org/apache/flink/training/solutions/ridesandfares/RidesAndFaresSolution.java b/rides-and-fares/src/solution/java/org/apache/flink/training/solutions/ridesandfares/RidesAndFaresSolution.java index bcddc7b..9b8e096 100644 --- a/rides-and-fares/src/solution/java/org/apache/flink/training/solutions/ridesandfares/RidesAndFaresSolution.java +++ b/rides-and-fares/src/solution/java/org/apache/flink/training/solutions/ridesandfares/RidesAndFaresSolution.java @@ -34,86 +34,94 @@ import org.apache.flink.training.exercises.common.utils.ExerciseBase; import org.apache.flink.util.Collector; /** - * Java reference implementation for the "Stateful Enrichment" exercise of the Flink training in the docs. + * Java reference implementation for the "Stateful Enrichment" exercise of the Flink training in the + * docs. * * <p>The goal for this exercise is to enrich TaxiRides with fare information. - * */ public class RidesAndFaresSolution extends ExerciseBase { - /** - * Main method. - * - * @throws Exception which occurs during job execution. - */ - public static void main(String[] args) throws Exception { - - // Set up streaming execution environment, including Web UI and REST endpoint. - // Checkpointing isn't needed for the RidesAndFares exercise; this setup is for - // using the State Processor API. - - Configuration conf = new Configuration(); - conf.setString("state.backend", "filesystem"); - conf.setString("state.savepoints.dir", "file:///tmp/savepoints"); - conf.setString("state.checkpoints.dir", "file:///tmp/checkpoints"); - StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf); - env.setParallelism(ExerciseBase.parallelism); - - env.enableCheckpointing(10000L); - CheckpointConfig config = env.getCheckpointConfig(); - config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); - - DataStream<TaxiRide> rides = env - .addSource(rideSourceOrTest(new TaxiRideGenerator())) - .filter((TaxiRide ride) -> ride.isStart) - .keyBy((TaxiRide ride) -> ride.rideId); - - DataStream<TaxiFare> fares = env - .addSource(fareSourceOrTest(new TaxiFareGenerator())) - .keyBy((TaxiFare fare) -> fare.rideId); - - // Set a UID on the stateful flatmap operator so we can read its state using the State Processor API. - DataStream<Tuple2<TaxiRide, TaxiFare>> enrichedRides = rides - .connect(fares) - .flatMap(new EnrichmentFunction()) - .uid("enrichment"); - - printOrTest(enrichedRides); - - env.execute("Join Rides with Fares (java RichCoFlatMap)"); - } - - public static class EnrichmentFunction extends RichCoFlatMapFunction<TaxiRide, TaxiFare, Tuple2<TaxiRide, TaxiFare>> { - // keyed, managed state - private ValueState<TaxiRide> rideState; - private ValueState<TaxiFare> fareState; - - @Override - public void open(Configuration config) { - rideState = getRuntimeContext().getState(new ValueStateDescriptor<>("saved ride", TaxiRide.class)); - fareState = getRuntimeContext().getState(new ValueStateDescriptor<>("saved fare", TaxiFare.class)); - } - - @Override - public void flatMap1(TaxiRide ride, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception { - TaxiFare fare = fareState.value(); - if (fare != null) { - fareState.clear(); - out.collect(Tuple2.of(ride, fare)); - } else { - rideState.update(ride); - } - } - - @Override - public void flatMap2(TaxiFare fare, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception { - TaxiRide ride = rideState.value(); - if (ride != null) { - rideState.clear(); - out.collect(Tuple2.of(ride, fare)); - } else { - fareState.update(fare); - } - } - } + /** + * Main method. + * + * @throws Exception which occurs during job execution. + */ + public static void main(String[] args) throws Exception { + + // Set up streaming execution environment, including Web UI and REST endpoint. + // Checkpointing isn't needed for the RidesAndFares exercise; this setup is for + // using the State Processor API. + + Configuration conf = new Configuration(); + conf.setString("state.backend", "filesystem"); + conf.setString("state.savepoints.dir", "file:///tmp/savepoints"); + conf.setString("state.checkpoints.dir", "file:///tmp/checkpoints"); + StreamExecutionEnvironment env = + StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf); + env.setParallelism(ExerciseBase.parallelism); + + env.enableCheckpointing(10000L); + CheckpointConfig config = env.getCheckpointConfig(); + config.enableExternalizedCheckpoints( + CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); + + DataStream<TaxiRide> rides = + env.addSource(rideSourceOrTest(new TaxiRideGenerator())) + .filter((TaxiRide ride) -> ride.isStart) + .keyBy((TaxiRide ride) -> ride.rideId); + + DataStream<TaxiFare> fares = + env.addSource(fareSourceOrTest(new TaxiFareGenerator())) + .keyBy((TaxiFare fare) -> fare.rideId); + + // Set a UID on the stateful flatmap operator so we can read its state using the State + // Processor API. + DataStream<Tuple2<TaxiRide, TaxiFare>> enrichedRides = + rides.connect(fares).flatMap(new EnrichmentFunction()).uid("enrichment"); + + printOrTest(enrichedRides); + + env.execute("Join Rides with Fares (java RichCoFlatMap)"); + } + + public static class EnrichmentFunction + extends RichCoFlatMapFunction<TaxiRide, TaxiFare, Tuple2<TaxiRide, TaxiFare>> { + // keyed, managed state + private ValueState<TaxiRide> rideState; + private ValueState<TaxiFare> fareState; + + @Override + public void open(Configuration config) { + rideState = + getRuntimeContext() + .getState(new ValueStateDescriptor<>("saved ride", TaxiRide.class)); + fareState = + getRuntimeContext() + .getState(new ValueStateDescriptor<>("saved fare", TaxiFare.class)); + } + + @Override + public void flatMap1(TaxiRide ride, Collector<Tuple2<TaxiRide, TaxiFare>> out) + throws Exception { + TaxiFare fare = fareState.value(); + if (fare != null) { + fareState.clear(); + out.collect(Tuple2.of(ride, fare)); + } else { + rideState.update(ride); + } + } + + @Override + public void flatMap2(TaxiFare fare, Collector<Tuple2<TaxiRide, TaxiFare>> out) + throws Exception { + TaxiRide ride = rideState.value(); + if (ride != null) { + rideState.clear(); + out.collect(Tuple2.of(ride, fare)); + } else { + fareState.update(fare); + } + } + } } diff --git a/rides-and-fares/src/test/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresTest.java b/rides-and-fares/src/test/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresTest.java index 76b4407..a670f95 100644 --- a/rides-and-fares/src/test/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresTest.java +++ b/rides-and-fares/src/test/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresTest.java @@ -35,49 +35,52 @@ import static org.junit.Assert.assertThat; public class RidesAndFaresTest extends TaxiRideTestBase<Tuple2<TaxiRide, TaxiFare>> { - static final Testable JAVA_EXERCISE = () -> RidesAndFaresExercise.main(new String[]{}); - - final TaxiRide ride1 = testRide(1); - final TaxiRide ride2 = testRide(2); - final TaxiFare fare1 = testFare(1); - final TaxiFare fare2 = testFare(2); - - @Test - public void testInOrder() throws Exception { - TestRideSource rides = new TestRideSource(ride1, ride2); - TestFareSource fares = new TestFareSource(fare1, fare2); - - List<Tuple2<TaxiRide, TaxiFare>> expected = Arrays.asList( - Tuple2.of(ride1, fare1), - Tuple2.of(ride2, fare2)); - - assertThat("Join results don't match", results(rides, fares), containsInAnyOrder(expected.toArray())); - } - - @Test - public void testOutOfOrder() throws Exception { - TestRideSource rides = new TestRideSource(ride1, ride2); - TestFareSource fares = new TestFareSource(fare2, fare1); - - List<Tuple2<TaxiRide, TaxiFare>> expected = Arrays.asList( - Tuple2.of(ride1, fare1), - Tuple2.of(ride2, fare2)); - - assertThat("Join results don't match", results(rides, fares), containsInAnyOrder(expected.toArray())); - } - - private TaxiRide testRide(long rideId) { - return new TaxiRide(rideId, true, Instant.EPOCH, Instant.EPOCH, - 0F, 0F, 0F, 0F, (short) 1, 0, rideId); - } - - private TaxiFare testFare(long rideId) { - return new TaxiFare(rideId, 0, rideId, Instant.EPOCH, "", 0F, 0F, 0F); - } - - protected List<?> results(TestRideSource rides, TestFareSource fares) throws Exception { - Testable javaSolution = () -> RidesAndFaresSolution.main(new String[]{}); - return runApp(rides, fares, new TestSink<>(), JAVA_EXERCISE, javaSolution); - } - + static final Testable JAVA_EXERCISE = () -> RidesAndFaresExercise.main(new String[] {}); + + final TaxiRide ride1 = testRide(1); + final TaxiRide ride2 = testRide(2); + final TaxiFare fare1 = testFare(1); + final TaxiFare fare2 = testFare(2); + + @Test + public void testInOrder() throws Exception { + TestRideSource rides = new TestRideSource(ride1, ride2); + TestFareSource fares = new TestFareSource(fare1, fare2); + + List<Tuple2<TaxiRide, TaxiFare>> expected = + Arrays.asList(Tuple2.of(ride1, fare1), Tuple2.of(ride2, fare2)); + + assertThat( + "Join results don't match", + results(rides, fares), + containsInAnyOrder(expected.toArray())); + } + + @Test + public void testOutOfOrder() throws Exception { + TestRideSource rides = new TestRideSource(ride1, ride2); + TestFareSource fares = new TestFareSource(fare2, fare1); + + List<Tuple2<TaxiRide, TaxiFare>> expected = + Arrays.asList(Tuple2.of(ride1, fare1), Tuple2.of(ride2, fare2)); + + assertThat( + "Join results don't match", + results(rides, fares), + containsInAnyOrder(expected.toArray())); + } + + private TaxiRide testRide(long rideId) { + return new TaxiRide( + rideId, true, Instant.EPOCH, Instant.EPOCH, 0F, 0F, 0F, 0F, (short) 1, 0, rideId); + } + + private TaxiFare testFare(long rideId) { + return new TaxiFare(rideId, 0, rideId, Instant.EPOCH, "", 0F, 0F, 0F); + } + + protected List<?> results(TestRideSource rides, TestFareSource fares) throws Exception { + Testable javaSolution = () -> RidesAndFaresSolution.main(new String[] {}); + return runApp(rides, fares, new TestSink<>(), JAVA_EXERCISE, javaSolution); + } }
