NicoK commented on a change in pull request #31:
URL: https://github.com/apache/flink-training/pull/31#discussion_r698320553
##########
File path:
common/src/test/java/org/apache/flink/training/exercises/testing/ComposedRichCoFlatMapFunction.java
##########
@@ -0,0 +1,70 @@
+package org.apache.flink.training.exercises.testing;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
+import org.apache.flink.training.exercises.common.utils.MissingSolutionException;
+import org.apache.flink.util.Collector;
+
+/**
+ * A RichCoFlatMapFunction that can delegate to a RichCoFlatMapFunction in either the exercise or in
+ * the solution. The implementation in the exercise is tested first, and if it throws
+ * MissingSolutionException, then the solution is tested instead.
+ *
+ * <p>This can be used to write test harness tests.
+ *
+ * @param <IN1> first input type
+ * @param <IN2> second input type
+ * @param <OUT> output type
+ */
+public class ComposedRichCoFlatMapFunction<IN1, IN2, OUT>
+ extends RichCoFlatMapFunction<IN1, IN2, OUT> {
+ private final RichCoFlatMapFunction<IN1, IN2, OUT> exercise;
+ private final RichCoFlatMapFunction<IN1, IN2, OUT> solution;
+ private boolean useExercise;
+
+ public ComposedRichCoFlatMapFunction(
+ RichCoFlatMapFunction<IN1, IN2, OUT> exercise,
+ RichCoFlatMapFunction<IN1, IN2, OUT> solution) {
+
+ this.exercise = exercise;
+ this.solution = solution;
+ this.useExercise = true;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+
+ try {
+ exercise.setRuntimeContext(this.getRuntimeContext());
+ exercise.open(parameters);
+ } catch (Exception e) {
+ if (MissingSolutionException.ultimateCauseIsMissingSolution(e)) {
+ this.useExercise = false;
+ solution.setRuntimeContext(this.getRuntimeContext());
+ solution.open(parameters);
+ } else {
+ throw e;
+ }
+ }
+ }
+
+ @Override
+ public void flatMap1(IN1 value, Collector<OUT> out) throws Exception {
+
+ if (useExercise) {
+ exercise.flatMap1(value, out);
+ } else {
+ solution.flatMap1(value, out);
+ }
Review comment:
Looking at this pattern again, what do you think about this alternative?
- create a new member `private RichCoFlatMapFunction<IN1, IN2, OUT> functionToExecute;`
  (or maybe you have a better idea for the name)
- assign either `exercise` or `solution` to it
- just call `functionToExecute.flatMap1()` etc.

-> Less code, no need to wait for the JIT to optimise the boolean away
(hopefully it can), and no additional memory required (the new reference
simply replaces the boolean).
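
The alternative described above could look roughly like the following. This is only a minimal sketch, not the code from the PR: to keep it self-contained, `CoFunction` and `MissingSolution` are hypothetical stand-ins for Flink's `RichCoFlatMapFunction` and the training's `MissingSolutionException`, `Consumer` stands in for `Collector`, and the runtime-context handling and the `ultimateCauseIsMissingSolution` check from the original `open()` are left out.

```java
import java.util.function.Consumer;

// Hypothetical stand-in for RichCoFlatMapFunction, reduced to what the sketch needs.
interface CoFunction<IN1, IN2, OUT> {
    void open() throws Exception;

    void flatMap1(IN1 value, Consumer<OUT> out) throws Exception;

    void flatMap2(IN2 value, Consumer<OUT> out) throws Exception;
}

// Hypothetical stand-in for MissingSolutionException.
class MissingSolution extends Exception {}

class ComposedCoFunction<IN1, IN2, OUT> implements CoFunction<IN1, IN2, OUT> {
    private final CoFunction<IN1, IN2, OUT> exercise;
    private final CoFunction<IN1, IN2, OUT> solution;

    // Replaces the boolean flag: set in open() to whichever implementation is in use.
    // It stays null until open() has been called.
    private CoFunction<IN1, IN2, OUT> functionToExecute;

    ComposedCoFunction(
            CoFunction<IN1, IN2, OUT> exercise, CoFunction<IN1, IN2, OUT> solution) {
        this.exercise = exercise;
        this.solution = solution;
    }

    @Override
    public void open() throws Exception {
        try {
            exercise.open();
            functionToExecute = exercise;
        } catch (MissingSolution e) {
            // The exercise is not implemented yet, so fall back to the solution.
            solution.open();
            functionToExecute = solution;
        }
    }

    @Override
    public void flatMap1(IN1 value, Consumer<OUT> out) throws Exception {
        // No branching per record: simply forward to the chosen delegate.
        functionToExecute.flatMap1(value, out);
    }

    @Override
    public void flatMap2(IN2 value, Consumer<OUT> out) throws Exception {
        functionToExecute.flatMap2(value, out);
    }
}
```

With this shape, each `flatMapN` method shrinks to a single forwarding call, and the decision between exercise and solution is made exactly once, in `open()`.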
##########
File path:
hourly-tips/src/main/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsExercise.java
##########
@@ -40,18 +56,32 @@
*/
public static void main(String[] args) throws Exception {
+ HourlyTipsSolution job =
+ new HourlyTipsSolution(new TaxiFareGenerator(), new PrintSinkFunction<>());
+
+ job.execute();
+ }
+
+ /**
+ * Create and execute the hourly tips pipeline.
+ *
+ * @return {JobExecutionResult}
+ * @throws Exception which occurs during job execution.
+ */
+ public JobExecutionResult execute() throws Exception {
+
// set up streaming execution environment
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(ExerciseBase.parallelism);
// start the data generator
- DataStream<TaxiFare> fares = env.addSource(fareSourceOrTest(new TaxiFareGenerator()));
-
- throw new MissingSolutionException();
+ DataStream<TaxiFare> fares = env.addSource(new TaxiFareGenerator());
Review comment:
```suggestion
DataStream<TaxiFare> fares = env.addSource(source);
```
##########
File path:
hourly-tips/src/test/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsTest.java
##########
@@ -72,27 +84,39 @@ public void testMaxAcrossDrivers() throws Exception {
TaxiFare tenFor1In2 = testFare(1, t(90), 10.0F);
Review comment:
Can you add a couple more test fares to raise the chances that they are
processed on different parallel instances (if the user's code supports
parallel instances)?
##########
File path:
hourly-tips/src/test/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsTest.java
##########
@@ -72,27 +84,39 @@ public void testMaxAcrossDrivers() throws Exception {
TaxiFare tenFor1In2 = testFare(1, t(90), 10.0F);
TaxiFare twentyFor2In2 = testFare(2, t(90), 20.0F);
- TestFareSource source =
- new TestFareSource(oneFor1In1, fiveFor1In1, tenFor1In2, twentyFor2In2);
+ ParallelTestSource<TaxiFare> source =
+ new ParallelTestSource<>(oneFor1In1, fiveFor1In1, tenFor1In2, twentyFor2In2);
Tuple3<Long, Long, Float> hour1 = Tuple3.of(t(60).toEpochMilli(), 1L, 6.0F);
Tuple3<Long, Long, Float> hour2 = Tuple3.of(t(120).toEpochMilli(), 2L, 20.0F);
- List<Tuple3<Long, Long, Float>> expected = Arrays.asList(hour1, hour2);
-
- assertEquals(expected, results(source));
+ assertThat(results(source)).containsExactlyInAnyOrder(hour1, hour2);
}
private Instant t(int minutes) {
- return Instant.parse("2020-01-01T12:00:00.00Z").plusSeconds(60 * minutes);
+ return Instant.parse("2020-01-01T12:00:00.00Z").plusSeconds(60L * minutes);
Review comment:
Do you think it's worth putting `Instant.parse("2020-01-01T12:00:00.00Z")`
into a static constant so that it isn't parsed again on every call?
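
A minimal sketch of that change, assuming a hypothetical constant name `BEGINNING` and a hypothetical holder class `TestTimes` (in the PR the helper lives in `HourlyTipsTest` itself):

```java
import java.time.Instant;

class TestTimes {
    // Parsed once when the class is initialized instead of on every call to t().
    // The name BEGINNING is an assumption made for this sketch.
    private static final Instant BEGINNING = Instant.parse("2020-01-01T12:00:00.00Z");

    // Returns the instant the given number of minutes after BEGINNING.
    static Instant t(int minutes) {
        return BEGINNING.plusSeconds(60L * minutes);
    }
}
```

`Instant` is immutable, so sharing a single instance across all test methods is safe.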
##########
File path:
hourly-tips/src/main/scala/org/apache/flink/training/exercises/hourlytips/scala/HourlyTipsExercise.scala
##########
@@ -18,36 +18,47 @@
package org.apache.flink.training.exercises.hourlytips.scala
+import org.apache.flink.api.common.JobExecutionResult
+import org.apache.flink.streaming.api.functions.sink.{PrintSinkFunction, SinkFunction}
+import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
+import org.apache.flink.training.exercises.common.datatypes.TaxiFare
import org.apache.flink.training.exercises.common.sources.TaxiFareGenerator
-import org.apache.flink.training.exercises.common.utils.ExerciseBase._
-import org.apache.flink.training.exercises.common.utils.{ExerciseBase, MissingSolutionException}
+import org.apache.flink.training.exercises.common.utils.MissingSolutionException
/**
- * The "Hourly Tips" exercise of the Flink training in the docs.
+ * The Hourly Tips exercise from the Flink training.
*
- * The task of the exercise is to first calculate the total tips collected by each driver, hour by hour, and
- * then from that stream, find the highest tip total in each hour.
+ * The task of the exercise is to first calculate the total tips collected by each driver,
+ * hour by hour, and then from that stream, find the highest tip total in each hour.
*
*/
object HourlyTipsExercise {
- def main(args: Array[String]) {
+ @throws[Exception]
+ def main(args: Array[String]): Unit = {
+ val job = new HourlyTipsJob(new TaxiFareGenerator, new PrintSinkFunction)
- // set up streaming execution environment
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setParallelism(ExerciseBase.parallelism)
+ job.execute()
+ }
- // start the data generator
- val fares = env.addSource(fareSourceOrTest(new TaxiFareGenerator()))
+ class HourlyTipsJob(source: SourceFunction[TaxiFare], sink: SinkFunction[(Long, Long, Float)]) {
Review comment:
Do you want to add a skeleton for wiring in the source (and sink), similar
to the Java exercise?
##########
File path:
hourly-tips/src/main/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsExercise.java
##########
@@ -40,18 +56,32 @@
*/
public static void main(String[] args) throws Exception {
+ HourlyTipsSolution job =
+ new HourlyTipsSolution(new TaxiFareGenerator(), new PrintSinkFunction<>());
+
+ job.execute();
+ }
+
+ /**
+ * Create and execute the hourly tips pipeline.
+ *
+ * @return {JobExecutionResult}
+ * @throws Exception which occurs during job execution.
+ */
+ public JobExecutionResult execute() throws Exception {
+
// set up streaming execution environment
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(ExerciseBase.parallelism);
// start the data generator
- DataStream<TaxiFare> fares = env.addSource(fareSourceOrTest(new TaxiFareGenerator()));
-
- throw new MissingSolutionException();
+ DataStream<TaxiFare> fares = env.addSource(new TaxiFareGenerator());
Review comment:
Similarly, we should hint to users that they need to use `sink` in their
solutions, or the tests will fail.
##########
File path:
rides-and-fares/src/main/scala/org/apache/flink/training/exercises/ridesandfares/scala/RidesAndFaresExercise.scala
##########
@@ -18,55 +18,73 @@
package org.apache.flink.training.exercises.ridesandfares.scala
+import org.apache.flink.api.common.JobExecutionResult
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
+import org.apache.flink.streaming.api.functions.sink.{PrintSinkFunction, SinkFunction}
+import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
-import org.apache.flink.training.exercises.common.datatypes.{TaxiFare, TaxiRide}
+import org.apache.flink.training.exercises.common.datatypes.{RideAndFare, TaxiFare, TaxiRide}
import org.apache.flink.training.exercises.common.sources.{TaxiFareGenerator, TaxiRideGenerator}
-import org.apache.flink.training.exercises.common.utils.ExerciseBase._
-import org.apache.flink.training.exercises.common.utils.{ExerciseBase, MissingSolutionException}
+import org.apache.flink.training.exercises.common.utils.MissingSolutionException
import org.apache.flink.util.Collector
/**
- * The "Stateful Enrichment" exercise of the Flink training in the docs.
+ * The Stateful Enrichment exercise from the Flink training.
*
* The goal for this exercise is to enrich TaxiRides with fare information.
- *
*/
object RidesAndFaresExercise {
- def main(args: Array[String]) {
+ class RidesAndFaresJob(rideSource: SourceFunction[TaxiRide],
+ fareSource: SourceFunction[TaxiFare],
+ sink: SinkFunction[RideAndFare]) {
- // set up streaming execution environment
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setParallelism(ExerciseBase.parallelism)
+ def execute(): JobExecutionResult = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
Review comment:
This is actually about the Scala **exercise** (since it's also in the
Java exercise). Do you want users to run the solution to create a checkpoint
they can look at, or should they just run a solution and examine that?
If it's the latter, then, true, we don't need the Scala code here, but in
that case we also don't need it in the Java exercise.