NicoK commented on a change in pull request #31:
URL: https://github.com/apache/flink-training/pull/31#discussion_r698359197
##########
File path:
rides-and-fares/src/main/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresExercise.java
##########
@@ -18,68 +18,119 @@
package org.apache.flink.training.exercises.ridesandfares;
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.training.exercises.common.datatypes.RideAndFare;
import org.apache.flink.training.exercises.common.datatypes.TaxiFare;
import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
import org.apache.flink.training.exercises.common.sources.TaxiFareGenerator;
import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator;
-import org.apache.flink.training.exercises.common.utils.ExerciseBase;
import
org.apache.flink.training.exercises.common.utils.MissingSolutionException;
import org.apache.flink.util.Collector;
/**
- * The "Stateful Enrichment" exercise of the Flink training in the docs.
+ * The Stateful Enrichment exercise from the Flink training.
*
* <p>The goal for this exercise is to enrich TaxiRides with fare information.
*/
-public class RidesAndFaresExercise extends ExerciseBase {
+public class RidesAndFaresExercise {
+
+ private final SourceFunction<TaxiRide> rideSource;
+ private final SourceFunction<TaxiFare> fareSource;
+ private final SinkFunction<RideAndFare> sink;
+
+ /** Creates a job using the sources and sink provided. */
+ public RidesAndFaresExercise(
+ SourceFunction<TaxiRide> rideSource,
+ SourceFunction<TaxiFare> fareSource,
+ SinkFunction<RideAndFare> sink) {
+
+ this.rideSource = rideSource;
+ this.fareSource = fareSource;
+ this.sink = sink;
+ }
/**
- * Main method.
+ * Creates and executes the pipeline using the StreamExecutionEnvironment
provided.
*
* @throws Exception which occurs during job execution.
+ * @param env The {StreamExecutionEnvironment}.
+ * @return {JobExecutionResult}
*/
- public static void main(String[] args) throws Exception {
+ public JobExecutionResult execute(StreamExecutionEnvironment env) throws
Exception {
- // set up streaming execution environment
+ // A stream of taxi ride START events, keyed by rideId.
+ DataStream<TaxiRide> rides =
+ env.addSource(rideSource).filter(ride ->
ride.isStart).keyBy(ride -> ride.rideId);
+
+ // A stream of taxi fare events, also keyed by rideId.
+ DataStream<TaxiFare> fares = env.addSource(fareSource).keyBy(fare ->
fare.rideId);
+
+ // Create the pipeline.
+ rides.connect(fares)
+ .flatMap(new EnrichmentFunction())
+ .uid("enrichment") // uid for this operator's state
+ .name("enrichment") // name for this operator in the web UI
+ .addSink(sink);
+
+ // Execute the pipeline and return the result.
+ return env.execute("Join Rides with Fares");
+ }
+
+ /** Creates and executes the pipeline using the default
StreamExecutionEnvironment. */
+ public JobExecutionResult execute() throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(ExerciseBase.parallelism);
- DataStream<TaxiRide> rides =
- env.addSource(rideSourceOrTest(new TaxiRideGenerator()))
- .filter((TaxiRide ride) -> ride.isStart)
- .keyBy((TaxiRide ride) -> ride.rideId);
+ return execute(env);
+ }
+
+ /**
+ * Main method.
+ *
+ * @throws Exception which occurs during job execution.
+ */
+ public static void main(String[] args) throws Exception {
- DataStream<TaxiFare> fares =
- env.addSource(fareSourceOrTest(new TaxiFareGenerator()))
- .keyBy((TaxiFare fare) -> fare.rideId);
+ RidesAndFaresExercise job =
+ new RidesAndFaresExercise(
+ new TaxiRideGenerator(),
+ new TaxiFareGenerator(),
+ new PrintSinkFunction<>());
- DataStream<Tuple2<TaxiRide, TaxiFare>> enrichedRides =
- rides.connect(fares).flatMap(new EnrichmentFunction());
+ // Setting up checkpointing so that the state can be explored with the
State Processor API.
+ // Generally it's better to separate configuration settings from the
code,
+ // but for this example it's convenient to have it here for running in
the IDE.
+ Configuration conf = new Configuration();
+ conf.setString("state.backend", "filesystem");
+ conf.setString("state.checkpoints.dir", "file:///tmp/checkpoints");
+ conf.setString("execution.checkpointing.interval", "10s");
+ conf.setString(
+ "execution.checkpointing.externalized-checkpoint-retention",
+ "RETAIN_ON_CANCELLATION");
- printOrTest(enrichedRides);
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(conf);
- env.execute("Join Rides with Fares (java RichCoFlatMap)");
+ job.execute(env);
}
public static class EnrichmentFunction
- extends RichCoFlatMapFunction<TaxiRide, TaxiFare, Tuple2<TaxiRide,
TaxiFare>> {
+ extends RichCoFlatMapFunction<TaxiRide, TaxiFare, RideAndFare> {
@Override
- public void open(Configuration config) throws Exception {
+ public void open(Configuration config) throws MissingSolutionException
{
Review comment:
   Why not use `Exception` here? For the exercise skeleton, we should keep
   the original signatures (even though they may generate IntelliJ warnings,
   as in `flatMap1`, ..., because we are not using them).
   I'd rather not scare beginners away from throwing exceptions (should
   anything go wrong) or from using library code that throws exceptions.
##########
File path:
rides-and-fares/src/test/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresUnitTest.java
##########
@@ -0,0 +1,93 @@
+package org.apache.flink.training.exercises.ridesandfares;
+
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
+import org.apache.flink.training.exercises.common.datatypes.RideAndFare;
+import org.apache.flink.training.exercises.common.datatypes.TaxiFare;
+import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
+import
org.apache.flink.training.exercises.testing.ComposedRichCoFlatMapFunction;
+import
org.apache.flink.training.solutions.ridesandfares.scala.RidesAndFaresSolution;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+public class RidesAndFaresUnitTest extends RidesAndFaresTestBase {
+
+ private KeyedTwoInputStreamOperatorTestHarness<Long, TaxiRide, TaxiFare,
RideAndFare> harness;
+
+ private final RichCoFlatMapFunction<TaxiRide, TaxiFare, RideAndFare>
javaExercise =
+ new RidesAndFaresExercise.EnrichmentFunction();
+
+ private final RichCoFlatMapFunction<TaxiRide, TaxiFare, RideAndFare>
javaSolution =
+ new RidesAndFaresSolution.EnrichmentFunction();
+
+ protected ComposedRichCoFlatMapFunction<TaxiRide, TaxiFare, RideAndFare>
+ composedEnrichmentFunction() {
+ return new ComposedRichCoFlatMapFunction<>(javaExercise, javaSolution);
+ }
+
+ private static final TaxiRide ride1 = testRide(1);
+ private static final TaxiFare fare1 = testFare(1);
+
+ @Before
+ public void setupTestHarness() throws Exception {
+ this.harness = setupHarness(composedEnrichmentFunction());
+ }
+
+ @Test
+ public void testRideStateCreatedAndCleared() throws Exception {
+
+ // Stream in a ride and check that state was created
+ harness.processElement1(ride1.asStreamRecord());
+ assertThat(harness.numKeyedStateEntries()).isGreaterThan(0);
+
+ // After processing the matching fare, the state should be cleared
+ harness.processElement2(fare1.asStreamRecord());
+ assertThat(harness.numKeyedStateEntries()).isZero();
+
+ // Verify the result
+ StreamRecord<RideAndFare> expected =
+ new StreamRecord(new RideAndFare(ride1, fare1),
ride1.getEventTime());
+ assertThat(harness.getOutput()).containsExactly(expected);
+ }
+
+ @Test
+ public void testFareStateCreatedAndCleared() throws Exception {
+
+ // Stream in a fare and check that state was created
+ harness.processElement2(fare1.asStreamRecord());
+ assertThat(harness.numKeyedStateEntries()).isGreaterThan(0);
+
+ // After processing the matching ride, the state should be cleared
+ harness.processElement1(ride1.asStreamRecord());
+ assertThat(harness.numKeyedStateEntries()).isZero();
+
+ // Verify the result
+ StreamRecord<RideAndFare> expected =
+ new StreamRecord(new RideAndFare(ride1, fare1),
ride1.getEventTime());
Review comment:
```suggestion
new StreamRecord<>(new RideAndFare(ride1, fare1),
ride1.getEventTime());
```
##########
File path:
rides-and-fares/src/test/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresUnitTest.java
##########
@@ -0,0 +1,93 @@
+package org.apache.flink.training.exercises.ridesandfares;
+
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
+import org.apache.flink.training.exercises.common.datatypes.RideAndFare;
+import org.apache.flink.training.exercises.common.datatypes.TaxiFare;
+import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
+import
org.apache.flink.training.exercises.testing.ComposedRichCoFlatMapFunction;
+import
org.apache.flink.training.solutions.ridesandfares.scala.RidesAndFaresSolution;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+public class RidesAndFaresUnitTest extends RidesAndFaresTestBase {
+
+ private KeyedTwoInputStreamOperatorTestHarness<Long, TaxiRide, TaxiFare,
RideAndFare> harness;
+
+ private final RichCoFlatMapFunction<TaxiRide, TaxiFare, RideAndFare>
javaExercise =
+ new RidesAndFaresExercise.EnrichmentFunction();
+
+ private final RichCoFlatMapFunction<TaxiRide, TaxiFare, RideAndFare>
javaSolution =
+ new RidesAndFaresSolution.EnrichmentFunction();
+
+ protected ComposedRichCoFlatMapFunction<TaxiRide, TaxiFare, RideAndFare>
+ composedEnrichmentFunction() {
+ return new ComposedRichCoFlatMapFunction<>(javaExercise, javaSolution);
+ }
+
+ private static final TaxiRide ride1 = testRide(1);
+ private static final TaxiFare fare1 = testFare(1);
+
+ @Before
+ public void setupTestHarness() throws Exception {
+ this.harness = setupHarness(composedEnrichmentFunction());
+ }
+
+ @Test
+ public void testRideStateCreatedAndCleared() throws Exception {
+
+ // Stream in a ride and check that state was created
+ harness.processElement1(ride1.asStreamRecord());
+ assertThat(harness.numKeyedStateEntries()).isGreaterThan(0);
+
+ // After processing the matching fare, the state should be cleared
+ harness.processElement2(fare1.asStreamRecord());
+ assertThat(harness.numKeyedStateEntries()).isZero();
+
+ // Verify the result
+ StreamRecord<RideAndFare> expected =
+ new StreamRecord(new RideAndFare(ride1, fare1),
ride1.getEventTime());
Review comment:
```suggestion
new StreamRecord<>(new RideAndFare(ride1, fare1),
ride1.getEventTime());
```
##########
File path:
ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingUnitTest.java
##########
@@ -0,0 +1,45 @@
+package org.apache.flink.training.exercises.ridecleansing;
+
+import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
+import org.apache.flink.training.exercises.testing.ComposedFilterFunction;
+import org.apache.flink.training.solutions.ridecleansing.RideCleansingSolution;
+
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class RideCleansingUnitTest extends RideCleansingTestBase {
+
+ public ComposedFilterFunction<TaxiRide> filterFunction() {
+ return new ComposedFilterFunction<>(
+ new RideCleansingExercise.NYCFilter(), new
RideCleansingSolution.NYCFilter());
+ }
+
+ @Test
+ public void testRideThatStartsAndEndsInNYC() throws Exception {
+
+ TaxiRide atPennStation = testRide(-73.9947F, 40.750626F, -73.9947F,
40.750626F);
+ assertThat(filterFunction().filter(atPennStation)).isEqualTo(true);
Review comment:
```suggestion
assertThat(filterFunction().filter(atPennStation)).isTrue();
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]