alpinegizmo commented on a change in pull request #31:
URL: https://github.com/apache/flink-training/pull/31#discussion_r698855269
##########
File path:
rides-and-fares/src/main/scala/org/apache/flink/training/exercises/ridesandfares/scala/RidesAndFaresExercise.scala
##########
@@ -18,55 +18,73 @@
package org.apache.flink.training.exercises.ridesandfares.scala
+import org.apache.flink.api.common.JobExecutionResult
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
+import org.apache.flink.streaming.api.functions.sink.{PrintSinkFunction, SinkFunction}
+import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
-import org.apache.flink.training.exercises.common.datatypes.{TaxiFare, TaxiRide}
+import org.apache.flink.training.exercises.common.datatypes.{RideAndFare, TaxiFare, TaxiRide}
import org.apache.flink.training.exercises.common.sources.{TaxiFareGenerator, TaxiRideGenerator}
-import org.apache.flink.training.exercises.common.utils.ExerciseBase._
-import org.apache.flink.training.exercises.common.utils.{ExerciseBase, MissingSolutionException}
+import org.apache.flink.training.exercises.common.utils.MissingSolutionException
import org.apache.flink.util.Collector
/**
- * The "Stateful Enrichment" exercise of the Flink training in the docs.
+ * The Stateful Enrichment exercise from the Flink training.
*
* The goal for this exercise is to enrich TaxiRides with fare information.
- *
*/
object RidesAndFaresExercise {
- def main(args: Array[String]) {
+ class RidesAndFaresJob(rideSource: SourceFunction[TaxiRide],
+ fareSource: SourceFunction[TaxiFare],
+ sink: SinkFunction[RideAndFare]) {
- // set up streaming execution environment
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setParallelism(ExerciseBase.parallelism)
+ def execute(): JobExecutionResult = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
Review comment:
You're right, I don't really want that code in the Java exercise. I've
removed it.
##########
File path:
common/src/test/java/org/apache/flink/training/exercises/testing/ComposedRichCoFlatMapFunction.java
##########
@@ -0,0 +1,70 @@
+package org.apache.flink.training.exercises.testing;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
+import org.apache.flink.training.exercises.common.utils.MissingSolutionException;
+import org.apache.flink.util.Collector;
+
+/**
+ * A RichCoFlatMapFunction that can delegate to a RichCoFlatMapFunction in either the exercise or in
+ * the solution. The implementation in the exercise is tested first, and if it throws
+ * MissingSolutionException, then the solution is tested instead.
+ *
+ * <p>This can be used to write test harness tests.
+ *
+ * @param <IN1> first input type
+ * @param <IN2> second input type
+ * @param <OUT> output type
+ */
+public class ComposedRichCoFlatMapFunction<IN1, IN2, OUT>
+ extends RichCoFlatMapFunction<IN1, IN2, OUT> {
+ private final RichCoFlatMapFunction<IN1, IN2, OUT> exercise;
+ private final RichCoFlatMapFunction<IN1, IN2, OUT> solution;
+ private boolean useExercise;
+
+ public ComposedRichCoFlatMapFunction(
+ RichCoFlatMapFunction<IN1, IN2, OUT> exercise,
+ RichCoFlatMapFunction<IN1, IN2, OUT> solution) {
+
+ this.exercise = exercise;
+ this.solution = solution;
+ this.useExercise = true;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+
+ try {
+ exercise.setRuntimeContext(this.getRuntimeContext());
+ exercise.open(parameters);
+ } catch (Exception e) {
+ if (MissingSolutionException.ultimateCauseIsMissingSolution(e)) {
+ this.useExercise = false;
+ solution.setRuntimeContext(this.getRuntimeContext());
+ solution.open(parameters);
+ } else {
+ throw e;
+ }
+ }
+ }
+
+ @Override
+ public void flatMap1(IN1 value, Collector<OUT> out) throws Exception {
+
+ if (useExercise) {
+ exercise.flatMap1(value, out);
+ } else {
+ solution.flatMap1(value, out);
+ }
Review comment:
I like this! Good idea.
I don't know that I want to bother with this for the ComposedFilterFunction,
however. Doing so increases the complexity in that case, and improving the
performance for this unusual situation doesn't seem very important.
##########
File path:
hourly-tips/src/main/scala/org/apache/flink/training/exercises/hourlytips/scala/HourlyTipsExercise.scala
##########
@@ -18,36 +18,47 @@
package org.apache.flink.training.exercises.hourlytips.scala
+import org.apache.flink.api.common.JobExecutionResult
+import org.apache.flink.streaming.api.functions.sink.{PrintSinkFunction, SinkFunction}
+import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
+import org.apache.flink.training.exercises.common.datatypes.TaxiFare
import org.apache.flink.training.exercises.common.sources.TaxiFareGenerator
-import org.apache.flink.training.exercises.common.utils.ExerciseBase._
-import org.apache.flink.training.exercises.common.utils.{ExerciseBase, MissingSolutionException}
+import org.apache.flink.training.exercises.common.utils.MissingSolutionException
/**
- * The "Hourly Tips" exercise of the Flink training in the docs.
+ * The Hourly Tips exercise from the Flink training.
*
- * The task of the exercise is to first calculate the total tips collected by each driver, hour by hour, and
- * then from that stream, find the highest tip total in each hour.
+ * The task of the exercise is to first calculate the total tips collected by each driver,
+ * hour by hour, and then from that stream, find the highest tip total in each hour.
*
*/
object HourlyTipsExercise {
- def main(args: Array[String]) {
+ @throws[Exception]
+ def main(args: Array[String]): Unit = {
+ val job = new HourlyTipsJob(new TaxiFareGenerator, new PrintSinkFunction)
- // set up streaming execution environment
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setParallelism(ExerciseBase.parallelism)
+ job.execute()
+ }
- // start the data generator
- val fares = env.addSource(fareSourceOrTest(new TaxiFareGenerator()))
+ class HourlyTipsJob(source: SourceFunction[TaxiFare], sink: SinkFunction[(Long, Long, Float)]) {
Review comment:
yes, done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]