twalthr commented on a change in pull request #12412:
URL: https://github.com/apache/flink/pull/12412#discussion_r433839951
##########
File path:
flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java
##########
@@ -49,4 +53,158 @@
*/
@Override
WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
+
+ /**
+ * Creates a watermark strategy for situations with monotonously ascending timestamps.
+ *
+ * <p>The watermarks are generated periodically and tightly follow the latest
+ * timestamp in the data. The delay introduced by this strategy is mainly the periodic interval
+ * in which the watermarks are generated.
+ *
+ * @see AscendingTimestampsWatermarks
+ */
+ static <T> WatermarkStrategy<T> forMonotonousTimestamps() {
+ return (ctx) -> new AscendingTimestampsWatermarks<>();
+ }
+
+ /**
+ * Creates a watermark strategy for situations where records are out of order, but you
+ * can place an upper bound on how far the events are out of order. An out-of-order bound B
+ * means that once an event with timestamp T was encountered, no events older than
+ * {@code T - B} will follow any more.
+ *
+ * <p>The watermarks are generated periodically. The delay introduced by this watermark
+ * strategy is the periodic interval length, plus the out-of-orderness bound.
+ *
+ * @see BoundedOutOfOrdernessWatermarks
+ */
+ static <T> WatermarkStrategy<T> forBoundedOutOfOrderness(Duration maxOutOfOrderness) {
+ return (ctx) -> new BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness);
+ }
+
+ /**
+ * Creates a watermark strategy based on an existing {@link
+ * WatermarkGeneratorSupplier}.
+ */
+ static <T> WatermarkStrategy<T> forGenerator(WatermarkGeneratorSupplier<T> generatorSupplier) {
+ return generatorSupplier::createWatermarkGenerator;
+ }
+
+ /**
+ * Creates a watermark strategy that generates no watermarks at all.
+ * This may be useful in scenarios that do pure processing-time based stream processing.
+ */
+ static <T> WatermarkStrategy<T> noWatermarks() {
+ return (ctx) -> new NoWatermarksGenerator<>();
+ }
+
+ /**
+ * Adds the given {@link TimestampAssigner} (via a {@link TimestampAssignerSupplier}) to
+ * this {@link WatermarkStrategy}.
+ *
+ * <p>You can use this when a {@link TimestampAssigner} needs additional context, for example
+ * access to the metrics system.
+ *
+ * <pre>
+ * {@code WatermarkStrategy<Object> wmStrategy = WatermarkStrategy
+ * .forMonotonousTimestamps()
+ * .withTimestampAssigner((ctx) -> new MetricsReportingAssigner(ctx));
+ * }</pre>
+ */
+ default WatermarkStrategy<T> withTimestampAssigner(TimestampAssignerSupplier<T> timestampAssigner) {
+ checkNotNull(timestampAssigner, "timestampAssigner");
+ return new WithTimestampAssigner<>(this, timestampAssigner);
+ }
+
+ /**
+ * Creates a new {@link WatermarkStrategy} with the {@link TimestampAssigner} overridden by the
+ * provided assigner.
+ *
+ * <p>You can use this in case you want to specify a {@link TimestampAssigner} via a lambda
+ * function.
+ *
+ * <pre>
+ * {@code WatermarkStrategy<CustomObject> wmStrategy = WatermarkStrategy
+ * .forMonotonousTimestamps()
+ * .withTimestampAssigner((event, timestamp) -> event.getTimestamp());
+ * }</pre>
+ */
+ default WatermarkStrategy<T> withTimestampAssigner(SerializableTimestampAssigner<T> timestampAssigner) {
+ checkNotNull(timestampAssigner, "timestampAssigner");
+ return new WithTimestampAssigner<>(this, TimestampAssignerSupplier.of(timestampAssigner));
+ }
+
+ /**
+ * Add an idle timeout to the watermark strategy. If no records flow in a partition of a stream
+ * for that amount of time, then that partition is considered "idle" and will not hold back the
+ * progress of watermarks in downstream operators.
+ *
+ * <p>Idleness can be important if some partitions have little data and might not have events
+ * during some periods. Without idleness, these streams can stall the overall event time progress of
+ * the application.
+ */
+ default WatermarkStrategy<T> withIdleness(Duration idleTimeout) {
+ checkNotNull(idleTimeout, "idleTimeout");
+ checkArgument(!(idleTimeout.isZero() || idleTimeout.isNegative()),
+ "idleTimeout must be greater than zero");
+ return new WithIdlenessStrategy<>(this, idleTimeout);
+ }
+
+ /**
+ * A {@link WatermarkStrategy} that adds idleness detection on top of the wrapped strategy.
+ */
+ class WithIdlenessStrategy<T> implements WatermarkStrategy<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final WatermarkStrategy<T> baseStrategy;
+ private final Duration idlenessTimeout;
+
+ private WithIdlenessStrategy(WatermarkStrategy<T> baseStrategy, Duration idlenessTimeout) {
+ this.baseStrategy = baseStrategy;
+ this.idlenessTimeout = idlenessTimeout;
+ }
+
+ @Override
+ public TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
+ return baseStrategy.createTimestampAssigner(context);
+ }
+
+ @Override
+ public WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
+ return new WatermarksWithIdleness<>(baseStrategy.createWatermarkGenerator(context),
+ idlenessTimeout);
+ }
Review comment:
nit: we should add hashCode/equals here; otherwise it is difficult to
compare strategies, e.g. for testing.
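
A minimal sketch of what that could look like, not the actual patch. It
assumes a simplified stand-in interface (`Strategy`, hypothetical) in place
of `WatermarkStrategy`, and a stand-in class `IdlenessStrategy` (also
hypothetical) mirroring the two fields of `WithIdlenessStrategy`:

```java
import java.time.Duration;
import java.util.Objects;

// Hypothetical stand-in for WatermarkStrategy; the real interface has more methods.
interface Strategy<T> {
    String generatorName();
}

// Simplified counterpart of WithIdlenessStrategy with value-based equality.
final class IdlenessStrategy<T> implements Strategy<T> {
    private final Strategy<T> baseStrategy;
    private final Duration idlenessTimeout;

    IdlenessStrategy(Strategy<T> baseStrategy, Duration idlenessTimeout) {
        this.baseStrategy = Objects.requireNonNull(baseStrategy, "baseStrategy");
        this.idlenessTimeout = Objects.requireNonNull(idlenessTimeout, "idlenessTimeout");
    }

    @Override
    public String generatorName() {
        return "idleness(" + baseStrategy.generatorName() + ")";
    }

    // Two wrappers are equal when both the wrapped strategy and the timeout are equal.
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        IdlenessStrategy<?> that = (IdlenessStrategy<?>) o;
        return baseStrategy.equals(that.baseStrategy)
                && idlenessTimeout.equals(that.idlenessTimeout);
    }

    // Must stay consistent with equals: derived from the same two fields.
    @Override
    public int hashCode() {
        return Objects.hash(baseStrategy, idlenessTimeout);
    }
}
```

One caveat: the factory methods in this diff return lambdas, and Java
lambdas only compare by identity. With this sketch, two wrappers would
therefore only be equal if they share the same base strategy instance, or
if the base strategy itself implements equals/hashCode.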