NicoK commented on a change in pull request #31:
URL: https://github.com/apache/flink-training/pull/31#discussion_r697212832
##########
File path:
rides-and-fares/src/main/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresExercise.java
##########
@@ -18,68 +18,122 @@
package org.apache.flink.training.exercises.ridesandfares;
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.training.exercises.common.datatypes.RideAndFare;
import org.apache.flink.training.exercises.common.datatypes.TaxiFare;
import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
import org.apache.flink.training.exercises.common.sources.TaxiFareGenerator;
import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator;
-import org.apache.flink.training.exercises.common.utils.ExerciseBase;
import org.apache.flink.training.exercises.common.utils.MissingSolutionException;
import org.apache.flink.util.Collector;
/**
- * The "Stateful Enrichment" exercise of the Flink training in the docs.
+ * The Stateful Enrichment exercise from the Flink training.
*
* <p>The goal for this exercise is to enrich TaxiRides with fare information.
*/
-public class RidesAndFaresExercise extends ExerciseBase {
+public class RidesAndFaresExercise {
+
+ private final SourceFunction<TaxiRide> rideSource;
+ private final SourceFunction<TaxiFare> fareSource;
+ private final SinkFunction<RideAndFare> sink;
+
+ /** Creates a job using the sources and sink provided. */
+ public RidesAndFaresExercise(
+ SourceFunction<TaxiRide> rideSource,
+ SourceFunction<TaxiFare> fareSource,
+ SinkFunction<RideAndFare> sink) {
+
+ this.rideSource = rideSource;
+ this.fareSource = fareSource;
+ this.sink = sink;
+ }
/**
- * Main method.
+ * Creates and executes the pipeline using the StreamExecutionEnvironment provided.
*
* @throws Exception which occurs during job execution.
+ * @param env The {StreamExecutionEnvironment}.
+ * @return {JobExecutionResult}
*/
- public static void main(String[] args) throws Exception {
-
- // set up streaming execution environment
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(ExerciseBase.parallelism);
+ public JobExecutionResult execute(StreamExecutionEnvironment env) throws Exception {
+ // A stream of taxi ride START events, keyed by rideId.
DataStream<TaxiRide> rides =
- env.addSource(rideSourceOrTest(new TaxiRideGenerator()))
+ env.addSource(rideSource)
.filter((TaxiRide ride) -> ride.isStart)
.keyBy((TaxiRide ride) -> ride.rideId);
+ // A stream of taxi fare events, also keyed by rideId.
DataStream<TaxiFare> fares =
- env.addSource(fareSourceOrTest(new TaxiFareGenerator()))
- .keyBy((TaxiFare fare) -> fare.rideId);
+ env.addSource(fareSource).keyBy((TaxiFare fare) -> fare.rideId);
Review comment:
same here (the lambda can use its simpler form)
```suggestion
env.addSource(fareSource).keyBy(fare -> fare.rideId);
```
##########
File path:
common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiRide.java
##########
@@ -156,7 +156,9 @@ public int compareTo(@Nullable TaxiRide other) {
@Override
public boolean equals(Object other) {
- return other instanceof TaxiRide && this.rideId == ((TaxiRide) other).rideId;
+ return other instanceof TaxiRide
+ && this.rideId == ((TaxiRide) other).rideId
+ && this.isStart == ((TaxiRide) other).isStart;
Review comment:
Should we just compare all properties (and auto-generate this method
from IntelliJ)?
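For illustration, roughly what an IDE-generated equals()/hashCode() over all properties could look like (a hedged sketch; only a few fields are spelled out, and the generated code would have to cover TaxiRide's actual members):
```java
@Override
public boolean equals(Object o) {
    if (this == o) {
        return true;
    }
    if (o == null || getClass() != o.getClass()) {
        return false;
    }
    TaxiRide that = (TaxiRide) o;
    // The generated method would also compare the timestamp, location, and
    // passenger-count fields; they are elided in this sketch.
    return rideId == that.rideId
            && isStart == that.isStart
            && taxiId == that.taxiId
            && driverId == that.driverId;
}

@Override
public int hashCode() {
    // Keep hashCode() consistent with equals(), over the same fields.
    return java.util.Objects.hash(rideId, isStart, taxiId, driverId);
}
```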
##########
File path:
rides-and-fares/src/test/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresIntegrationTest.java
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.training.exercises.ridesandfares;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.training.exercises.common.datatypes.RideAndFare;
+import org.apache.flink.training.exercises.common.datatypes.TaxiFare;
+import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
+import org.apache.flink.training.exercises.testing.ComposedTwoInputPipeline;
+import org.apache.flink.training.exercises.testing.ExecutableTwoInputPipeline;
+import org.apache.flink.training.exercises.testing.ParallelTestSource;
+import org.apache.flink.training.exercises.testing.TestSink;
+import org.apache.flink.training.solutions.ridesandfares.RidesAndFaresSolution;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+public class RidesAndFaresIntegrationTest extends RidesAndFaresTestBase {
+
+ private static final int PARALLELISM = 2;
+
+ /** This isn't necessary, but speeds up the tests. */
+ @ClassRule
+ public static MiniClusterWithClientResource flinkCluster =
+ new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberSlotsPerTaskManager(PARALLELISM)
+ .setNumberTaskManagers(1)
+ .build());
+
+ private static final TaxiRide ride1 = testRide(1);
+ private static final TaxiRide ride2 = testRide(2);
+ private static final TaxiFare fare1 = testFare(1);
+ private static final TaxiFare fare2 = testFare(2);
+
+ @Test
+ public void testInOrder() throws Exception {
+
+ ParallelTestSource<TaxiRide> rides = new ParallelTestSource<>(ride1, ride2);
+ ParallelTestSource<TaxiFare> fares = new ParallelTestSource<>(fare1, fare2);
+ TestSink<RideAndFare> sink = new TestSink<>();
+
+ JobExecutionResult jobResult = ridesAndFaresPipeline().execute(rides, fares, sink);
+ assertThat(sink.getResults(jobResult))
+ .containsExactlyInAnyOrder(
+ new RideAndFare(ride1, fare1), new RideAndFare(ride2, fare2));
Review comment:
I don't think we can guarantee the order with p=2, e.g. if each ride
(id) is processed by a different subtask.
##########
File path:
rides-and-fares/src/solution/java/org/apache/flink/training/solutions/ridesandfares/RidesAndFaresSolution.java
##########
@@ -18,80 +18,120 @@
package org.apache.flink.training.solutions.ridesandfares;
+import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.training.exercises.common.datatypes.RideAndFare;
import org.apache.flink.training.exercises.common.datatypes.TaxiFare;
import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
import org.apache.flink.training.exercises.common.sources.TaxiFareGenerator;
import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator;
-import org.apache.flink.training.exercises.common.utils.ExerciseBase;
import org.apache.flink.util.Collector;
/**
- * Java reference implementation for the "Stateful Enrichment" exercise of the Flink training in the
- * docs.
+ * Java reference implementation for the Stateful Enrichment exercise from the Flink training.
*
* <p>The goal for this exercise is to enrich TaxiRides with fare information.
*/
-public class RidesAndFaresSolution extends ExerciseBase {
+public class RidesAndFaresSolution {
+
+ private final SourceFunction<TaxiRide> rideSource;
+ private final SourceFunction<TaxiFare> fareSource;
+ private final SinkFunction<RideAndFare> sink;
+
+ /** Creates a job using the sources and sink provided. */
+ public RidesAndFaresSolution(
+ SourceFunction<TaxiRide> rideSource,
+ SourceFunction<TaxiFare> fareSource,
+ SinkFunction<RideAndFare> sink) {
+
+ this.rideSource = rideSource;
+ this.fareSource = fareSource;
+ this.sink = sink;
+ }
/**
- * Main method.
+ * Creates and executes the pipeline using the StreamExecutionEnvironment provided.
*
* @throws Exception which occurs during job execution.
+ * @param env The {StreamExecutionEnvironment}.
+ * @return {JobExecutionResult}
*/
- public static void main(String[] args) throws Exception {
-
- // Set up streaming execution environment, including Web UI and REST endpoint.
- // Checkpointing isn't needed for the RidesAndFares exercise; this setup is for
- // using the State Processor API.
-
- Configuration conf = new Configuration();
- conf.setString("state.backend", "filesystem");
- conf.setString("state.savepoints.dir", "file:///tmp/savepoints");
- conf.setString("state.checkpoints.dir", "file:///tmp/checkpoints");
- StreamExecutionEnvironment env =
- StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
- env.setParallelism(ExerciseBase.parallelism);
-
- env.enableCheckpointing(10000L);
- CheckpointConfig config = env.getCheckpointConfig();
- config.enableExternalizedCheckpoints(
- CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+ public JobExecutionResult execute(StreamExecutionEnvironment env) throws Exception {
+ // A stream of taxi ride START events, keyed by rideId.
DataStream<TaxiRide> rides =
- env.addSource(rideSourceOrTest(new TaxiRideGenerator()))
+ env.addSource(rideSource)
.filter((TaxiRide ride) -> ride.isStart)
.keyBy((TaxiRide ride) -> ride.rideId);
+ // A stream of taxi fare events, also keyed by rideId.
DataStream<TaxiFare> fares =
- env.addSource(fareSourceOrTest(new TaxiFareGenerator()))
- .keyBy((TaxiFare fare) -> fare.rideId);
+ env.addSource(fareSource).keyBy((TaxiFare fare) -> fare.rideId);
Review comment:
```suggestion
env.addSource(fareSource).keyBy(fare -> fare.rideId);
```
##########
File path:
rides-and-fares/src/main/scala/org/apache/flink/training/exercises/ridesandfares/scala/RidesAndFaresExercise.scala
##########
@@ -18,55 +18,73 @@
package org.apache.flink.training.exercises.ridesandfares.scala
+import org.apache.flink.api.common.JobExecutionResult
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.configuration.Configuration
Review comment:
unused (but may want to keep the Configuration import for the change
below)
```suggestion
import org.apache.flink.configuration.Configuration
```
##########
File path:
rides-and-fares/src/main/scala/org/apache/flink/training/exercises/ridesandfares/scala/RidesAndFaresExercise.scala
##########
@@ -18,55 +18,73 @@
package org.apache.flink.training.exercises.ridesandfares.scala
+import org.apache.flink.api.common.JobExecutionResult
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
+import org.apache.flink.streaming.api.functions.sink.{PrintSinkFunction, SinkFunction}
+import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
-import org.apache.flink.training.exercises.common.datatypes.{TaxiFare, TaxiRide}
+import org.apache.flink.training.exercises.common.datatypes.{RideAndFare, TaxiFare, TaxiRide}
import org.apache.flink.training.exercises.common.sources.{TaxiFareGenerator, TaxiRideGenerator}
-import org.apache.flink.training.exercises.common.utils.ExerciseBase._
-import org.apache.flink.training.exercises.common.utils.{ExerciseBase, MissingSolutionException}
+import org.apache.flink.training.exercises.common.utils.MissingSolutionException
import org.apache.flink.util.Collector
/**
- * The "Stateful Enrichment" exercise of the Flink training in the docs.
+ * The Stateful Enrichment exercise from the Flink training.
*
* The goal for this exercise is to enrich TaxiRides with fare information.
- *
*/
object RidesAndFaresExercise {
- def main(args: Array[String]) {
+ class RidesAndFaresJob(rideSource: SourceFunction[TaxiRide],
+ fareSource: SourceFunction[TaxiFare],
+ sink: SinkFunction[RideAndFare]) {
- // set up streaming execution environment
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setParallelism(ExerciseBase.parallelism)
+ def execute(): JobExecutionResult = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
Review comment:
The Scala exercise is missing the part with the checkpoint configuration for the State TTL exercise.
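For context, the setup being referred to is the block that the Java solution's old main() used (it appears as removed lines elsewhere in this review); whether an equivalent belongs in the Scala job is the open question here:
```java
// Taken from the removed main() of the Java RidesAndFaresSolution in this diff.
// Checkpointing isn't needed for RidesAndFares itself; this setup exists so the
// follow-up exercise (State Processor API / State TTL) has retained checkpoints to use.
Configuration conf = new Configuration();
conf.setString("state.backend", "filesystem");
conf.setString("state.savepoints.dir", "file:///tmp/savepoints");
conf.setString("state.checkpoints.dir", "file:///tmp/checkpoints");
StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

env.enableCheckpointing(10000L);
CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
```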
##########
File path:
rides-and-fares/src/solution/java/org/apache/flink/training/solutions/ridesandfares/RidesAndFaresSolution.java
##########
@@ -18,80 +18,120 @@
package org.apache.flink.training.solutions.ridesandfares;
+import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.training.exercises.common.datatypes.RideAndFare;
import org.apache.flink.training.exercises.common.datatypes.TaxiFare;
import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
import org.apache.flink.training.exercises.common.sources.TaxiFareGenerator;
import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator;
-import org.apache.flink.training.exercises.common.utils.ExerciseBase;
import org.apache.flink.util.Collector;
/**
- * Java reference implementation for the "Stateful Enrichment" exercise of the Flink training in the
- * docs.
+ * Java reference implementation for the Stateful Enrichment exercise from the Flink training.
*
* <p>The goal for this exercise is to enrich TaxiRides with fare information.
*/
-public class RidesAndFaresSolution extends ExerciseBase {
+public class RidesAndFaresSolution {
+
+ private final SourceFunction<TaxiRide> rideSource;
+ private final SourceFunction<TaxiFare> fareSource;
+ private final SinkFunction<RideAndFare> sink;
+
+ /** Creates a job using the sources and sink provided. */
+ public RidesAndFaresSolution(
+ SourceFunction<TaxiRide> rideSource,
+ SourceFunction<TaxiFare> fareSource,
+ SinkFunction<RideAndFare> sink) {
+
+ this.rideSource = rideSource;
+ this.fareSource = fareSource;
+ this.sink = sink;
+ }
/**
- * Main method.
+ * Creates and executes the pipeline using the StreamExecutionEnvironment provided.
*
* @throws Exception which occurs during job execution.
+ * @param env The {StreamExecutionEnvironment}.
+ * @return {JobExecutionResult}
*/
- public static void main(String[] args) throws Exception {
-
- // Set up streaming execution environment, including Web UI and REST endpoint.
- // Checkpointing isn't needed for the RidesAndFares exercise; this setup is for
- // using the State Processor API.
-
- Configuration conf = new Configuration();
- conf.setString("state.backend", "filesystem");
- conf.setString("state.savepoints.dir", "file:///tmp/savepoints");
- conf.setString("state.checkpoints.dir", "file:///tmp/checkpoints");
- StreamExecutionEnvironment env =
- StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
- env.setParallelism(ExerciseBase.parallelism);
-
- env.enableCheckpointing(10000L);
- CheckpointConfig config = env.getCheckpointConfig();
- config.enableExternalizedCheckpoints(
- CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+ public JobExecutionResult execute(StreamExecutionEnvironment env) throws Exception {
+ // A stream of taxi ride START events, keyed by rideId.
DataStream<TaxiRide> rides =
- env.addSource(rideSourceOrTest(new TaxiRideGenerator()))
+ env.addSource(rideSource)
.filter((TaxiRide ride) -> ride.isStart)
.keyBy((TaxiRide ride) -> ride.rideId);
Review comment:
```suggestion
.filter(ride -> ride.isStart)
.keyBy(ride -> ride.rideId);
```
##########
File path:
rides-and-fares/src/main/scala/org/apache/flink/training/exercises/ridesandfares/scala/RidesAndFaresExercise.scala
##########
@@ -18,55 +18,73 @@
package org.apache.flink.training.exercises.ridesandfares.scala
+import org.apache.flink.api.common.JobExecutionResult
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
+import org.apache.flink.streaming.api.functions.sink.{PrintSinkFunction, SinkFunction}
+import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
-import org.apache.flink.training.exercises.common.datatypes.{TaxiFare, TaxiRide}
+import org.apache.flink.training.exercises.common.datatypes.{RideAndFare, TaxiFare, TaxiRide}
import org.apache.flink.training.exercises.common.sources.{TaxiFareGenerator, TaxiRideGenerator}
-import org.apache.flink.training.exercises.common.utils.ExerciseBase._
-import org.apache.flink.training.exercises.common.utils.{ExerciseBase, MissingSolutionException}
+import org.apache.flink.training.exercises.common.utils.MissingSolutionException
import org.apache.flink.util.Collector
/**
- * The "Stateful Enrichment" exercise of the Flink training in the docs.
+ * The Stateful Enrichment exercise from the Flink training.
*
* The goal for this exercise is to enrich TaxiRides with fare information.
- *
*/
object RidesAndFaresExercise {
- def main(args: Array[String]) {
+ class RidesAndFaresJob(rideSource: SourceFunction[TaxiRide],
+ fareSource: SourceFunction[TaxiFare],
+ sink: SinkFunction[RideAndFare]) {
- // set up streaming execution environment
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setParallelism(ExerciseBase.parallelism)
+ def execute(): JobExecutionResult = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
- val rides = env
- .addSource(rideSourceOrTest(new TaxiRideGenerator()))
- .filter { ride => ride.isStart }
- .keyBy { ride => ride.rideId }
+ val rides = env
+ .addSource(rideSource)
+ .filter { ride =>
+ ride.isStart
+ }
+ .keyBy { ride =>
+ ride.rideId
+ }
- val fares = env
- .addSource(fareSourceOrTest(new TaxiFareGenerator()))
- .keyBy { fare => fare.rideId }
+ val fares = env
+ .addSource(fareSource)
+ .keyBy { fare =>
+ fare.rideId
+ }
- val processed = rides
- .connect(fares)
- .flatMap(new EnrichmentFunction)
+ rides
+ .connect(fares)
+ .flatMap(new EnrichmentFunction())
+ .addSink(sink)
- printOrTest(processed)
+ env.execute()
+ }
+ }
- env.execute("Join Rides with Fares (scala RichCoFlatMap)")
+ @throws[Exception]
+ def main(args: Array[String]): Unit = {
+ val job =
+ new RidesAndFaresJob(new TaxiRideGenerator, new TaxiFareGenerator, new PrintSinkFunction)
+
+ job.execute()
}
- class EnrichmentFunction extends RichCoFlatMapFunction[TaxiRide, TaxiFare, (TaxiRide, TaxiFare)] {
+ class EnrichmentFunction() extends RichCoFlatMapFunction[TaxiRide, TaxiFare, RideAndFare] {
- override def flatMap1(ride: TaxiRide, out: Collector[(TaxiRide, TaxiFare)]): Unit = {
- throw new MissingSolutionException()
+ override def flatMap1(ride: TaxiRide, out: Collector[RideAndFare]): Unit = {
+ throw new MissingSolutionException();
Review comment:
```suggestion
throw new MissingSolutionException()
```
##########
File path:
rides-and-fares/src/main/scala/org/apache/flink/training/exercises/ridesandfares/scala/RidesAndFaresExercise.scala
##########
@@ -18,55 +18,73 @@
package org.apache.flink.training.exercises.ridesandfares.scala
+import org.apache.flink.api.common.JobExecutionResult
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
+import org.apache.flink.streaming.api.functions.sink.{PrintSinkFunction, SinkFunction}
+import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
-import org.apache.flink.training.exercises.common.datatypes.{TaxiFare, TaxiRide}
+import org.apache.flink.training.exercises.common.datatypes.{RideAndFare, TaxiFare, TaxiRide}
import org.apache.flink.training.exercises.common.sources.{TaxiFareGenerator, TaxiRideGenerator}
-import org.apache.flink.training.exercises.common.utils.ExerciseBase._
-import org.apache.flink.training.exercises.common.utils.{ExerciseBase, MissingSolutionException}
+import org.apache.flink.training.exercises.common.utils.MissingSolutionException
import org.apache.flink.util.Collector
/**
- * The "Stateful Enrichment" exercise of the Flink training in the docs.
+ * The Stateful Enrichment exercise from the Flink training.
*
* The goal for this exercise is to enrich TaxiRides with fare information.
- *
*/
object RidesAndFaresExercise {
- def main(args: Array[String]) {
+ class RidesAndFaresJob(rideSource: SourceFunction[TaxiRide],
+ fareSource: SourceFunction[TaxiFare],
+ sink: SinkFunction[RideAndFare]) {
- // set up streaming execution environment
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setParallelism(ExerciseBase.parallelism)
+ def execute(): JobExecutionResult = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
- val rides = env
- .addSource(rideSourceOrTest(new TaxiRideGenerator()))
- .filter { ride => ride.isStart }
- .keyBy { ride => ride.rideId }
+ val rides = env
+ .addSource(rideSource)
+ .filter { ride =>
+ ride.isStart
+ }
+ .keyBy { ride =>
+ ride.rideId
+ }
- val fares = env
- .addSource(fareSourceOrTest(new TaxiFareGenerator()))
- .keyBy { fare => fare.rideId }
+ val fares = env
+ .addSource(fareSource)
+ .keyBy { fare =>
+ fare.rideId
+ }
- val processed = rides
- .connect(fares)
- .flatMap(new EnrichmentFunction)
+ rides
+ .connect(fares)
+ .flatMap(new EnrichmentFunction())
+ .addSink(sink)
- printOrTest(processed)
+ env.execute()
+ }
+ }
- env.execute("Join Rides with Fares (scala RichCoFlatMap)")
+ @throws[Exception]
+ def main(args: Array[String]): Unit = {
+ val job =
+ new RidesAndFaresJob(new TaxiRideGenerator, new TaxiFareGenerator, new PrintSinkFunction)
+
+ job.execute()
}
- class EnrichmentFunction extends RichCoFlatMapFunction[TaxiRide, TaxiFare, (TaxiRide, TaxiFare)] {
+ class EnrichmentFunction() extends RichCoFlatMapFunction[TaxiRide, TaxiFare, RideAndFare] {
- override def flatMap1(ride: TaxiRide, out: Collector[(TaxiRide, TaxiFare)]): Unit = {
- throw new MissingSolutionException()
+ override def flatMap1(ride: TaxiRide, out: Collector[RideAndFare]): Unit = {
+ throw new MissingSolutionException();
}
- override def flatMap2(fare: TaxiFare, out: Collector[(TaxiRide, TaxiFare)]): Unit = {
+ override def flatMap2(fare: TaxiFare, out: Collector[RideAndFare]): Unit = {
+ throw new MissingSolutionException();
Review comment:
```suggestion
throw new MissingSolutionException()
```
##########
File path:
rides-and-fares/src/main/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresExercise.java
##########
@@ -18,68 +18,122 @@
package org.apache.flink.training.exercises.ridesandfares;
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.training.exercises.common.datatypes.RideAndFare;
import org.apache.flink.training.exercises.common.datatypes.TaxiFare;
import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
import org.apache.flink.training.exercises.common.sources.TaxiFareGenerator;
import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator;
-import org.apache.flink.training.exercises.common.utils.ExerciseBase;
import org.apache.flink.training.exercises.common.utils.MissingSolutionException;
import org.apache.flink.util.Collector;
/**
- * The "Stateful Enrichment" exercise of the Flink training in the docs.
+ * The Stateful Enrichment exercise from the Flink training.
*
* <p>The goal for this exercise is to enrich TaxiRides with fare information.
*/
-public class RidesAndFaresExercise extends ExerciseBase {
+public class RidesAndFaresExercise {
+
+ private final SourceFunction<TaxiRide> rideSource;
+ private final SourceFunction<TaxiFare> fareSource;
+ private final SinkFunction<RideAndFare> sink;
+
+ /** Creates a job using the sources and sink provided. */
+ public RidesAndFaresExercise(
+ SourceFunction<TaxiRide> rideSource,
+ SourceFunction<TaxiFare> fareSource,
+ SinkFunction<RideAndFare> sink) {
+
+ this.rideSource = rideSource;
+ this.fareSource = fareSource;
+ this.sink = sink;
+ }
/**
- * Main method.
+ * Creates and executes the pipeline using the StreamExecutionEnvironment provided.
*
* @throws Exception which occurs during job execution.
+ * @param env The {StreamExecutionEnvironment}.
+ * @return {JobExecutionResult}
*/
- public static void main(String[] args) throws Exception {
-
- // set up streaming execution environment
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(ExerciseBase.parallelism);
+ public JobExecutionResult execute(StreamExecutionEnvironment env) throws Exception {
+ // A stream of taxi ride START events, keyed by rideId.
DataStream<TaxiRide> rides =
- env.addSource(rideSourceOrTest(new TaxiRideGenerator()))
+ env.addSource(rideSource)
.filter((TaxiRide ride) -> ride.isStart)
.keyBy((TaxiRide ride) -> ride.rideId);
Review comment:
I think we changed these lambdas to their simpler form in the LongRides exercises, but for educational purposes they could also stay as they are...
```suggestion
.filter(ride -> ride.isStart)
.keyBy(ride -> ride.rideId);
```
##########
File path:
rides-and-fares/src/main/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresExercise.java
##########
@@ -18,68 +18,122 @@
package org.apache.flink.training.exercises.ridesandfares;
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.training.exercises.common.datatypes.RideAndFare;
import org.apache.flink.training.exercises.common.datatypes.TaxiFare;
import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
import org.apache.flink.training.exercises.common.sources.TaxiFareGenerator;
import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator;
-import org.apache.flink.training.exercises.common.utils.ExerciseBase;
import org.apache.flink.training.exercises.common.utils.MissingSolutionException;
import org.apache.flink.util.Collector;
/**
- * The "Stateful Enrichment" exercise of the Flink training in the docs.
+ * The Stateful Enrichment exercise from the Flink training.
*
* <p>The goal for this exercise is to enrich TaxiRides with fare information.
*/
-public class RidesAndFaresExercise extends ExerciseBase {
+public class RidesAndFaresExercise {
+
+ private final SourceFunction<TaxiRide> rideSource;
+ private final SourceFunction<TaxiFare> fareSource;
+ private final SinkFunction<RideAndFare> sink;
+
+ /** Creates a job using the sources and sink provided. */
+ public RidesAndFaresExercise(
+ SourceFunction<TaxiRide> rideSource,
+ SourceFunction<TaxiFare> fareSource,
+ SinkFunction<RideAndFare> sink) {
+
+ this.rideSource = rideSource;
+ this.fareSource = fareSource;
+ this.sink = sink;
+ }
/**
- * Main method.
+ * Creates and executes the pipeline using the StreamExecutionEnvironment provided.
*
* @throws Exception which occurs during job execution.
+ * @param env The {StreamExecutionEnvironment}.
+ * @return {JobExecutionResult}
*/
- public static void main(String[] args) throws Exception {
-
- // set up streaming execution environment
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(ExerciseBase.parallelism);
+ public JobExecutionResult execute(StreamExecutionEnvironment env) throws Exception {
+ // A stream of taxi ride START events, keyed by rideId.
DataStream<TaxiRide> rides =
- env.addSource(rideSourceOrTest(new TaxiRideGenerator()))
+ env.addSource(rideSource)
.filter((TaxiRide ride) -> ride.isStart)
.keyBy((TaxiRide ride) -> ride.rideId);
Review comment:
(this code change also requires a new spotlessApply)
##########
File path:
rides-and-fares/src/test/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresIntegrationTest.java
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.training.exercises.ridesandfares;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.training.exercises.common.datatypes.RideAndFare;
+import org.apache.flink.training.exercises.common.datatypes.TaxiFare;
+import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
+import org.apache.flink.training.exercises.testing.ComposedTwoInputPipeline;
+import org.apache.flink.training.exercises.testing.ExecutableTwoInputPipeline;
+import org.apache.flink.training.exercises.testing.ParallelTestSource;
+import org.apache.flink.training.exercises.testing.TestSink;
+import org.apache.flink.training.solutions.ridesandfares.RidesAndFaresSolution;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+public class RidesAndFaresIntegrationTest extends RidesAndFaresTestBase {
+
+ private static final int PARALLELISM = 2;
+
+ /** This isn't necessary, but speeds up the tests. */
+ @ClassRule
+ public static MiniClusterWithClientResource flinkCluster =
+ new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberSlotsPerTaskManager(PARALLELISM)
+ .setNumberTaskManagers(1)
+ .build());
+
+ private static final TaxiRide ride1 = testRide(1);
+ private static final TaxiRide ride2 = testRide(2);
+ private static final TaxiFare fare1 = testFare(1);
+ private static final TaxiFare fare2 = testFare(2);
+
+ @Test
+ public void testInOrder() throws Exception {
+
+ ParallelTestSource<TaxiRide> rides = new ParallelTestSource<>(ride1, ride2);
+ ParallelTestSource<TaxiFare> fares = new ParallelTestSource<>(fare1, fare2);
+ TestSink<RideAndFare> sink = new TestSink<>();
+
+ JobExecutionResult jobResult = ridesAndFaresPipeline().execute(rides, fares, sink);
+ assertThat(sink.getResults(jobResult))
+ .containsExactlyInAnyOrder(
+ new RideAndFare(ride1, fare1), new RideAndFare(ride2, fare2));
+ }
+
+ @Test
+ public void testFaresOutOfOrder() throws Exception {
Review comment:
Due to the parallel source, it's actually not guaranteed that these are received out of order by the same subtask. This looks more like a unit test with a test harness, where we control the input order. But a wild mix of more(!) out-of-order fares (and rides?) may still make a good integration test.
File path:
rides-and-fares/src/main/scala/org/apache/flink/training/exercises/ridesandfares/scala/RidesAndFaresExercise.scala
##########
@@ -18,55 +18,73 @@
package org.apache.flink.training.exercises.ridesandfares.scala
+import org.apache.flink.api.common.JobExecutionResult
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
+import org.apache.flink.streaming.api.functions.sink.{PrintSinkFunction, SinkFunction}
+import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
-import org.apache.flink.training.exercises.common.datatypes.{TaxiFare, TaxiRide}
+import org.apache.flink.training.exercises.common.datatypes.{RideAndFare, TaxiFare, TaxiRide}
import org.apache.flink.training.exercises.common.sources.{TaxiFareGenerator, TaxiRideGenerator}
-import org.apache.flink.training.exercises.common.utils.ExerciseBase._
-import org.apache.flink.training.exercises.common.utils.{ExerciseBase, MissingSolutionException}
+import org.apache.flink.training.exercises.common.utils.MissingSolutionException
import org.apache.flink.util.Collector
/**
- * The "Stateful Enrichment" exercise of the Flink training in the docs.
+ * The Stateful Enrichment exercise from the Flink training.
*
* The goal for this exercise is to enrich TaxiRides with fare information.
- *
*/
object RidesAndFaresExercise {
- def main(args: Array[String]) {
+ class RidesAndFaresJob(rideSource: SourceFunction[TaxiRide],
+ fareSource: SourceFunction[TaxiFare],
+ sink: SinkFunction[RideAndFare]) {
- // set up streaming execution environment
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setParallelism(ExerciseBase.parallelism)
+ def execute(): JobExecutionResult = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
- val rides = env
- .addSource(rideSourceOrTest(new TaxiRideGenerator()))
- .filter { ride => ride.isStart }
- .keyBy { ride => ride.rideId }
+ val rides = env
+ .addSource(rideSource)
+ .filter { ride =>
+ ride.isStart
+ }
+ .keyBy { ride =>
+ ride.rideId
+ }
- val fares = env
- .addSource(fareSourceOrTest(new TaxiFareGenerator()))
- .keyBy { fare => fare.rideId }
+ val fares = env
+ .addSource(fareSource)
+ .keyBy { fare =>
+ fare.rideId
+ }
- val processed = rides
- .connect(fares)
- .flatMap(new EnrichmentFunction)
+ rides
+ .connect(fares)
+ .flatMap(new EnrichmentFunction())
+ .addSink(sink)
- printOrTest(processed)
+ env.execute()
+ }
+ }
- env.execute("Join Rides with Fares (scala RichCoFlatMap)")
+ @throws[Exception]
+ def main(args: Array[String]): Unit = {
+ val job =
+ new RidesAndFaresJob(new TaxiRideGenerator, new TaxiFareGenerator, new PrintSinkFunction)
+
+ job.execute()
}
- class EnrichmentFunction extends RichCoFlatMapFunction[TaxiRide, TaxiFare, (TaxiRide, TaxiFare)] {
+ class EnrichmentFunction() extends RichCoFlatMapFunction[TaxiRide, TaxiFare, RideAndFare] {
Review comment:
let's also put the `open()` template here, just like in Java:
```suggestion
class EnrichmentFunction() extends RichCoFlatMapFunction[TaxiRide, TaxiFare, RideAndFare] {
override def open(parameters: Configuration): Unit = {
throw new MissingSolutionException()
}
```