Dario Heinisch created FLINK-24623:
--------------------------------------
Summary: Prevent usage of EventTimeWindows when EventTime is disabled
Key: FLINK-24623
URL: https://issues.apache.org/jira/browse/FLINK-24623
Project: Flink
Issue Type: Improvement
Components: API / DataStream
Reporter: Dario Heinisch
The following pipeline never emits any values after the windowing step, because event time has effectively been disabled via the watermark strategy:
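{code:java}
import java.util.Iterator;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class PlaygroundJob {
    public static void main(String[] args) throws Exception {
        // The web UI additionally needs flink-runtime-web on the classpath.
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        DataStreamSource<Tuple2<Long, Integer>> source =
                env.addSource(new SourceFunction<Tuple2<Long, Integer>>() {
                    // Lets cancel() stop the otherwise endless loop.
                    private volatile boolean running = true;

                    @Override
                    public void run(SourceContext<Tuple2<Long, Integer>> sourceContext) throws Exception {
                        int i = 0;
                        while (running) {
                            // f0: wall-clock time used as the event timestamp, f1: key in 0..9
                            Tuple2<Long, Integer> tuple = Tuple2.of(System.currentTimeMillis(), i++ % 10);
                            sourceContext.collect(tuple);
                        }
                    }

                    @Override
                    public void cancel() {
                        running = false;
                    }
                });

        source.assignTimestampsAndWatermarks(
                        // Switching noWatermarks() to forMonotonousTimestamps()
                        // makes the window results appear.
                        WatermarkStrategy.<Tuple2<Long, Integer>>noWatermarks()
                                .withTimestampAssigner((t, timestamp) -> t.f0))
                .keyBy(t -> t.f1)
                .window(TumblingEventTimeWindows.of(Time.seconds(1)))
                .process(new ProcessWindowFunction<Tuple2<Long, Integer>, String, Integer, TimeWindow>() {
                    @Override
                    public void process(Integer key, Context context,
                            Iterable<Tuple2<Long, Integer>> iterable, Collector<String> out) throws Exception {
                        // Count the elements that fell into this window for this key.
                        int count = 0;
                        Iterator<Tuple2<Long, Integer>> iter = iterable.iterator();
                        while (iter.hasNext()) {
                            count++;
                            iter.next();
                        }
                        out.collect("Key: " + key + " count: " + count);
                    }
                })
                .print();

        env.execute();
    }
}
{code}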
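The issue is that the stream uses _noWatermarks()_, which never emits any watermarks. Event-time windows only fire once the watermark reaches the end of the window, so _noWatermarks()_ effectively disables all event-time windowing.
Since this pipeline can never emit any results, it is misconfigured, and Flink should throw an exception at startup.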
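To make the failure mode concrete, here is a small, self-contained model of how an event-time tumbling window decides when to fire. It is not Flink's WindowOperator and the class names are hypothetical; it only mirrors the rule that a window fires once the watermark reaches the window's last timestamp (end - 1). The second run approximates _forMonotonousTimestamps()_ by advancing the watermark to "latest timestamp - 1" after every element, whereas Flink emits watermarks periodically.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical, simplified model of an event-time tumbling window operator.
// Not Flink's WindowOperator: it only mirrors the firing rule
// "fire window [start, start + size) once watermark >= start + size - 1".
class TumblingWindowModel {
    private final long size;
    // window start -> number of buffered elements
    private final TreeMap<Long, Integer> pending = new TreeMap<>();
    private final List<String> emitted = new ArrayList<>();

    TumblingWindowModel(long size) {
        this.size = size;
    }

    void onElement(long timestamp) {
        long start = timestamp - Math.floorMod(timestamp, size);
        pending.merge(start, 1, Integer::sum);
    }

    void onWatermark(long watermark) {
        // Fire (and drop) every window whose last timestamp is covered by the watermark.
        while (!pending.isEmpty() && pending.firstKey() + size - 1 <= watermark) {
            long start = pending.firstKey();
            int count = pending.remove(start);
            emitted.add("window " + start + " count: " + count);
        }
    }

    List<String> emitted() {
        return emitted;
    }

    int pendingWindows() {
        return pending.size();
    }
}

class WatermarkDemo {
    public static void main(String[] args) {
        // Like noWatermarks(): elements arrive but no watermark is ever emitted.
        TumblingWindowModel noWatermarks = new TumblingWindowModel(1000);
        for (long ts = 0; ts < 5000; ts += 100) {
            noWatermarks.onElement(ts);
        }
        System.out.println("noWatermarks: " + noWatermarks.emitted());

        // Roughly like forMonotonousTimestamps(): watermark = latest timestamp - 1.
        TumblingWindowModel monotonous = new TumblingWindowModel(1000);
        for (long ts = 0; ts < 5000; ts += 100) {
            monotonous.onElement(ts);
            monotonous.onWatermark(ts - 1);
        }
        System.out.println("monotonous: " + monotonous.emitted());
    }
}
{code}

With elements every 100 ms over five seconds, the first run keeps all five windows buffered forever, while the second run fires the first four windows with ten elements each; the last window is still waiting for a watermark of at least 4999.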
--------------------
Proposed change:
We extend the interface
[WatermarkStrategy|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java#L55]
with a new method _boolean isEventTime()_ that reports whether the strategy produces event-time watermarks.
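A minimal sketch of how the new method could look, assuming it is added as a default method returning true (so existing custom strategies keep working) and that only _noWatermarks()_ overrides it. _SimpleWatermarkStrategy_ is a hypothetical, stripped-down stand-in for the real interface. One detail matters for the example pipeline: _noWatermarks()_ is followed by _withTimestampAssigner(...)_, which, as far as I can tell, returns a wrapping strategy, so every such wrapper has to forward _isEventTime()_ to the strategy it wraps.

{code:java}
import java.util.function.ToLongFunction;

// Hypothetical, stripped-down stand-in for Flink's WatermarkStrategy that
// only models the proposed isEventTime() flag.
interface SimpleWatermarkStrategy<T> {

    // Proposed method. Defaulting to true keeps existing custom strategies valid.
    default boolean isEventTime() {
        return true;
    }

    // Simplified: the real method takes a timestamp assigner with a different
    // signature. The assigner is ignored in this model; what matters is that
    // the returned wrapper forwards isEventTime() to the wrapped strategy.
    default SimpleWatermarkStrategy<T> withTimestampAssigner(ToLongFunction<T> assigner) {
        final SimpleWatermarkStrategy<T> wrapped = this;
        return new SimpleWatermarkStrategy<T>() {
            @Override
            public boolean isEventTime() {
                return wrapped.isEventTime();
            }
        };
    }

    static <T> SimpleWatermarkStrategy<T> forMonotonousTimestamps() {
        return new SimpleWatermarkStrategy<T>() {};
    }

    // In this model, the only strategy that opts out of event time.
    static <T> SimpleWatermarkStrategy<T> noWatermarks() {
        return new SimpleWatermarkStrategy<T>() {
            @Override
            public boolean isEventTime() {
                return false;
            }
        };
    }
}

class WatermarkStrategyDemo {
    public static void main(String[] args) {
        SimpleWatermarkStrategy<Long> none =
                SimpleWatermarkStrategy.<Long>noWatermarks().withTimestampAssigner(t -> t);
        SimpleWatermarkStrategy<Long> monotonous =
                SimpleWatermarkStrategy.<Long>forMonotonousTimestamps().withTimestampAssigner(t -> t);
        System.out.println("noWatermarks: " + none.isEventTime());
        System.out.println("forMonotonousTimestamps: " + monotonous.isEventTime());
    }
}
{code}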
We create a new class named _EventTimeWindowPreconditions_ and add the following method to it, which uses _isEventTime()_:
{code:java}
import java.util.List;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.transformations.TimestampsAndWatermarksTransformation;

public final class EventTimeWindowPreconditions {

    private EventTimeWindowPreconditions() {}

    public static void hasPrecedingEventTimeGenerator(final List<Transformation<?>> predecessors) {
        for (int i = predecessors.size() - 1; i >= 0; i--) {
            final Transformation<?> pre = predecessors.get(i);
            if (pre instanceof TimestampsAndWatermarksTransformation) {
                final TimestampsAndWatermarksTransformation<?> timestampsAndWatermarksTransformation =
                        (TimestampsAndWatermarksTransformation<?>) pre;
                final WatermarkStrategy<?> waStrat =
                        timestampsAndWatermarksTransformation.getWatermarkStrategy();
                // Assert that the strategy produces event-time watermarks, otherwise throw.
                // isEventTime() is the new method proposed above.
                if (!waStrat.isEventTime()) {
                    // TODO: Custom exception
                    throw new IllegalArgumentException(
                            "Cannot use an event-time window with a preceding watermark strategy that"
                                    + " does not generate event-time watermarks. Did you use noWatermarks()"
                                    + " as the WatermarkStrategy together with event-time windows such as"
                                    + " TumblingEventTimeWindows/SlidingEventTimeWindows?"
                                    + " These windows will never fire because your stream never advances"
                                    + " event time.");
                }
                // Terminate the check here: we have found the most recent timestamp
                // assigner for this window and ensured that it produces event-time
                // watermarks. An earlier watermark strategy in the chain, such as
                // noWatermarks(), can safely be ignored because a valid event-time
                // strategy sits between it and this window.
                break;
            }
        }
    }
}
{code}
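To check the control flow of _hasPrecedingEventTimeGenerator_ without a running Flink job, the following sketch replays it on hypothetical mock classes (_MockTransformation_, _MockTimestampsAndWatermarks_) in place of _Transformation_ and _TimestampsAndWatermarksTransformation_; a boolean field replaces _getWatermarkStrategy().isEventTime()_.

{code:java}
import java.util.Arrays;
import java.util.List;

// Hypothetical mocks for Transformation and TimestampsAndWatermarksTransformation.
class MockTransformation {
    final String name;

    MockTransformation(String name) {
        this.name = name;
    }
}

class MockTimestampsAndWatermarks extends MockTransformation {
    // Stands in for getWatermarkStrategy().isEventTime().
    final boolean eventTime;

    MockTimestampsAndWatermarks(String name, boolean eventTime) {
        super(name);
        this.eventTime = eventTime;
    }
}

class PreconditionModel {
    // Same control flow as hasPrecedingEventTimeGenerator: walk the list from the
    // end and stop at the first watermark assigner found.
    static void hasPrecedingEventTimeGenerator(List<MockTransformation> predecessors) {
        for (int i = predecessors.size() - 1; i >= 0; i--) {
            MockTransformation pre = predecessors.get(i);
            if (pre instanceof MockTimestampsAndWatermarks) {
                if (!((MockTimestampsAndWatermarks) pre).eventTime) {
                    throw new IllegalArgumentException(
                            "Event-time window preceded by '" + pre.name + "', which emits no watermarks");
                }
                break;
            }
        }
    }

    // true if the check accepts the given predecessor list
    static boolean passes(MockTransformation... predecessors) {
        try {
            hasPrecedingEventTimeGenerator(Arrays.asList(predecessors));
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        MockTimestampsAndWatermarks none = new MockTimestampsAndWatermarks("noWatermarks", false);
        MockTimestampsAndWatermarks mono = new MockTimestampsAndWatermarks("forMonotonousTimestamps", true);
        MockTransformation map = new MockTransformation("map");

        System.out.println(passes(none));            // false: only noWatermarks()
        System.out.println(passes(none, map, mono)); // true: valid assigner is the most recent
        System.out.println(passes(mono, none));      // false: noWatermarks() is the most recent
        System.out.println(passes(map));             // true: no assigner in the list at all
    }
}
{code}

The last case shows that the check passes silently when it finds no _TimestampsAndWatermarksTransformation_, e.g. when the strategy is handed to _env.fromSource(...)_ instead. Also, since _Transformation#getInputs()_ returns only the immediate inputs, an operator such as a map between _assignTimestampsAndWatermarks()_ and _keyBy()_ would hide the assigner from the check, so walking further upstream may be needed.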
Then we can add the following check to the constructors of
[AllWindowedStream|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java#L112]
and
[WindowedStream|https://github.com/apache/flink/blob/2cb477343de5dce70978c0add5ec58edbaec157c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java#L79]:
{code:java}
if (windowAssigner.isEventTime()) {
EventTimeWindowPreconditions.hasPrecedingEventTimeGenerator(input.getTransformation().getInputs());
}
{code}
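Because the guard only runs the precondition for assigners whose _isEventTime()_ returns true, processing-time windows after _noWatermarks()_ remain valid. A hypothetical model of that decision, with an enum standing in for the window assigners and a boolean standing in for the outcome of the upstream scan:

{code:java}
// Hypothetical model of the proposed constructor guard; not Flink's WindowedStream.
class WindowedStreamModel {

    enum Assigner {
        TUMBLING_EVENT_TIME(true),
        SLIDING_EVENT_TIME(true),
        TUMBLING_PROCESSING_TIME(false);

        // Mirrors WindowAssigner#isEventTime().
        final boolean eventTime;

        Assigner(boolean eventTime) {
            this.eventTime = eventTime;
        }
    }

    // upstreamIsEventTime stands in for the outcome of the upstream scan; it is
    // only consulted when the assigner is event-time based.
    WindowedStreamModel(Assigner assigner, boolean upstreamIsEventTime) {
        if (assigner.eventTime && !upstreamIsEventTime) {
            throw new IllegalArgumentException(assigner + " needs an event-time watermark strategy upstream");
        }
    }

    // true if construction succeeds for the given combination
    static boolean accepts(Assigner assigner, boolean upstreamIsEventTime) {
        try {
            new WindowedStreamModel(assigner, upstreamIsEventTime);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        for (Assigner assigner : Assigner.values()) {
            System.out.println(assigner + " after noWatermarks(): " + accepts(assigner, false));
        }
    }
}
{code}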
This is the approach I currently have in mind, but I am not sure whether it is the best one.
Best regards,
Dario
--
This message was sent by Atlassian Jira
(v8.3.4#803005)