alpinegizmo commented on a change in pull request #31:
URL: https://github.com/apache/flink-training/pull/31#discussion_r698865364



##########
File path: common/src/test/java/org/apache/flink/training/exercises/testing/ComposedRichCoFlatMapFunction.java
##########
@@ -0,0 +1,70 @@
+package org.apache.flink.training.exercises.testing;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
+import org.apache.flink.training.exercises.common.utils.MissingSolutionException;
+import org.apache.flink.util.Collector;
+
+/**
+ * A RichCoFlatMapFunction that can delegate to a RichCoFlatMapFunction in either the exercise or in
+ * the solution. The implementation in the exercise is tested first, and if it throws
+ * MissingSolutionException, then the solution is tested instead.
+ *
+ * <p>This can be used to write test harness tests.
+ *
+ * @param <IN1> first input type
+ * @param <IN2> second input type
+ * @param <OUT> output type
+ */
+public class ComposedRichCoFlatMapFunction<IN1, IN2, OUT>
+        extends RichCoFlatMapFunction<IN1, IN2, OUT> {
+    private final RichCoFlatMapFunction<IN1, IN2, OUT> exercise;
+    private final RichCoFlatMapFunction<IN1, IN2, OUT> solution;
+    private boolean useExercise;
+
+    public ComposedRichCoFlatMapFunction(
+            RichCoFlatMapFunction<IN1, IN2, OUT> exercise,
+            RichCoFlatMapFunction<IN1, IN2, OUT> solution) {
+
+        this.exercise = exercise;
+        this.solution = solution;
+        this.useExercise = true;
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+
+        try {
+            exercise.setRuntimeContext(this.getRuntimeContext());
+            exercise.open(parameters);
+        } catch (Exception e) {
+            if (MissingSolutionException.ultimateCauseIsMissingSolution(e)) {
+                this.useExercise = false;
+                solution.setRuntimeContext(this.getRuntimeContext());
+                solution.open(parameters);
+            } else {
+                throw e;
+            }
+        }
+    }
+
+    @Override
+    public void flatMap1(IN1 value, Collector<OUT> out) throws Exception {
+
+        if (useExercise) {
+            exercise.flatMap1(value, out);
+        } else {
+            solution.flatMap1(value, out);
+        }

Review comment:
   I like this! Good idea.
   
   I don't know that I want to bother with this for the ComposedFilterFunction, however. Doing so increases the complexity in that case, and improving the performance for this unusual situation doesn't seem very important.
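   
   The suggestion being answered is not quoted in this message. Assuming it is about deciding once which delegate to use, as the `useExercise` flag does in the diff above, a minimal sketch of the same idea for a filter might look like the following. It shows where the extra complexity comes from: a plain filter has no `open()` hook, so the fallback has to be detected inside the per-record call. The names `CachingComposedFilter` and `MissingSolutionSketchException` are hypothetical, and `java.util.function.Predicate` stands in for Flink's filter interface so the sketch stays self-contained; this is not the project's actual ComposedFilterFunction.
   
   ```java
   import java.util.function.Predicate;

   /** Hypothetical stand-in for the training project's MissingSolutionException. */
   class MissingSolutionSketchException extends RuntimeException {}

   /**
    * Hypothetical sketch: tries the exercise first and, once the exercise signals
    * a missing solution, remembers that and delegates to the solution thereafter.
    */
   class CachingComposedFilter<T> implements Predicate<T> {
       private final Predicate<T> exercise;
       private final Predicate<T> solution;
       private boolean useExercise = true;

       CachingComposedFilter(Predicate<T> exercise, Predicate<T> solution) {
           this.exercise = exercise;
           this.solution = solution;
       }

       @Override
       public boolean test(T value) {
           if (useExercise) {
               try {
                   return exercise.test(value);
               } catch (MissingSolutionSketchException e) {
                   // Remember the outcome so later records skip the failing call.
                   useExercise = false;
               }
           }
           return solution.test(value);
       }
   }
   ```
   
   Without the flag, every record would call the exercise, catch the exception, and then call the solution. That cost only arises when the exercise is unimplemented.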




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


