This is an automated email from the ASF dual-hosted git repository.
aljoscha pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new c0eb1e3 [FLINK-18011] Make WatermarkStrategy/WatermarkStrategies more
ergonomic
c0eb1e3 is described below
commit c0eb1e3ad3fceda61e470c584319bddccdf660dd
Author: Aljoscha Krettek <[email protected]>
AuthorDate: Wed May 27 22:45:26 2020 +0200
[FLINK-18011] Make WatermarkStrategy/WatermarkStrategies more ergonomic
This removes WatermarkStrategies and instead moves the convenience
entrypoint methods for strategies directly to WatermarkStrategy.
WatermarkStrategy is now also itself the builder for more complex
strategies instead of WatermarkStrategies.
---
docs/dev/connectors/kafka.md | 10 +-
docs/dev/event_timestamp_extractors.md | 17 +-
docs/dev/event_timestamps_watermarks.md | 43 ++--
.../source/reader/CoordinatedSourceITCase.java | 8 +-
.../connectors/kafka/FlinkKafkaConsumerBase.java | 4 +-
.../internals/AbstractFetcherWatermarksTest.java | 4 +-
.../api/common/eventtime/WatermarkStrategies.java | 250 ---------------------
.../api/common/eventtime/WatermarkStrategy.java | 141 +++++++++++-
.../eventtime/WatermarkStrategyWithIdleness.java | 48 ++++
.../WatermarkStrategyWithTimestampAssigner.java | 48 ++++
...ategiesTest.java => WatermarkStrategyTest.java} | 91 +++++---
.../flink/streaming/api/datastream/DataStream.java | 4 +-
.../apache/flink/streaming/api/DataStreamTest.java | 23 ++
.../api/graph/StreamingJobGraphGeneratorTest.java | 6 +-
.../source/SourceOperatorEventTimeTest.java | 25 +--
.../operators/source/TestingSourceOperator.java | 3 +-
.../TimestampsAndWatermarksOperatorTest.java | 22 +-
.../tasks/SourceOperatorStreamTaskTest.java | 4 +-
.../flink/streaming/api/scala/DataStream.scala | 4 +-
.../api/scala/StreamExecutionEnvironmentTest.scala | 5 +-
20 files changed, 392 insertions(+), 368 deletions(-)
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index 6f21ff7..27f18f7 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -372,9 +372,8 @@ properties.setProperty("group.id", "test");
FlinkKafkaConsumer<String> myConsumer =
new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties);
myConsumer.assignTimestampsAndWatermarks(
- WatermarkStrategies.
- .<String>forBoundedOutOfOrderness(Duration.ofSeconds(20))
- .build());
+ WatermarkStrategy
+ .forBoundedOutOfOrderness(Duration.ofSeconds(20)));
DataStream<String> stream = env.addSource(myConsumer);
{% endhighlight %}
@@ -388,9 +387,8 @@ properties.setProperty("group.id", "test")
val myConsumer =
new FlinkKafkaConsumer("topic", new SimpleStringSchema(), properties);
myConsumer.assignTimestampsAndWatermarks(
- WatermarkStrategies.
- .forBoundedOutOfOrderness[String](Duration.ofSeconds(20))
- .build())
+ WatermarkStrategy
+ .forBoundedOutOfOrderness(Duration.ofSeconds(20)))
val stream = env.addSource(myConsumer)
{% endhighlight %}
diff --git a/docs/dev/event_timestamp_extractors.md
b/docs/dev/event_timestamp_extractors.md
index a80181d..417658c 100644
--- a/docs/dev/event_timestamp_extractors.md
+++ b/docs/dev/event_timestamp_extractors.md
@@ -53,16 +53,12 @@ unioned, connected, or merged.
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
-WatermarkStrategies
- .<MyType>forMonotonousTimestamps()
- .build();
+WatermarkStrategy.forMonotonousTimestamps();
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
-WatermarkStrategies
- .forMonotonousTimestamps[MyType]()
- .build()
+WatermarkStrategy.forMonotonousTimestamps()
{% endhighlight %}
</div>
</div>
@@ -89,16 +85,13 @@ about working with late elements.
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
-WatermarkStrategies
- .<MyType>forBoundedOutOfOrderness(Duration.ofSeconds(10))
- .build();
+WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
-WatermarkStrategies
- .forBoundedOutOfOrderness[MyType](Duration.ofSeconds(10))
- .build()
+WatermarkStrategy
+ .forBoundedOutOfOrderness(Duration.ofSeconds(10))
{% endhighlight %}
</div>
</div>
diff --git a/docs/dev/event_timestamps_watermarks.md
b/docs/dev/event_timestamps_watermarks.md
index 58f78c3..3b8d766 100644
--- a/docs/dev/event_timestamps_watermarks.md
+++ b/docs/dev/event_timestamps_watermarks.md
@@ -41,8 +41,11 @@ Timestamp assignment goes hand-in-hand with generating
watermarks, which tell
the system about progress in event time. You can configure this by specifying a
`WatermarkGenerator`.
-The Flink API expects a `WatermarkStrategy` that contains both a
`TimestampAssigner` and `WatermarkGenerator`.
-A number of common strategies out of the box, available in the
`WatermarkStrategies` helper, but users can also build their own strategies
when required.
+The Flink API expects a `WatermarkStrategy` that contains both a
+`TimestampAssigner` and `WatermarkGenerator`. A number of common strategies
+are available out of the box as static methods on `WatermarkStrategy`, but
+users can also build their own strategies when required.
+
Here is the interface for completeness' sake:
{% highlight java %}
@@ -64,26 +67,26 @@ public interface WatermarkStrategy<T> extends
TimestampAssignerSupplier<T>, Wate
{% endhighlight %}
As mentioned, you usually don't implement this interface yourself but use the
-`WatermarkStrategies` helper for using common watermark strategies or to bundle
-together a custom `TimestampAssigner` with a `WatermarkGenerator`. For
example, to use bounded-of-orderness watermarks and a lambda function as a
timestamp assigner you use this:
+static helper methods on `WatermarkStrategy` for common watermark strategies or
+to bundle together a custom `TimestampAssigner` with a `WatermarkGenerator`.
+For example, to use bounded-out-of-orderness watermarks and a lambda function
as a
+timestamp assigner you use this:
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
-WatermarkStrategies
+WatermarkStrategy
.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
- .withTimestampAssigner((event, timestamp) -> event.f0)
- .build();
+ .withTimestampAssigner((event, timestamp) -> event.f0);
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
-WatermarkStrategies
+WatermarkStrategy
.forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(20))
.withTimestampAssigner(new SerializableTimestampAssigner[(Long, String)] {
override def extractTimestamp(element: (Long, String), recordTimestamp:
Long): Long = element._1
})
- .build()
{% endhighlight %}
(Using Scala Lambdas here currently doesn't work because Scala is stupid and
it's hard to support this. #fus)
@@ -176,23 +179,23 @@ This is a problem because it can happen that some of your
partitions do still
carry events. In that case, the watermark will be held back, because it is
computed as the minimum over all the different parallel watermarks.
-To deal with this, you can use a `WatermarkStrategy` that will detect idleness
and mark an input as idle. `WatermarkStrategies` provides a convenience helper
for this:
+To deal with this, you can use a `WatermarkStrategy` that will detect idleness
+and mark an input as idle. `WatermarkStrategy` provides a convenience helper
+for this:
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
-WatermarkStrategies
+WatermarkStrategy
.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
- .withIdleness(Duration.ofMinutes(1))
- .build();
+ .withIdleness(Duration.ofMinutes(1));
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
-WatermarkStrategies
+WatermarkStrategy
.forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(20))
.withIdleness(Duration.ofMinutes(1))
- .build()
{% endhighlight %}
</div>
</div>
@@ -434,9 +437,8 @@ case.
{% highlight java %}
FlinkKafkaConsumer<MyType> kafkaSource = new FlinkKafkaConsumer<>("myTopic",
schema, props);
kafkaSource.assignTimestampsAndWatermarks(
- WatermarkStrategies.
- .<MyType>forBoundedOutOfOrderness(Duration.ofSeconds(20))
- .build());
+ WatermarkStrategy
+ .forBoundedOutOfOrderness(Duration.ofSeconds(20)));
DataStream<MyType> stream = env.addSource(kafkaSource);
{% endhighlight %}
@@ -445,9 +447,8 @@ DataStream<MyType> stream = env.addSource(kafkaSource);
{% highlight scala %}
val kafkaSource = new FlinkKafkaConsumer[MyType]("myTopic", schema, props)
kafkaSource.assignTimestampsAndWatermarks(
- WatermarkStrategies
- .forBoundedOutOfOrderness[MyType](Duration.ofSeconds(20))
- .build())
+ WatermarkStrategy
+ .forBoundedOutOfOrderness(Duration.ofSeconds(20)))
val stream: DataStream[MyType] = env.addSource(kafkaSource)
{% endhighlight %}
diff --git
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java
index a7f4c22..6582210 100644
---
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java
+++
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java
@@ -19,7 +19,7 @@
package org.apache.flink.connector.base.source.reader;
import org.apache.flink.api.common.accumulators.ListAccumulator;
-import org.apache.flink.api.common.eventtime.WatermarkStrategies;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.configuration.Configuration;
@@ -47,7 +47,7 @@ public class CoordinatedSourceITCase extends AbstractTestBase
{
MockBaseSource source = new MockBaseSource(2, 10,
Boundedness.BOUNDED);
DataStream<Integer> stream = env.continuousSource(
source,
-
WatermarkStrategies.<Integer>noWatermarks().build(),
+ WatermarkStrategy.noWatermarks(),
"TestingSource");
executeAndVerify(env, stream, 20);
}
@@ -59,11 +59,11 @@ public class CoordinatedSourceITCase extends
AbstractTestBase {
MockBaseSource source2 = new MockBaseSource(2, 10, 20,
Boundedness.BOUNDED);
DataStream<Integer> stream1 = env.continuousSource(
source1,
-
WatermarkStrategies.<Integer>noWatermarks().build(),
+ WatermarkStrategy.noWatermarks(),
"TestingSource1");
DataStream<Integer> stream2 = env.continuousSource(
source2,
-
WatermarkStrategies.<Integer>noWatermarks().build(),
+ WatermarkStrategy.noWatermarks(),
"TestingSource2");
executeAndVerify(env, stream1.union(stream2), 40);
}
diff --git
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 0e8c261..67d65fb 100644
---
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -280,8 +280,8 @@ public abstract class FlinkKafkaConsumerBase<T> extends
RichParallelSourceFuncti
* strictly ascending per Kafka partition, they will not be strictly
ascending in the resulting
* Flink DataStream, if the parallel source subtask reads more than one
partition.
*
- * <p>Common watermark generation patterns can be found in the
- * {@link org.apache.flink.api.common.eventtime.WatermarkStrategies}
class.
+ * <p>Common watermark generation patterns can be found as static
methods in the
+ * {@link org.apache.flink.api.common.eventtime.WatermarkStrategy}
class.
*
* @return The consumer object, to allow function chaining.
*/
diff --git
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherWatermarksTest.java
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherWatermarksTest.java
index e33ea4a..83a4f68 100644
---
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherWatermarksTest.java
+++
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherWatermarksTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.connectors.kafka.internals;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
-import org.apache.flink.api.common.eventtime.WatermarkStrategies;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
@@ -72,10 +71,9 @@ public class AbstractFetcherWatermarksTest {
public static Collection<WatermarkStrategy<Long>> getParams() {
return Arrays.asList(
new
AssignerWithPeriodicWatermarksAdapter.Strategy<>(new PeriodicTestExtractor()),
- WatermarkStrategies
+ WatermarkStrategy
.forGenerator((ctx) ->
new PeriodicTestWatermarkGenerator())
.withTimestampAssigner((event, previousTimestamp) -> event)
- .build()
);
}
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategies.java
b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategies.java
deleted file mode 100644
index 880edc7..0000000
---
a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategies.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.eventtime;
-
-import org.apache.flink.annotation.Public;
-
-import javax.annotation.Nullable;
-
-import java.time.Duration;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * WatermarkStrategies is a simply way to build a {@link WatermarkStrategy} by
configuring
- * common strategies.
- */
-@Public
-public final class WatermarkStrategies<T> {
-
- /**
- * The {@link TimestampAssigner} to use. This can be {@code null} for
cases where records come
- * out of a source with valid timestamps, for example from Kafka.
- */
- @Nullable
- private TimestampAssignerSupplier<T> timestampAssignerSupplier = null;
-
- /** The base strategy for watermark generation. Starting point, is
always set. */
- private final WatermarkStrategy<T> baseStrategy;
-
- /** Optional idle timeout for watermarks. */
- @Nullable
- private Duration idleTimeout;
-
- private WatermarkStrategies(WatermarkStrategy<T> baseStrategy) {
- this.baseStrategy = baseStrategy;
- }
-
- //
------------------------------------------------------------------------
- // builder methods
- //
------------------------------------------------------------------------
-
- /**
- * Add an idle timeout to the watermark strategy.
- * If no records flow in a partition of a stream for that amount of
time, then that partition
- * is considered "idle" and will not hold back the progress of
watermarks in downstream operators.
- *
- * <p>Idleness can be important if some partitions have little data and
might not have events during
- * some periods. Without idleness, these streams can stall the overall
event time progress of the
- * application.
- */
- public WatermarkStrategies<T> withIdleness(Duration idleTimeout) {
- checkNotNull(idleTimeout, "idleTimeout");
- checkArgument(!(idleTimeout.isZero() ||
idleTimeout.isNegative()), "idleTimeout must be greater than zero");
- this.idleTimeout = idleTimeout;
- return this;
- }
-
- /**
- * Adds the given {@link TimestampAssigner} (via a {@link
TimestampAssignerSupplier}) to this
- * {@link WatermarkStrategies}.
- *
- * <p>You can use this when a {@link TimestampAssigner} needs
additional context, for example
- * access to the metrics system.
- *
- * <pre>
- * {@code WatermarkStrategy<Object> wmStrategy = WatermarkStrategies
- * .forMonotonousTimestamps()
- * .withTimestampAssigner((ctx) -> new MetricsReportingAssigner(ctx))
- * .build();
- * }</pre>
- */
- public WatermarkStrategies<T>
withTimestampAssigner(TimestampAssignerSupplier<T> timestampAssigner) {
- checkNotNull(timestampAssigner, "timestampAssigner");
- this.timestampAssignerSupplier = timestampAssigner;
- return this;
- }
-
- /**
- * Adds the given {@link TimestampAssigner} to this {@link
WatermarkStrategies}.
- *
- * <p>You can use this in case you want to specify a {@link
TimestampAssigner} via a lambda
- * function.
- *
- * <pre>
- * {@code WatermarkStrategy<CustomObject> wmStrategy =
WatermarkStrategies
- * .<CustomObject>forMonotonousTimestamps()
- * .withTimestampAssigner((event, timestamp) -> event.getTimestamp())
- * .build();
- * }</pre>
- */
- public WatermarkStrategies<T>
withTimestampAssigner(SerializableTimestampAssigner<T> timestampAssigner) {
- checkNotNull(timestampAssigner, "timestampAssigner");
- this.timestampAssignerSupplier =
TimestampAssignerSupplier.of(timestampAssigner);
- return this;
- }
-
- /**
- * Build the watermark strategy.
- */
- public WatermarkStrategy<T> build() {
- WatermarkStrategy<T> strategy = this.baseStrategy;
-
- if (idleTimeout != null) {
- strategy = new WithIdlenessStrategy<>(strategy,
idleTimeout);
- }
-
- if (timestampAssignerSupplier != null) {
- strategy = new WithTimestampAssigner<>(strategy,
timestampAssignerSupplier);
- }
-
- return strategy;
- }
-
- //
------------------------------------------------------------------------
- // builder entry points
- //
------------------------------------------------------------------------
-
- /**
- * Starts building a watermark strategy for situations with
monotonously ascending
- * timestamps.
- *
- * <p>The watermarks are generated periodically and tightly follow the
latest
- * timestamp in the data. The delay introduced by this strategy is
mainly the periodic
- * interval in which the watermarks are generated.
- *
- * @see AscendingTimestampsWatermarks
- */
- public static <T> WatermarkStrategies<T> forMonotonousTimestamps() {
- return new WatermarkStrategies<>((ctx) -> new
AscendingTimestampsWatermarks<>());
- }
-
- /**
- * Starts building a watermark strategy for situations where records
are out of order, but
- * you can place an upper bound on how far the events are out of order.
- * An out-of-order bound B means that once the an event with timestamp
T was encountered, no
- * events older than {@code T - B} will follow any more.
- *
- * <p>The watermarks are generated periodically. The delay introduced
by this watermark strategy
- * is the periodic interval length, plus the out of orderness bound.
- *
- * @see BoundedOutOfOrdernessWatermarks
- */
- public static <T> WatermarkStrategies<T>
forBoundedOutOfOrderness(Duration maxOutOfOrderness) {
- return new WatermarkStrategies<>((ctx) -> new
BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness));
- }
-
- /**
- * Starts building a watermark strategy based on an existing {@code
WatermarkStrategy}.
- */
- public static <T> WatermarkStrategies<T>
forStrategy(WatermarkStrategy<T> strategy) {
- return new WatermarkStrategies<>(strategy);
- }
-
- /**
- * Starts building a watermark strategy based on an existing {@link
WatermarkGeneratorSupplier}.
- */
- public static <T> WatermarkStrategies<T>
forGenerator(WatermarkGeneratorSupplier<T> generatorSupplier) {
- return new WatermarkStrategies<>(new
FromWatermarkGeneratorSupplier<>(generatorSupplier));
- }
-
- /**
- * Starts building a watermark strategy that generates no watermarks at
all.
- * This may be useful in scenarios that do pure processing-time based
stream processing.
- */
- public static <T> WatermarkStrategies<T> noWatermarks() {
- return new WatermarkStrategies<>((ctx) -> new
NoWatermarksGenerator<>());
- }
-
- //
------------------------------------------------------------------------
-
- private static final class FromWatermarkGeneratorSupplier<T> implements
WatermarkStrategy<T> {
- private static final long serialVersionUID = 1L;
-
- private final WatermarkGeneratorSupplier<T> generatorSupplier;
-
- private
FromWatermarkGeneratorSupplier(WatermarkGeneratorSupplier<T> generatorSupplier)
{
- this.generatorSupplier = generatorSupplier;
- }
-
- @Override
- public WatermarkGenerator<T>
createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
- return
generatorSupplier.createWatermarkGenerator(context);
- }
- }
-
- /**
- * A {@link WatermarkStrategy} that overrides the {@link
TimestampAssigner} of the given base
- * {@link WatermarkStrategy}.
- */
- private static final class WithTimestampAssigner<T> implements
WatermarkStrategy<T> {
- private static final long serialVersionUID = 1L;
-
- private final WatermarkStrategy<T> baseStrategy;
- private final TimestampAssignerSupplier<T> timestampAssigner;
-
- private WithTimestampAssigner(WatermarkStrategy<T>
baseStrategy, TimestampAssignerSupplier<T> timestampAssigner) {
- this.baseStrategy = baseStrategy;
- this.timestampAssigner = timestampAssigner;
- }
-
- @Override
- public TimestampAssigner<T>
createTimestampAssigner(TimestampAssignerSupplier.Context context) {
- return
timestampAssigner.createTimestampAssigner(context);
- }
-
- @Override
- public WatermarkGenerator<T>
createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
- return baseStrategy.createWatermarkGenerator(context);
- }
- }
-
- private static final class WithIdlenessStrategy<T> implements
WatermarkStrategy<T> {
- private static final long serialVersionUID = 1L;
-
- private final WatermarkStrategy<T> baseStrategy;
- private final Duration idlenessTimeout;
-
- private WithIdlenessStrategy(WatermarkStrategy<T> baseStrategy,
Duration idlenessTimeout) {
- this.baseStrategy = baseStrategy;
- this.idlenessTimeout = idlenessTimeout;
- }
-
- @Override
- public TimestampAssigner<T>
createTimestampAssigner(TimestampAssignerSupplier.Context context) {
- return baseStrategy.createTimestampAssigner(context);
- }
-
- @Override
- public WatermarkGenerator<T>
createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
- return new
WatermarksWithIdleness<>(baseStrategy.createWatermarkGenerator(context),
idlenessTimeout);
- }
- }
-}
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java
b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java
index 254afc1..cf11e47 100644
---
a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java
+++
b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java
@@ -21,17 +21,48 @@ package org.apache.flink.api.common.eventtime;
import org.apache.flink.annotation.Public;
import java.io.Serializable;
+import java.time.Duration;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* The WatermarkStrategy defines how to generate {@link Watermark}s in the
stream sources. The
* WatermarkStrategy is a builder/factory for the {@link WatermarkGenerator}
that generates the
* watermarks and the {@link TimestampAssigner} which assigns the internal
timestamp of a record.
*
+ * <p>This interface is split into three parts: 1) methods that an implementor
of this interface
+ * needs to implement, 2) builder methods for building a {@code
WatermarkStrategy} on a base
+ * strategy, 3) convenience methods for constructing a {@code
WatermarkStrategy} for common built-in
+ * strategies or based on a {@link WatermarkGeneratorSupplier}.
+ *
+ * <p>Implementors of this interface need only implement {@link
#createWatermarkGenerator(WatermarkGeneratorSupplier.Context)}.
+ * Optionally, you can implement {@link
#createTimestampAssigner(TimestampAssignerSupplier.Context)}.
+ *
+ * <p>The builder methods, like {@link #withIdleness(Duration)} or {@link
+ * #withTimestampAssigner(TimestampAssignerSupplier)} create a new
{@code
+ * WatermarkStrategy} that wraps and enriches a base strategy. The strategy on
which the method is
+ * called is the base strategy.
+ *
+ * <p>The convenience methods, for example {@link
#forBoundedOutOfOrderness(Duration)}, create a
+ * {@code WatermarkStrategy} for common built in strategies.
+ *
* <p>This interface is {@link Serializable} because watermark strategies may
be shipped
* to workers during distributed execution.
*/
@Public
-public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>,
WatermarkGeneratorSupplier<T> {
+public interface WatermarkStrategy<T> extends
+ TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T> {
+
+ //
------------------------------------------------------------------------
+ // Methods that implementors need to implement.
+ //
------------------------------------------------------------------------
+
+ /**
+ * Instantiates a WatermarkGenerator that generates watermarks
according to this strategy.
+ */
+ @Override
+ WatermarkGenerator<T>
createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
/**
* Instantiates a {@link TimestampAssigner} for assigning timestamps
according to this
@@ -44,9 +75,111 @@ public interface WatermarkStrategy<T> extends
TimestampAssignerSupplier<T>, Wate
return new RecordTimestampAssigner<>();
}
+ //
------------------------------------------------------------------------
+ // Builder methods for enriching a base WatermarkStrategy
+ //
------------------------------------------------------------------------
+
/**
- * Instantiates a WatermarkGenerator that generates watermarks
according to this strategy.
+ * Creates a new {@code WatermarkStrategy} that wraps this strategy but
instead uses the given
+ * {@link TimestampAssigner} (via a {@link TimestampAssignerSupplier}).
+ *
+ * <p>You can use this when a {@link TimestampAssigner} needs
additional context, for example
+ * access to the metrics system.
+ *
+ * <pre>
+ * {@code WatermarkStrategy<Object> wmStrategy = WatermarkStrategy
+ * .forMonotonousTimestamps()
+ * .withTimestampAssigner((ctx) -> new MetricsReportingAssigner(ctx));
+ * }</pre>
*/
- @Override
- WatermarkGenerator<T>
createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
+ default WatermarkStrategy<T>
withTimestampAssigner(TimestampAssignerSupplier<T> timestampAssigner) {
+ checkNotNull(timestampAssigner, "timestampAssigner");
+ return new WatermarkStrategyWithTimestampAssigner<>(this,
timestampAssigner);
+ }
+
+ /**
+ * Creates a new {@code WatermarkStrategy} that wraps this strategy but
instead uses the given
+ * {@link SerializableTimestampAssigner}.
+ *
+ * <p>You can use this in case you want to specify a {@link
TimestampAssigner} via a lambda
+ * function.
+ *
+ * <pre>
+ * {@code WatermarkStrategy<CustomObject> wmStrategy = WatermarkStrategy
+ * .forMonotonousTimestamps()
+ * .withTimestampAssigner((event, timestamp) -> event.getTimestamp());
+ * }</pre>
+ */
+ default WatermarkStrategy<T>
withTimestampAssigner(SerializableTimestampAssigner<T> timestampAssigner) {
+ checkNotNull(timestampAssigner, "timestampAssigner");
+ return new WatermarkStrategyWithTimestampAssigner<>(this,
+
TimestampAssignerSupplier.of(timestampAssigner));
+ }
+
+ /**
+ * Creates a new enriched {@link WatermarkStrategy} that also does
idleness detection in the
+ * created {@link WatermarkGenerator}.
+ *
+ * <p>Add an idle timeout to the watermark strategy. If no records flow
in a partition of a
+ * stream for that amount of time, then that partition is considered
"idle" and will not hold
+ * back the progress of watermarks in downstream operators.
+ *
+ * <p>Idleness can be important if some partitions have little data and
might not have events
+ * during some periods. Without idleness, these streams can stall the
overall event time
+ * progress of the application.
+ */
+ default WatermarkStrategy<T> withIdleness(Duration idleTimeout) {
+ checkNotNull(idleTimeout, "idleTimeout");
+ checkArgument(!(idleTimeout.isZero() ||
idleTimeout.isNegative()),
+ "idleTimeout must be greater than zero");
+ return new WatermarkStrategyWithIdleness<>(this, idleTimeout);
+ }
+
+ //
------------------------------------------------------------------------
+ // Convenience methods for common watermark strategies
+ //
------------------------------------------------------------------------
+
+ /**
+ * Creates a watermark strategy for situations with monotonously
ascending timestamps.
+ *
+ * <p>The watermarks are generated periodically and tightly follow the
latest
+ * timestamp in the data. The delay introduced by this strategy is
mainly the periodic interval
+ * in which the watermarks are generated.
+ *
+ * @see AscendingTimestampsWatermarks
+ */
+ static <T> WatermarkStrategy<T> forMonotonousTimestamps() {
+ return (ctx) -> new AscendingTimestampsWatermarks<>();
+ }
+
+ /**
+ * Creates a watermark strategy for situations where records are out of
order, but you can place
+ * an upper bound on how far the events are out of order. An
out-of-order bound B means that
+ * once an event with timestamp T was encountered, no events older
than {@code T - B} will
+ * follow any more.
+ *
+ * <p>The watermarks are generated periodically. The delay introduced
by this watermark
+ * strategy is the periodic interval length, plus the out of orderness
bound.
+ *
+ * @see BoundedOutOfOrdernessWatermarks
+ */
+ static <T> WatermarkStrategy<T> forBoundedOutOfOrderness(Duration
maxOutOfOrderness) {
+ return (ctx) -> new
BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness);
+ }
+
+ /**
+ * Creates a watermark strategy based on an existing {@link
WatermarkGeneratorSupplier}.
+ */
+ static <T> WatermarkStrategy<T>
forGenerator(WatermarkGeneratorSupplier<T> generatorSupplier) {
+ return generatorSupplier::createWatermarkGenerator;
+ }
+
+ /**
+ * Creates a watermark strategy that generates no watermarks at all.
This may be useful in
+ * scenarios that do pure processing-time based stream processing.
+ */
+ static <T> WatermarkStrategy<T> noWatermarks() {
+ return (ctx) -> new NoWatermarksGenerator<>();
+ }
+
}
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategyWithIdleness.java
b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategyWithIdleness.java
new file mode 100644
index 0000000..98c3374
--- /dev/null
+++
b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategyWithIdleness.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.eventtime;
+
+import java.time.Duration;
+
+/**
+ * A {@link WatermarkStrategy} that adds idleness detection on top of the
wrapped strategy.
+ */
+final class WatermarkStrategyWithIdleness<T> implements WatermarkStrategy<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final WatermarkStrategy<T> baseStrategy;
+ private final Duration idlenessTimeout;
+
+ WatermarkStrategyWithIdleness(WatermarkStrategy<T> baseStrategy,
Duration idlenessTimeout) {
+ this.baseStrategy = baseStrategy;
+ this.idlenessTimeout = idlenessTimeout;
+ }
+
+ @Override
+ public TimestampAssigner<T>
createTimestampAssigner(TimestampAssignerSupplier.Context context) {
+ return baseStrategy.createTimestampAssigner(context);
+ }
+
+ @Override
+ public WatermarkGenerator<T>
createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
+ return new
WatermarksWithIdleness<>(baseStrategy.createWatermarkGenerator(context),
+ idlenessTimeout);
+ }
+}
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategyWithTimestampAssigner.java
b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategyWithTimestampAssigner.java
new file mode 100644
index 0000000..f83779a
--- /dev/null
+++
b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategyWithTimestampAssigner.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.eventtime;
+
+/**
+ * A {@link WatermarkStrategy} that overrides the {@link TimestampAssigner} of
the given base {@link
+ * WatermarkStrategy}.
+ */
+final class WatermarkStrategyWithTimestampAssigner<T> implements
WatermarkStrategy<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final WatermarkStrategy<T> baseStrategy;
+ private final TimestampAssignerSupplier<T> timestampAssigner;
+
+ WatermarkStrategyWithTimestampAssigner(
+ WatermarkStrategy<T> baseStrategy,
+ TimestampAssignerSupplier<T> timestampAssigner) {
+ this.baseStrategy = baseStrategy;
+ this.timestampAssigner = timestampAssigner;
+ }
+
+ @Override
+ public TimestampAssigner<T>
createTimestampAssigner(TimestampAssignerSupplier.Context context) {
+ return timestampAssigner.createTimestampAssigner(context);
+ }
+
+ @Override
+ public WatermarkGenerator<T>
createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
+ return baseStrategy.createWatermarkGenerator(context);
+ }
+}
diff --git
a/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkStrategiesTest.java
b/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkStrategyTest.java
similarity index 72%
rename from
flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkStrategiesTest.java
rename to
flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkStrategyTest.java
index 6136bb7..857b994 100644
---
a/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkStrategiesTest.java
+++
b/flink-core/src/test/java/org/apache/flink/api/common/eventtime/WatermarkStrategyTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.metrics.MetricGroup;
import org.junit.Test;
import java.io.Serializable;
+import java.time.Duration;
import java.util.Map;
import static org.hamcrest.CoreMatchers.instanceOf;
@@ -37,97 +38,115 @@ import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
/**
- * Test for the {@link WatermarkStrategies} class.
+ * Test for the {@link WatermarkStrategy} class.
*/
-public class WatermarkStrategiesTest {
+public class WatermarkStrategyTest {
@Test
public void testDefaultTimeStampAssigner() {
- WatermarkStrategy<Object> wmStrategy = WatermarkStrategies
- .forMonotonousTimestamps()
- .build();
+ WatermarkStrategy<Object> wmStrategy = WatermarkStrategy
+ .forMonotonousTimestamps();
+
// ensure that the closure can be cleaned through the
WatermarkStategies
ClosureCleaner.clean(wmStrategy,
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
-
assertThat(wmStrategy.createTimestampAssigner(assignerContext()),
instanceOf(RecordTimestampAssigner.class));
+
assertThat(wmStrategy.createTimestampAssigner(assignerContext()),
+ instanceOf(RecordTimestampAssigner.class));
}
@Test
public void testLambdaTimestampAssigner() {
- WatermarkStrategy<Object> wmStrategy = WatermarkStrategies
+ WatermarkStrategy<Object> wmStrategy = WatermarkStrategy
.forMonotonousTimestamps()
- .withTimestampAssigner((event, timestamp) ->
42L)
- .build();
+ .withTimestampAssigner((event, timestamp) ->
42L);
+
// ensure that the closure can be cleaned through the
WatermarkStategies
ClosureCleaner.clean(wmStrategy,
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
- TimestampAssigner<Object> timestampAssigner =
wmStrategy.createTimestampAssigner(assignerContext());
+ TimestampAssigner<Object> timestampAssigner = wmStrategy
+ .createTimestampAssigner(assignerContext());
assertThat(timestampAssigner.extractTimestamp(null, 13L),
is(42L));
}
@Test
public void testLambdaTimestampAssignerSupplier() {
- WatermarkStrategy<Object> wmStrategy = WatermarkStrategies
+ WatermarkStrategy<Object> wmStrategy = WatermarkStrategy
.forMonotonousTimestamps()
-
.withTimestampAssigner(TimestampAssignerSupplier.of((event, timestamp) -> 42L))
- .build();
+
.withTimestampAssigner(TimestampAssignerSupplier.of((event, timestamp) -> 42L));
// ensure that the closure can be cleaned through the
WatermarkStategies
ClosureCleaner.clean(wmStrategy,
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
- TimestampAssigner<Object> timestampAssigner =
wmStrategy.createTimestampAssigner(assignerContext());
+ TimestampAssigner<Object> timestampAssigner = wmStrategy
+ .createTimestampAssigner(assignerContext());
assertThat(timestampAssigner.extractTimestamp(null, 13L),
is(42L));
}
@Test
public void testAnonymousInnerTimestampAssigner() {
- WatermarkStrategy<Object> wmStrategy = WatermarkStrategies
+ WatermarkStrategy<Object> wmStrategy = WatermarkStrategy
.forMonotonousTimestamps()
.withTimestampAssigner(new
SerializableTimestampAssigner<Object>() {
@Override
public long extractTimestamp(Object
element, long recordTimestamp) {
return 42;
}
- })
- .build();
+ });
// ensure that the closure can be cleaned through the
WatermarkStategies
ClosureCleaner.clean(wmStrategy,
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
- TimestampAssigner<Object> timestampAssigner =
wmStrategy.createTimestampAssigner(assignerContext());
+ TimestampAssigner<Object> timestampAssigner = wmStrategy
+ .createTimestampAssigner(assignerContext());
assertThat(timestampAssigner.extractTimestamp(null, 13L),
is(42L));
}
@Test
public void testClassTimestampAssigner() {
- WatermarkStrategy<Object> wmStrategy = WatermarkStrategies
+ WatermarkStrategy<Object> wmStrategy = WatermarkStrategy
.forMonotonousTimestamps()
- .withTimestampAssigner((ctx) -> new
TestTimestampAssigner())
- .build();
+ .withTimestampAssigner((ctx) -> new
TestTimestampAssigner());
// ensure that the closure can be cleaned through the
WatermarkStategies
ClosureCleaner.clean(wmStrategy,
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
- TimestampAssigner<Object> timestampAssigner =
wmStrategy.createTimestampAssigner(assignerContext());
+ TimestampAssigner<Object> timestampAssigner = wmStrategy
+ .createTimestampAssigner(assignerContext());
assertThat(timestampAssigner.extractTimestamp(null, 13L),
is(42L));
}
@Test
public void testClassTimestampAssignerUsingSupplier() {
- WatermarkStrategy<Object> wmStrategy = WatermarkStrategies
+ WatermarkStrategy<Object> wmStrategy = WatermarkStrategy
.forMonotonousTimestamps()
- .withTimestampAssigner((context) -> new
TestTimestampAssigner())
- .build();
+ .withTimestampAssigner((context) -> new
TestTimestampAssigner());
// ensure that the closure can be cleaned through the
WatermarkStategies
ClosureCleaner.clean(wmStrategy,
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
- TimestampAssigner<Object> timestampAssigner =
wmStrategy.createTimestampAssigner(assignerContext());
+ TimestampAssigner<Object> timestampAssigner = wmStrategy
+ .createTimestampAssigner(assignerContext());
assertThat(timestampAssigner.extractTimestamp(null, 13L),
is(42L));
}
+ @Test
+ public void testWithIdlenessHelper() {
+ WatermarkStrategy<String> wmStrategy = WatermarkStrategy
+ .<String>forMonotonousTimestamps()
+ .withIdleness(Duration.ofDays(7));
+
+ // ensure that the closure can be cleaned
+ ClosureCleaner.clean(wmStrategy,
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+
+
assertThat(wmStrategy.createTimestampAssigner(assignerContext()),
+ instanceOf(RecordTimestampAssigner.class));
+
assertThat(wmStrategy.createWatermarkGenerator(generatorContext()),
+ instanceOf(WatermarksWithIdleness.class));
+ }
+
static class TestTimestampAssigner implements
TimestampAssigner<Object>, Serializable {
+
@Override
public long extractTimestamp(Object element, long
recordTimestamp) {
return 42L;
@@ -135,13 +154,29 @@ public class WatermarkStrategiesTest {
}
static TimestampAssignerSupplier.Context assignerContext() {
- return DummyMetricGroup::new;
+ return new TimestampAssignerSupplier.Context() {
+ @Override
+ public MetricGroup getMetricGroup() {
+ return new DummyMetricGroup();
+ }
+ };
+ }
+
+ static WatermarkGeneratorSupplier.Context generatorContext() {
+ return new WatermarkGeneratorSupplier.Context() {
+ @Override
+ public MetricGroup getMetricGroup() {
+ return new DummyMetricGroup();
+ }
+ };
}
/**
- * A dummy {@link MetricGroup} to be used when a group is required as
an argument but not actually used.
+ * A dummy {@link MetricGroup} to be used when a group is required as
an argument but not
+ * actually used.
*/
public static class DummyMetricGroup implements MetricGroup {
+
@Override
public Counter counter(int name) {
return null;
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 8911f45..5a54ea2 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -888,8 +888,8 @@ public class DataStream<T> {
* <p>Periodically (defined by the {@link
ExecutionConfig#getAutoWatermarkInterval()}), the
* {@link WatermarkGenerator#onPeriodicEmit(WatermarkOutput)} method
will be called.
*
- * <p>Common watermark generation patterns can be found in the
- * {@link org.apache.flink.api.common.eventtime.WatermarkStrategies}
class.
+ * <p>Common watermark generation patterns can be found as static
methods in the
+ * {@link org.apache.flink.api.common.eventtime.WatermarkStrategy}
class.
*
* @param watermarkStrategy The strategy to generate watermarks based
on event timestamps.
* @return The stream after the transformation, with assigned
timestamps and watermarks.
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index a174a6e..788385b 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.api;
import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.FoldFunction;
@@ -92,6 +93,7 @@ import org.junit.rules.ExpectedException;
import javax.annotation.Nullable;
import java.lang.reflect.Method;
+import java.time.Duration;
import java.util.List;
import static org.junit.Assert.assertEquals;
@@ -110,6 +112,27 @@ public class DataStreamTest extends TestLogger {
public ExpectedException expectedException = ExpectedException.none();
/**
+ * Ensure that WatermarkStrategy is easy to use in the API, without
superfluous generics.
+ */
+ @Test
+ public void testErgonomicWatermarkStrategy() {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStream<String> input = env.fromElements("bonjour");
+
+ // a single call needs no explicit generic type, it is inferred from
the target type
+ input.assignTimestampsAndWatermarks(
+ WatermarkStrategy
+
.forBoundedOutOfOrderness(Duration.ofMillis(10)));
+
+ // as soon as you have a chain of methods the first call needs
to specify the generic type
+ input.assignTimestampsAndWatermarks(
+ WatermarkStrategy
+
.<String>forBoundedOutOfOrderness(Duration.ofMillis(10))
+ .withTimestampAssigner((event,
timestamp) -> 42L));
+ }
+
+ /**
* Tests union functionality. This ensures that self-unions and unions
of streams
* with differing parallelism work.
*
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index fec7b0d..eca6883 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -18,7 +18,7 @@
package org.apache.flink.streaming.api.graph;
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.eventtime.WatermarkStrategies;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
@@ -253,7 +253,7 @@ public class StreamingJobGraphGeneratorTest extends
TestLogger {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> stream = env.continuousSource(
new MockSource(Boundedness.BOUNDED, 1),
-
WatermarkStrategies.<Integer>noWatermarks().build(),
+ WatermarkStrategy.noWatermarks(),
"TestingSource");
OneInputTransformation<Integer, Integer> resultTransform = new
OneInputTransformation<Integer, Integer>(
@@ -463,7 +463,7 @@ public class StreamingJobGraphGeneratorTest extends
TestLogger {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> source = env.continuousSource(
new MockSource(Boundedness.BOUNDED, 1),
-
WatermarkStrategies.<Integer>noWatermarks().build(),
+ WatermarkStrategy.noWatermarks(),
"TestSource");
source.addSink(new DiscardingSink<>());
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java
index 347975c..8d05336 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java
@@ -19,7 +19,6 @@
package org.apache.flink.streaming.api.operators.source;
import org.apache.flink.api.common.eventtime.Watermark;
-import org.apache.flink.api.common.eventtime.WatermarkStrategies;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.connector.source.ReaderOutput;
@@ -57,9 +56,9 @@ public class SourceOperatorEventTimeTest {
@Test
public void testMainOutputPeriodicWatermarks() throws Exception {
- final WatermarkStrategy<Integer> watermarkStrategy =
WatermarkStrategies
- .<Integer>forGenerator((ctx) -> new
OnPeriodicTestWatermarkGenerator<>())
- .build();
+ final WatermarkStrategy<Integer> watermarkStrategy =
+ WatermarkStrategy
+ .forGenerator((ctx) -> new
OnPeriodicTestWatermarkGenerator<>());
final List<Watermark> result =
testSequenceOfWatermarks(watermarkStrategy,
(output) -> output.collect(0, 100L),
@@ -75,9 +74,9 @@ public class SourceOperatorEventTimeTest {
@Test
public void testMainOutputEventWatermarks() throws Exception {
- final WatermarkStrategy<Integer> watermarkStrategy =
WatermarkStrategies
- .<Integer>forGenerator((ctx) -> new
OnEventTestWatermarkGenerator<>())
- .build();
+ final WatermarkStrategy<Integer> watermarkStrategy =
+ WatermarkStrategy
+ .forGenerator((ctx) -> new
OnEventTestWatermarkGenerator<>());
final List<Watermark> result =
testSequenceOfWatermarks(watermarkStrategy,
(output) -> output.collect(0, 100L),
@@ -93,9 +92,9 @@ public class SourceOperatorEventTimeTest {
@Test
public void testPerSplitOutputPeriodicWatermarks() throws Exception {
- final WatermarkStrategy<Integer> watermarkStrategy =
WatermarkStrategies
- .<Integer>forGenerator((ctx) -> new
OnPeriodicTestWatermarkGenerator<>())
- .build();
+ final WatermarkStrategy<Integer> watermarkStrategy =
+ WatermarkStrategy
+ .forGenerator((ctx) -> new
OnPeriodicTestWatermarkGenerator<>());
final List<Watermark> result =
testSequenceOfWatermarks(watermarkStrategy,
(output) -> {
@@ -118,9 +117,9 @@ public class SourceOperatorEventTimeTest {
@Test
public void testPerSplitOutputEventWatermarks() throws Exception {
- final WatermarkStrategy<Integer> watermarkStrategy =
WatermarkStrategies
- .<Integer>forGenerator((ctx) -> new
OnEventTestWatermarkGenerator<>())
- .build();
+ final WatermarkStrategy<Integer> watermarkStrategy =
+ WatermarkStrategy
+ .forGenerator((ctx) -> new
OnEventTestWatermarkGenerator<>());
final List<Watermark> result =
testSequenceOfWatermarks(watermarkStrategy,
(output) -> {
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
index 43992dc..038a211 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
@@ -19,7 +19,6 @@
package org.apache.flink.streaming.api.operators.source;
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.eventtime.WatermarkStrategies;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
@@ -56,7 +55,7 @@ public class TestingSourceOperator<T> extends
SourceOperator<T, MockSourceSplit
OperatorEventGateway eventGateway,
int subtaskIndex) {
- this(reader, WatermarkStrategies.<T>noWatermarks().build(), new
TestProcessingTimeService(), eventGateway, subtaskIndex, 5);
+ this(reader, WatermarkStrategy.noWatermarks(), new
TestProcessingTimeService(), eventGateway, subtaskIndex, 5);
}
public TestingSourceOperator(
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperatorTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperatorTest.java
index 4c149ba..995a8ca 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperatorTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperatorTest.java
@@ -22,7 +22,7 @@ import
org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
-import org.apache.flink.api.common.eventtime.WatermarkStrategies;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
@@ -47,7 +47,7 @@ public class TimestampsAndWatermarksOperatorTest {
@Test
public void inputWatermarksAreNotForwarded() throws Exception {
OneInputStreamOperatorTestHarness<Long, Long> testHarness =
createTestHarness(
- WatermarkStrategies
+ WatermarkStrategy
.forGenerator((ctx) -> new
PeriodicWatermarkGenerator())
.withTimestampAssigner((ctx) ->
new LongExtractor()));
@@ -60,7 +60,7 @@ public class TimestampsAndWatermarksOperatorTest {
@Test
public void longMaxInputWatermarkIsForwarded() throws Exception {
OneInputStreamOperatorTestHarness<Long, Long> testHarness =
createTestHarness(
- WatermarkStrategies
+ WatermarkStrategy
.forGenerator((ctx) -> new
PeriodicWatermarkGenerator())
.withTimestampAssigner((ctx) ->
new LongExtractor()));
@@ -72,7 +72,7 @@ public class TimestampsAndWatermarksOperatorTest {
@Test
public void periodicWatermarksEmitOnPeriodicEmit() throws Exception {
OneInputStreamOperatorTestHarness<Long, Long> testHarness =
createTestHarness(
- WatermarkStrategies
+ WatermarkStrategy
.forGenerator((ctx) -> new
PeriodicWatermarkGenerator())
.withTimestampAssigner((ctx) ->
new LongExtractor()));
@@ -92,7 +92,7 @@ public class TimestampsAndWatermarksOperatorTest {
@Test
public void periodicWatermarksOnlyEmitOnPeriodicEmit() throws Exception
{
OneInputStreamOperatorTestHarness<Long, Long> testHarness =
createTestHarness(
- WatermarkStrategies
+ WatermarkStrategy
.forGenerator((ctx) -> new
PeriodicWatermarkGenerator())
.withTimestampAssigner((ctx) ->
new LongExtractor()));
@@ -105,7 +105,7 @@ public class TimestampsAndWatermarksOperatorTest {
@Test
public void periodicWatermarksDoNotRegress() throws Exception {
OneInputStreamOperatorTestHarness<Long, Long> testHarness =
createTestHarness(
- WatermarkStrategies
+ WatermarkStrategy
.forGenerator((ctx) -> new
PeriodicWatermarkGenerator())
.withTimestampAssigner((ctx) ->
new LongExtractor()));
@@ -125,7 +125,7 @@ public class TimestampsAndWatermarksOperatorTest {
@Test
public void punctuatedWatermarksEmitImmediately() throws Exception {
OneInputStreamOperatorTestHarness<Tuple2<Boolean, Long>,
Tuple2<Boolean, Long>> testHarness = createTestHarness(
- WatermarkStrategies
+ WatermarkStrategy
.forGenerator((ctx) -> new
PunctuatedWatermarkGenerator())
.withTimestampAssigner((ctx) ->
new TupleExtractor()));
@@ -143,7 +143,7 @@ public class TimestampsAndWatermarksOperatorTest {
@Test
public void punctuatedWatermarksDoNotRegress() throws Exception {
OneInputStreamOperatorTestHarness<Tuple2<Boolean, Long>,
Tuple2<Boolean, Long>> testHarness = createTestHarness(
- WatermarkStrategies
+ WatermarkStrategy
.forGenerator((ctx) -> new
PunctuatedWatermarkGenerator())
.withTimestampAssigner((ctx) ->
new TupleExtractor()));
@@ -165,7 +165,7 @@ public class TimestampsAndWatermarksOperatorTest {
public void testNegativeTimestamps() throws Exception {
OneInputStreamOperatorTestHarness<Long, Long> testHarness =
createTestHarness(
- WatermarkStrategies
+ WatermarkStrategy
.forGenerator((ctx) -> new
NeverWatermarkGenerator())
.withTimestampAssigner((ctx) ->
new LongExtractor()));
@@ -181,10 +181,10 @@ public class TimestampsAndWatermarksOperatorTest {
}
private static <T> OneInputStreamOperatorTestHarness<T, T>
createTestHarness(
- WatermarkStrategies<T> watermarkStrategy) throws
Exception {
+ WatermarkStrategy<T> watermarkStrategy) throws
Exception {
final TimestampsAndWatermarksOperator<T> operator =
- new
TimestampsAndWatermarksOperator<>(watermarkStrategy.build());
+ new
TimestampsAndWatermarksOperator<>(watermarkStrategy);
OneInputStreamOperatorTestHarness<T, T> testHarness =
new
OneInputStreamOperatorTestHarness<>(operator);
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
index 515bd51..0f2e99d 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
@@ -19,7 +19,7 @@
package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
-import org.apache.flink.api.common.eventtime.WatermarkStrategies;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.mocks.MockSource;
@@ -132,7 +132,7 @@ public class SourceOperatorStreamTaskTest {
// get a source operator.
SourceOperatorFactory<Integer> sourceOperatorFactory = new
SourceOperatorFactory<>(
new MockSource(Boundedness.BOUNDED, 1),
-
WatermarkStrategies.<Integer>noWatermarks().build());
+ WatermarkStrategy.noWatermarks());
// build a test harness.
MultipleInputStreamTaskTestHarnessBuilder<Integer> builder =
diff --git
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index fe2184e..0608c53 100644
---
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -828,8 +828,8 @@ class DataStream[T](stream: JavaStream[T]) {
* Periodically (defined by the
[[ExecutionConfig#getAutoWatermarkInterval()]]), the
* [[WatermarkGenerator#onPeriodicEmit(WatermarkOutput)]] method will be
called.
*
- * Common watermark generation patterns can be found in the
- * [[org.apache.flink.api.common.eventtime.WatermarkStrategies]] class.
+ * Common watermark generation patterns can be found as static methods in the
+ * [[org.apache.flink.api.common.eventtime.WatermarkStrategy]] class.
*/
def assignTimestampsAndWatermarks(watermarkStrategy: WatermarkStrategy[T]):
DataStream[T] = {
val cleanedStrategy = clean(watermarkStrategy)
diff --git
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironmentTest.scala
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironmentTest.scala
index f493497..fa503e0 100644
---
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironmentTest.scala
+++
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironmentTest.scala
@@ -18,12 +18,11 @@
package org.apache.flink.streaming.api.scala
-import org.apache.flink.api.common.eventtime.WatermarkStrategies
+import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.connector.source.Boundedness
import org.apache.flink.api.connector.source.mocks.MockSource
import org.apache.flink.api.java.typeutils.GenericTypeInfo
-
import org.junit.Assert.assertEquals
import org.junit.Test
@@ -43,7 +42,7 @@ class StreamExecutionEnvironmentTest {
val stream = env.continuousSource(
new MockSource(Boundedness.CONTINUOUS_UNBOUNDED, 1),
- WatermarkStrategies.noWatermarks[Integer]().build(),
+ WatermarkStrategy.noWatermarks(),
"test source")
assertEquals(typeInfo, stream.dataType)