alpinegizmo commented on a change in pull request #31:
URL: https://github.com/apache/flink-training/pull/31#discussion_r698855269



##########
File path: 
rides-and-fares/src/main/scala/org/apache/flink/training/exercises/ridesandfares/scala/RidesAndFaresExercise.scala
##########
@@ -18,55 +18,73 @@
 
 package org.apache.flink.training.exercises.ridesandfares.scala
 
+import org.apache.flink.api.common.JobExecutionResult
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
+import org.apache.flink.streaming.api.functions.sink.{PrintSinkFunction, 
SinkFunction}
+import org.apache.flink.streaming.api.functions.source.SourceFunction
 import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
-import org.apache.flink.training.exercises.common.datatypes.{TaxiFare, 
TaxiRide}
+import org.apache.flink.training.exercises.common.datatypes.{RideAndFare, 
TaxiFare, TaxiRide}
 import org.apache.flink.training.exercises.common.sources.{TaxiFareGenerator, 
TaxiRideGenerator}
-import org.apache.flink.training.exercises.common.utils.ExerciseBase._
-import org.apache.flink.training.exercises.common.utils.{ExerciseBase, 
MissingSolutionException}
+import 
org.apache.flink.training.exercises.common.utils.MissingSolutionException
 import org.apache.flink.util.Collector
 
 /**
-  * The "Stateful Enrichment" exercise of the Flink training in the docs.
+  * The Stateful Enrichment exercise from the Flink training.
   *
   * The goal for this exercise is to enrich TaxiRides with fare information.
-  *
   */
 object RidesAndFaresExercise {
 
-  def main(args: Array[String]) {
+  class RidesAndFaresJob(rideSource: SourceFunction[TaxiRide],
+                         fareSource: SourceFunction[TaxiFare],
+                         sink: SinkFunction[RideAndFare]) {
 
-    // set up streaming execution environment
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setParallelism(ExerciseBase.parallelism)
+    def execute(): JobExecutionResult = {
+      val env = StreamExecutionEnvironment.getExecutionEnvironment

Review comment:
       You're right, I don't really want that code in the java exercise. I've 
removed it.

##########
File path: 
common/src/test/java/org/apache/flink/training/exercises/testing/ComposedRichCoFlatMapFunction.java
##########
@@ -0,0 +1,70 @@
+package org.apache.flink.training.exercises.testing;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
+import 
org.apache.flink.training.exercises.common.utils.MissingSolutionException;
+import org.apache.flink.util.Collector;
+
+/**
+ * A RichCoFlatMapFunction that can delegate to a RichCoFlatMapFunction in 
either the exercise or in
+ * the solution. The implementation in the exercise is tested first, and if it 
throws
+ * MissingSolutionException, then the solution is tested instead.
+ *
+ * <p>This can be used to write test harness tests.
+ *
+ * @param <IN1> first input type
+ * @param <IN2> second input type
+ * @param <OUT> output type
+ */
+public class ComposedRichCoFlatMapFunction<IN1, IN2, OUT>
+        extends RichCoFlatMapFunction<IN1, IN2, OUT> {
+    private final RichCoFlatMapFunction<IN1, IN2, OUT> exercise;
+    private final RichCoFlatMapFunction<IN1, IN2, OUT> solution;
+    private boolean useExercise;
+
+    public ComposedRichCoFlatMapFunction(
+            RichCoFlatMapFunction<IN1, IN2, OUT> exercise,
+            RichCoFlatMapFunction<IN1, IN2, OUT> solution) {
+
+        this.exercise = exercise;
+        this.solution = solution;
+        this.useExercise = true;
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+
+        try {
+            exercise.setRuntimeContext(this.getRuntimeContext());
+            exercise.open(parameters);
+        } catch (Exception e) {
+            if (MissingSolutionException.ultimateCauseIsMissingSolution(e)) {
+                this.useExercise = false;
+                solution.setRuntimeContext(this.getRuntimeContext());
+                solution.open(parameters);
+            } else {
+                throw e;
+            }
+        }
+    }
+
+    @Override
+    public void flatMap1(IN1 value, Collector<OUT> out) throws Exception {
+
+        if (useExercise) {
+            exercise.flatMap1(value, out);
+        } else {
+            solution.flatMap1(value, out);
+        }

Review comment:
       I like this! Good idea.
   
   I don't know that I want to bother with this for the ComposedFilterFunction, 
however. Doing so increases the complexity in that case, and improving the 
performance for this unusual situation doesn't seem very important.

##########
File path: 
hourly-tips/src/main/scala/org/apache/flink/training/exercises/hourlytips/scala/HourlyTipsExercise.scala
##########
@@ -18,36 +18,47 @@
 
 package org.apache.flink.training.exercises.hourlytips.scala
 
+import org.apache.flink.api.common.JobExecutionResult
+import org.apache.flink.streaming.api.functions.sink.{PrintSinkFunction, 
SinkFunction}
+import org.apache.flink.streaming.api.functions.source.SourceFunction
 import org.apache.flink.streaming.api.scala._
+import org.apache.flink.training.exercises.common.datatypes.TaxiFare
 import org.apache.flink.training.exercises.common.sources.TaxiFareGenerator
-import org.apache.flink.training.exercises.common.utils.ExerciseBase._
-import org.apache.flink.training.exercises.common.utils.{ExerciseBase, 
MissingSolutionException}
+import 
org.apache.flink.training.exercises.common.utils.MissingSolutionException
 
 /**
-  * The "Hourly Tips" exercise of the Flink training in the docs.
+  * The Hourly Tips exercise from the Flink training.
   *
-  * The task of the exercise is to first calculate the total tips collected by 
each driver, hour by hour, and
-  * then from that stream, find the highest tip total in each hour.
+  * The task of the exercise is to first calculate the total tips collected by 
each driver,
+  * hour by hour, and then from that stream, find the highest tip total in 
each hour.
   *
   */
 object HourlyTipsExercise {
 
-  def main(args: Array[String]) {
+  @throws[Exception]
+  def main(args: Array[String]): Unit = {
+    val job = new HourlyTipsJob(new TaxiFareGenerator, new PrintSinkFunction)
 
-    // set up streaming execution environment
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setParallelism(ExerciseBase.parallelism)
+    job.execute()
+  }
 
-    // start the data generator
-    val fares = env.addSource(fareSourceOrTest(new TaxiFareGenerator()))
+  class HourlyTipsJob(source: SourceFunction[TaxiFare], sink: 
SinkFunction[(Long, Long, Float)]) {

Review comment:
       yes, done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to