This is an automated email from the ASF dual-hosted git repository.
nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-training.git
The following commit(s) were added to refs/heads/master by this push:
new bfcd25a [FLINK-24118] Allow TaxiFareGenerator to produce bounded streams (#40)
bfcd25a is described below
commit bfcd25a9d52e71f018720ea4865090b5bab5b135
Author: David Anderson <[email protected]>
AuthorDate: Fri Sep 3 08:36:41 2021 -0600
[FLINK-24118] Allow TaxiFareGenerator to produce bounded streams (#40)
---
.../exercises/common/sources/TaxiFareGenerator.java | 19 ++++++++++++++++++-
.../exercises/common/utils/DataGenerator.java | 4 ++--
.../training/exercises/hourlytips/HourlyTipsTest.java | 8 ++++----
3 files changed, 24 insertions(+), 7 deletions(-)
diff --git a/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiFareGenerator.java b/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiFareGenerator.java
index 0866e0d..58fbe68 100644
--- a/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiFareGenerator.java
+++ b/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiFareGenerator.java
@@ -20,6 +20,10 @@ package org.apache.flink.training.exercises.common.sources;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.training.exercises.common.datatypes.TaxiFare;
+import org.apache.flink.training.exercises.common.utils.DataGenerator;
+
+import java.time.Duration;
+import java.time.Instant;
/**
* This SourceFunction generates a data stream of TaxiFare records.
@@ -29,6 +33,14 @@ import org.apache.flink.training.exercises.common.datatypes.TaxiFare;
public class TaxiFareGenerator implements SourceFunction<TaxiFare> {
private volatile boolean running = true;
+ private Instant limitingTimestamp = Instant.MAX;
+
+ /** Create a bounded TaxiFareGenerator that runs only for the specified duration. */
+ public static TaxiFareGenerator runFor(Duration duration) {
+ TaxiFareGenerator generator = new TaxiFareGenerator();
+ generator.limitingTimestamp = DataGenerator.BEGINNING.plus(duration);
+ return generator;
+ }
@Override
public void run(SourceContext<TaxiFare> ctx) throws Exception {
@@ -37,8 +49,13 @@ public class TaxiFareGenerator implements SourceFunction<TaxiFare> {
while (running) {
TaxiFare fare = new TaxiFare(id);
- id += 1;
+ // don't emit events that exceed the specified limit
+ if (fare.startTime.compareTo(limitingTimestamp) >= 0) {
+ break;
+ }
+
+ ++id;
ctx.collect(fare);
// match our event production rate to that of the TaxiRideGenerator
diff --git a/common/src/main/java/org/apache/flink/training/exercises/common/utils/DataGenerator.java b/common/src/main/java/org/apache/flink/training/exercises/common/utils/DataGenerator.java
index 10c4c2b..078c43a 100644
--- a/common/src/main/java/org/apache/flink/training/exercises/common/utils/DataGenerator.java
+++ b/common/src/main/java/org/apache/flink/training/exercises/common/utils/DataGenerator.java
@@ -32,7 +32,7 @@ public class DataGenerator {
private static final int SECONDS_BETWEEN_RIDES = 20;
private static final int NUMBER_OF_DRIVERS = 200;
- private static final Instant beginTime =
Instant.parse("2020-01-01T12:00:00.00Z");
+ public static final Instant BEGINNING =
Instant.parse("2020-01-01T12:00:00.00Z");
private transient long rideId;
@@ -43,7 +43,7 @@ public class DataGenerator {
/** Deterministically generates and returns the startTime for this ride. */
public Instant startTime() {
- return beginTime.plusSeconds(SECONDS_BETWEEN_RIDES * rideId);
+ return BEGINNING.plusSeconds(SECONDS_BETWEEN_RIDES * rideId);
}
/** Deterministically generates and returns the endTime for this ride. */
diff --git a/hourly-tips/src/test/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsTest.java b/hourly-tips/src/test/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsTest.java
index 8167eba..1e379c3 100644
--- a/hourly-tips/src/test/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsTest.java
+++ b/hourly-tips/src/test/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.training.exercises.common.datatypes.TaxiFare;
+import org.apache.flink.training.exercises.common.utils.DataGenerator;
import org.apache.flink.training.exercises.testing.ComposedPipeline;
import org.apache.flink.training.exercises.testing.ExecutablePipeline;
import org.apache.flink.training.exercises.testing.ParallelTestSource;
@@ -33,6 +34,7 @@ import org.apache.flink.training.solutions.hourlytips.HourlyTipsSolution;
import org.junit.ClassRule;
import org.junit.Test;
+import java.time.Duration;
import java.time.Instant;
import java.util.List;
@@ -105,10 +107,8 @@ public class HourlyTipsTest {
assertThat(results(source)).containsExactlyInAnyOrder(hour1, hour2);
}
- private static final Instant BEGINNING =
Instant.parse("2020-01-01T12:00:00.00Z");
-
- private Instant t(int minutes) {
- return BEGINNING.plusSeconds(60L * minutes);
+ public Instant t(int minutes) {
+ return DataGenerator.BEGINNING.plus(Duration.ofMinutes(minutes));
}
private TaxiFare testFare(long driverId, Instant startTime, float tip) {