alpinegizmo commented on a change in pull request #31:
URL: https://github.com/apache/flink-training/pull/31#discussion_r694423579
##########
File path:
long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTest.java
##########
@@ -18,89 +18,150 @@
package org.apache.flink.training.exercises.longrides;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
-import org.apache.flink.training.exercises.testing.TaxiRideTestBase;
+import org.apache.flink.training.exercises.testing.ComposedPipeline;
+import org.apache.flink.training.exercises.testing.ExecutablePipeline;
+import org.apache.flink.training.exercises.testing.ParallelTestSource;
+import org.apache.flink.training.exercises.testing.TestSink;
+import org.apache.flink.training.exercises.testing.TestSourcePartitioner;
import org.apache.flink.training.solutions.longrides.LongRidesSolution;
+import org.junit.ClassRule;
import org.junit.Test;
import java.time.Instant;
-import java.util.Collections;
-import java.util.List;
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
-public class LongRidesTest extends TaxiRideTestBase<TaxiRide> {
+public class LongRidesTest {
- static final Testable JAVA_EXERCISE = () -> LongRidesExercise.main(new
String[] {});
+ private static final int PARALLELISM = 2;
- private static final Instant BEGINNING =
Instant.parse("2020-01-01T12:00:00.00Z");
+ /** This isn't necessary, but speeds up the tests. */
+ @ClassRule
+ public static MiniClusterWithClientResource flinkCluster =
+ new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberSlotsPerTaskManager(PARALLELISM)
+ .setNumberTaskManagers(1)
+ .build());
+
+ public static final Instant BEGINNING =
Instant.parse("2020-01-01T12:00:00.00Z");
+ public static final Instant ONE_MINUTE_LATER = BEGINNING.plusSeconds(60);
+ public static final Instant THREE_HOURS_LATER = BEGINNING.plusSeconds(180
* 60);
@Test
public void shortRide() throws Exception {
- Instant oneMinLater = BEGINNING.plusSeconds(60);
+
TaxiRide rideStarted = startRide(1, BEGINNING);
- TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
- Long markOneMinLater = oneMinLater.toEpochMilli();
+ TaxiRide endedOneMinLater = endRide(rideStarted, ONE_MINUTE_LATER);
- TestRideSource source = new TestRideSource(rideStarted,
endedOneMinLater, markOneMinLater);
- assert (results(source).isEmpty());
+ ParallelTestSource<TaxiRide> source =
+ new ParallelTestSource<>(new PartitionByRideId(), rideStarted,
endedOneMinLater);
+ TestSink<Long> sink = new TestSink<Long>();
+
+ longRidesPipeline().execute(source, sink);
+ assertThat(sink.results()).isEmpty();
}
@Test
public void outOfOrder() throws Exception {
- Instant oneMinLater = BEGINNING.plusSeconds(60);
TaxiRide rideStarted = startRide(1, BEGINNING);
- TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
- Long markOneMinLater = oneMinLater.toEpochMilli();
+ TaxiRide endedOneMinLater = endRide(rideStarted, ONE_MINUTE_LATER);
+
+ ParallelTestSource<TaxiRide> source =
+ new ParallelTestSource<>(new PartitionByRideId(),
endedOneMinLater, rideStarted);
+ TestSink<Long> sink = new TestSink<Long>();
- TestRideSource source = new TestRideSource(endedOneMinLater,
rideStarted, markOneMinLater);
- assert (results(source).isEmpty());
+ longRidesPipeline().execute(source, sink);
+ assertThat(sink.results()).isEmpty();
}
@Test
public void noStartShort() throws Exception {
- Instant oneMinLater = BEGINNING.plusSeconds(60);
TaxiRide rideStarted = startRide(1, BEGINNING);
- TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
- Long markOneMinLater = oneMinLater.toEpochMilli();
+ TaxiRide endedOneMinLater = endRide(rideStarted, ONE_MINUTE_LATER);
- TestRideSource source = new TestRideSource(endedOneMinLater,
markOneMinLater);
- assert (results(source).isEmpty());
+ ParallelTestSource<TaxiRide> source =
+ new ParallelTestSource<>(new PartitionByRideId(),
endedOneMinLater);
+ TestSink<Long> sink = new TestSink<Long>();
+
+ longRidesPipeline().execute(source, sink);
+ assertThat(sink.results()).isEmpty();
}
@Test
- public void noEnd() throws Exception {
+ public void noStartLong() throws Exception {
TaxiRide rideStarted = startRide(1, BEGINNING);
- Long markThreeHoursLater = BEGINNING.plusSeconds(180 *
60).toEpochMilli();
+ TaxiRide endedThreeHoursLater = endRide(rideStarted,
THREE_HOURS_LATER);
+
+ ParallelTestSource<TaxiRide> source =
+ new ParallelTestSource<>(new PartitionByRideId(),
endedThreeHoursLater);
+ TestSink<Long> sink = new TestSink<Long>();
- TestRideSource source = new TestRideSource(rideStarted,
markThreeHoursLater);
- assertEquals(Collections.singletonList(rideStarted), results(source));
+ longRidesPipeline().execute(source, sink);
+ assertThat(sink.results()).containsExactly(rideStarted.rideId);
}
@Test
- public void longRide() throws Exception {
+ public void endIsMissing() throws Exception {
TaxiRide rideStarted = startRide(1, BEGINNING);
- Long mark2HoursLater = BEGINNING.plusSeconds(120 * 60).toEpochMilli();
- TaxiRide rideEnded3HoursLater = endRide(rideStarted,
BEGINNING.plusSeconds(180 * 60));
- TestRideSource source =
- new TestRideSource(rideStarted, mark2HoursLater,
rideEnded3HoursLater);
- assertEquals(Collections.singletonList(rideStarted), results(source));
+ ParallelTestSource<TaxiRide> source =
+ new ParallelTestSource<>(new PartitionByRideId(), rideStarted);
+ TestSink<Long> sink = new TestSink<Long>();
+
+ longRidesPipeline().execute(source, sink);
+ assertThat(sink.results()).containsExactly(rideStarted.rideId);
}
@Test
- public void startIsDelayedMoreThanTwoHours() throws Exception {
- TaxiRide rideStarted = startRide(1, BEGINNING);
- TaxiRide rideEndedAfter1Hour = endRide(rideStarted,
BEGINNING.plusSeconds(60 * 60));
- Long mark2HoursAfterEnd = BEGINNING.plusSeconds(180 *
60).toEpochMilli();
+ public void endComesAfter3Hours() throws Exception {
+ TaxiRide startOfLongRide = startRide(1, BEGINNING);
+ TaxiRide longRideEndedAfter3Hours = endRide(startOfLongRide,
THREE_HOURS_LATER);
- TestRideSource source =
- new TestRideSource(rideEndedAfter1Hour, mark2HoursAfterEnd,
rideStarted);
- assert (results(source).isEmpty());
+ ParallelTestSource<TaxiRide> source =
+ new ParallelTestSource<>(
+ new PartitionByRideId(), startOfLongRide,
longRideEndedAfter3Hours);
+ TestSink<Long> sink = new TestSink<Long>();
+
+ longRidesPipeline().execute(source, sink);
+ assertThat(sink.results()).containsExactly(startOfLongRide.rideId);
+ }
+
+ @Test
+ public void multipleRides() throws Exception {
+ TaxiRide startOfOneRide = startRide(1, BEGINNING);
+ TaxiRide otherRide = startRide(2, ONE_MINUTE_LATER);
+ TaxiRide oneRideEnded = endRide(startOfOneRide, THREE_HOURS_LATER);
+ TaxiRide otherRideEnded = endRide(otherRide, THREE_HOURS_LATER);
+
+ ParallelTestSource<TaxiRide> source =
+ new ParallelTestSource<>(
+ new PartitionByRideId(),
+ startOfOneRide,
+ otherRide,
+ oneRideEnded,
+ otherRideEnded);
+ TestSink<Long> sink = new TestSink<Long>();
+
+ longRidesPipeline().execute(source, sink);
+ assertThat(sink.results())
+ .containsExactlyInAnyOrder(startOfOneRide.rideId,
otherRide.rideId);
}
- private TaxiRide testRide(long rideId, Boolean isStart, Instant startTime,
Instant endTime) {
+ // Arranges for all events for a given rideId to be generated by the same source subtask.
+ private static class PartitionByRideId implements
TestSourcePartitioner<TaxiRide> {
+ @Override
+ public long partition(TaxiRide ride) {
+ return ride.rideId;
+ }
+ }
Review comment:
You're right. This is a vestige of something that I thought I needed,
but then mostly abandoned. I should have removed it, which I've just done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]