NicoK commented on a change in pull request #31:
URL: https://github.com/apache/flink-training/pull/31#discussion_r698320553



##########
File path: common/src/test/java/org/apache/flink/training/exercises/testing/ComposedRichCoFlatMapFunction.java
##########
@@ -0,0 +1,70 @@
+package org.apache.flink.training.exercises.testing;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
+import org.apache.flink.training.exercises.common.utils.MissingSolutionException;
+import org.apache.flink.util.Collector;
+
+/**
+ * A RichCoFlatMapFunction that can delegate to a RichCoFlatMapFunction in either the exercise or in
+ * the solution. The implementation in the exercise is tested first, and if it throws
+ * MissingSolutionException, then the solution is tested instead.
+ *
+ * <p>This can be used to write test harness tests.
+ *
+ * @param <IN1> first input type
+ * @param <IN2> second input type
+ * @param <OUT> output type
+ */
+public class ComposedRichCoFlatMapFunction<IN1, IN2, OUT>
+        extends RichCoFlatMapFunction<IN1, IN2, OUT> {
+    private final RichCoFlatMapFunction<IN1, IN2, OUT> exercise;
+    private final RichCoFlatMapFunction<IN1, IN2, OUT> solution;
+    private boolean useExercise;
+
+    public ComposedRichCoFlatMapFunction(
+            RichCoFlatMapFunction<IN1, IN2, OUT> exercise,
+            RichCoFlatMapFunction<IN1, IN2, OUT> solution) {
+
+        this.exercise = exercise;
+        this.solution = solution;
+        this.useExercise = true;
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+
+        try {
+            exercise.setRuntimeContext(this.getRuntimeContext());
+            exercise.open(parameters);
+        } catch (Exception e) {
+            if (MissingSolutionException.ultimateCauseIsMissingSolution(e)) {
+                this.useExercise = false;
+                solution.setRuntimeContext(this.getRuntimeContext());
+                solution.open(parameters);
+            } else {
+                throw e;
+            }
+        }
+    }
+
+    @Override
+    public void flatMap1(IN1 value, Collector<OUT> out) throws Exception {
+
+        if (useExercise) {
+            exercise.flatMap1(value, out);
+        } else {
+            solution.flatMap1(value, out);
+        }

Review comment:
       Looking at this pattern again, what do you think about this alternative:
   - create a new member `private RichCoFlatMapFunction<IN1, IN2, OUT> functionToExecute;` (or maybe you have a better idea for the name)
   - assign either `exercise` or `solution` to it
   - just call `functionToExecute.flatMap1()` etc.
   
   -> Less code, no need to wait for the JIT to optimise the boolean away (hopefully it can), and no additional memory required (just a new reference that replaces the boolean)
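
A minimal sketch of the alternative described above. It is written against hypothetical stand-in types (`TwoInputFunction`, `MissingSolution`, `ComposedFunction`) so that it compiles without Flink on the classpath. In the PR the delegates would be `RichCoFlatMapFunction` instances, and the fallback decision would still go through `MissingSolutionException.ultimateCauseIsMissingSolution(e)`. Only the name `functionToExecute` comes from the comment itself.

```java
import java.util.function.Consumer;

// Hypothetical stand-in for RichCoFlatMapFunction (assumption: not the Flink type).
abstract class TwoInputFunction<IN1, IN2, OUT> {
    void open() {}
    abstract void flatMap1(IN1 value, Consumer<OUT> out);
    abstract void flatMap2(IN2 value, Consumer<OUT> out);
}

// Hypothetical stand-in for MissingSolutionException.
class MissingSolution extends RuntimeException {}

class ComposedFunction<IN1, IN2, OUT> extends TwoInputFunction<IN1, IN2, OUT> {
    private final TwoInputFunction<IN1, IN2, OUT> exercise;
    private final TwoInputFunction<IN1, IN2, OUT> solution;

    // Chosen once in open(); takes the place of the boolean flag.
    private TwoInputFunction<IN1, IN2, OUT> functionToExecute;

    ComposedFunction(
            TwoInputFunction<IN1, IN2, OUT> exercise,
            TwoInputFunction<IN1, IN2, OUT> solution) {
        this.exercise = exercise;
        this.solution = solution;
    }

    @Override
    void open() {
        try {
            exercise.open();
            functionToExecute = exercise;
        } catch (MissingSolution e) {
            // The exercise is not implemented yet: fall back to the solution.
            solution.open();
            functionToExecute = solution;
        }
    }

    @Override
    void flatMap1(IN1 value, Consumer<OUT> out) {
        functionToExecute.flatMap1(value, out);
    }

    @Override
    void flatMap2(IN2 value, Consumer<OUT> out) {
        functionToExecute.flatMap2(value, out);
    }
}
```

With this shape, `flatMap1` and `flatMap2` contain no branching: the choice between exercise and solution is made once, in `open()`.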

##########
File path: hourly-tips/src/main/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsExercise.java
##########
@@ -40,18 +56,32 @@
      */
     public static void main(String[] args) throws Exception {
 
+        HourlyTipsSolution job =
+                new HourlyTipsSolution(new TaxiFareGenerator(), new PrintSinkFunction<>());
+
+        job.execute();
+    }
+
+    /**
+     * Create and execute the hourly tips pipeline.
+     *
+     * @return {JobExecutionResult}
+     * @throws Exception which occurs during job execution.
+     */
+    public JobExecutionResult execute() throws Exception {
+
         // set up streaming execution environment
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(ExerciseBase.parallelism);
 
         // start the data generator
-        DataStream<TaxiFare> fares = env.addSource(fareSourceOrTest(new TaxiFareGenerator()));
-
-        throw new MissingSolutionException();
+        DataStream<TaxiFare> fares = env.addSource(new TaxiFareGenerator());

Review comment:
       ```suggestion
           DataStream<TaxiFare> fares = env.addSource(source);
   ```

##########
File path: hourly-tips/src/test/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsTest.java
##########
@@ -72,27 +84,39 @@ public void testMaxAcrossDrivers() throws Exception {
         TaxiFare tenFor1In2 = testFare(1, t(90), 10.0F);

Review comment:
       Can you add a couple more test fares so that chances are higher that these are processed on different parallel instances (if the user's code supports parallel instances)?

##########
File path: hourly-tips/src/test/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsTest.java
##########
@@ -72,27 +84,39 @@ public void testMaxAcrossDrivers() throws Exception {
         TaxiFare tenFor1In2 = testFare(1, t(90), 10.0F);
         TaxiFare twentyFor2In2 = testFare(2, t(90), 20.0F);
 
-        TestFareSource source =
-                new TestFareSource(oneFor1In1, fiveFor1In1, tenFor1In2, twentyFor2In2);
+        ParallelTestSource<TaxiFare> source =
+                new ParallelTestSource<>(oneFor1In1, fiveFor1In1, tenFor1In2, twentyFor2In2);
 
         Tuple3<Long, Long, Float> hour1 = Tuple3.of(t(60).toEpochMilli(), 1L, 6.0F);
         Tuple3<Long, Long, Float> hour2 = Tuple3.of(t(120).toEpochMilli(), 2L, 20.0F);
 
-        List<Tuple3<Long, Long, Float>> expected = Arrays.asList(hour1, hour2);
-
-        assertEquals(expected, results(source));
+        assertThat(results(source)).containsExactlyInAnyOrder(hour1, hour2);
     }
 
     private Instant t(int minutes) {
-        return Instant.parse("2020-01-01T12:00:00.00Z").plusSeconds(60 * minutes);
+        return Instant.parse("2020-01-01T12:00:00.00Z").plusSeconds(60L * minutes);

Review comment:
       Do you think it's worth putting `Instant.parse("2020-01-01T12:00:00.00Z")` into a static constant so that it isn't parsed again and again?
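
A small sketch of what that could look like. The class name `TestTimes` and the constant name `BEGINNING` are hypothetical; in the PR the constant would live in `HourlyTipsTest` next to `t(int)`.

```java
import java.time.Instant;

class TestTimes {
    // Parsed once when the class is initialized, rather than on every call to t().
    private static final Instant BEGINNING = Instant.parse("2020-01-01T12:00:00.00Z");

    // Returns the instant that lies the given number of minutes after BEGINNING.
    static Instant t(int minutes) {
        return BEGINNING.plusSeconds(60L * minutes);
    }
}
```

`Instant` is immutable, so sharing one instance across all test methods is safe.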

##########
File path: hourly-tips/src/main/scala/org/apache/flink/training/exercises/hourlytips/scala/HourlyTipsExercise.scala
##########
@@ -18,36 +18,47 @@
 
 package org.apache.flink.training.exercises.hourlytips.scala
 
+import org.apache.flink.api.common.JobExecutionResult
+import org.apache.flink.streaming.api.functions.sink.{PrintSinkFunction, SinkFunction}
+import org.apache.flink.streaming.api.functions.source.SourceFunction
 import org.apache.flink.streaming.api.scala._
+import org.apache.flink.training.exercises.common.datatypes.TaxiFare
 import org.apache.flink.training.exercises.common.sources.TaxiFareGenerator
-import org.apache.flink.training.exercises.common.utils.ExerciseBase._
-import org.apache.flink.training.exercises.common.utils.{ExerciseBase, MissingSolutionException}
+import org.apache.flink.training.exercises.common.utils.MissingSolutionException
 
 /**
-  * The "Hourly Tips" exercise of the Flink training in the docs.
+  * The Hourly Tips exercise from the Flink training.
   *
-  * The task of the exercise is to first calculate the total tips collected by each driver, hour by hour, and
-  * then from that stream, find the highest tip total in each hour.
+  * The task of the exercise is to first calculate the total tips collected by each driver,
+  * hour by hour, and then from that stream, find the highest tip total in each hour.
   *
   */
 object HourlyTipsExercise {
 
-  def main(args: Array[String]) {
+  @throws[Exception]
+  def main(args: Array[String]): Unit = {
+    val job = new HourlyTipsJob(new TaxiFareGenerator, new PrintSinkFunction)
 
-    // set up streaming execution environment
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setParallelism(ExerciseBase.parallelism)
+    job.execute()
+  }
 
-    // start the data generator
-    val fares = env.addSource(fareSourceOrTest(new TaxiFareGenerator()))
+  class HourlyTipsJob(source: SourceFunction[TaxiFare], sink: SinkFunction[(Long, Long, Float)]) {

Review comment:
       Do you want to add a skeleton for adding the source (and sink), similar to the Java exercise?

##########
File path: hourly-tips/src/main/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsExercise.java
##########
@@ -40,18 +56,32 @@
      */
     public static void main(String[] args) throws Exception {
 
+        HourlyTipsSolution job =
+                new HourlyTipsSolution(new TaxiFareGenerator(), new PrintSinkFunction<>());
+
+        job.execute();
+    }
+
+    /**
+     * Create and execute the hourly tips pipeline.
+     *
+     * @return {JobExecutionResult}
+     * @throws Exception which occurs during job execution.
+     */
+    public JobExecutionResult execute() throws Exception {
+
         // set up streaming execution environment
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(ExerciseBase.parallelism);
 
         // start the data generator
-        DataStream<TaxiFare> fares = env.addSource(fareSourceOrTest(new TaxiFareGenerator()));
-
-        throw new MissingSolutionException();
+        DataStream<TaxiFare> fares = env.addSource(new TaxiFareGenerator());

Review comment:
       Similarly, we should hint to users that they need to use `sink` in their solutions, or the tests will fail.

##########
File path: rides-and-fares/src/main/scala/org/apache/flink/training/exercises/ridesandfares/scala/RidesAndFaresExercise.scala
##########
@@ -18,55 +18,73 @@
 
 package org.apache.flink.training.exercises.ridesandfares.scala
 
+import org.apache.flink.api.common.JobExecutionResult
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
+import org.apache.flink.streaming.api.functions.sink.{PrintSinkFunction, SinkFunction}
+import org.apache.flink.streaming.api.functions.source.SourceFunction
 import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
-import org.apache.flink.training.exercises.common.datatypes.{TaxiFare, TaxiRide}
+import org.apache.flink.training.exercises.common.datatypes.{RideAndFare, TaxiFare, TaxiRide}
 import org.apache.flink.training.exercises.common.sources.{TaxiFareGenerator, TaxiRideGenerator}
-import org.apache.flink.training.exercises.common.utils.ExerciseBase._
-import org.apache.flink.training.exercises.common.utils.{ExerciseBase, MissingSolutionException}
+import org.apache.flink.training.exercises.common.utils.MissingSolutionException
 import org.apache.flink.util.Collector
 
 /**
-  * The "Stateful Enrichment" exercise of the Flink training in the docs.
+  * The Stateful Enrichment exercise from the Flink training.
   *
   * The goal for this exercise is to enrich TaxiRides with fare information.
-  *
   */
 object RidesAndFaresExercise {
 
-  def main(args: Array[String]) {
+  class RidesAndFaresJob(rideSource: SourceFunction[TaxiRide],
+                         fareSource: SourceFunction[TaxiFare],
+                         sink: SinkFunction[RideAndFare]) {
 
-    // set up streaming execution environment
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setParallelism(ExerciseBase.parallelism)
+    def execute(): JobExecutionResult = {
+      val env = StreamExecutionEnvironment.getExecutionEnvironment

Review comment:
       This is actually about the Scala **exercise** (since it's also in the Java exercise). Do you want users to run the solution to create a checkpoint they can look at, or should they just run a solution and examine that?
   
   If it's the latter, then, true, we don't need the Scala code here, but in that case we also don't need it in the Java exercise.




