[
https://issues.apache.org/jira/browse/FLINK-4207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15391923#comment-15391923
]
ASF GitHub Bot commented on FLINK-4207:
---------------------------------------
Github user aljoscha commented on the issue:
https://github.com/apache/flink/pull/2277
This doesn't work for some cases. Consider what happens when you change
`testCleanupTimerWithEmptyListStateForTumblingWindows()` to this:
```
@Test
public void testCleanupTimerWithEmptyListStateForTumblingWindows() throws Exception {
    final int WINDOW_SIZE = 2;
    final long LATENESS = 100;

    TypeInformation<Tuple2<String, Integer>> inputType =
        TypeInfoParser.parse("Tuple2<String, Integer>");

    ListStateDescriptor<Tuple2<String, Integer>> windowStateDesc =
        new ListStateDescriptor<>("window-contents",
            inputType.createSerializer(new ExecutionConfig()));

    WindowOperator<String, Tuple2<String, Integer>,
            Iterable<Tuple2<String, Integer>>, String, TimeWindow> operator =
        new WindowOperator<>(
            TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
            new TimeWindow.Serializer(),
            new TupleKeySelector(),
            BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
            windowStateDesc,
            new InternalIterableWindowFunction<>(new PassThroughFunction()),
            new EventTimeTriggerAccum(LATENESS),
            LATENESS);

    OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, String> testHarness =
        new OneInputStreamOperatorTestHarness<>(operator);

    testHarness.configureForKeyedStream(new TupleKeySelector(),
        BasicTypeInfo.STRING_TYPE_INFO);

    operator.setInputType(inputType, new ExecutionConfig());
    testHarness.open();

    ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();

    // normal element
    testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
    testHarness.processWatermark(new Watermark(1599));
    testHarness.processWatermark(new Watermark(1999));
    testHarness.processWatermark(new Watermark(2100));
    testHarness.processWatermark(new Watermark(5000));

    expected.add(new Watermark(1599));
    expected.add(new StreamRecord<>("GOT: (key2,1)", 1999));
    expected.add(new Watermark(1999)); // here it fires and purges
    expected.add(new Watermark(2100)); // here is the cleanup timer
    expected.add(new Watermark(5000));

    System.out.println("OUTPUT" + testHarness.getOutput());
    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.",
        expected, testHarness.getOutput(), new Tuple2ResultSortComparator());
    testHarness.close();
}
```
with `EventTimeTriggerAccum` like this:
```
/**
 * A trigger that fires at the end of the window but does not
 * purge the state of the fired window. This is to test the state
 * garbage collection mechanism.
 */
public class EventTimeTriggerAccum extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private long cleanupTime;

    private EventTimeTriggerAccum() {
        cleanupTime = 0L;
    }

    public EventTimeTriggerAccum(long cleanupTime) {
        this.cleanupTime = cleanupTime;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp,
            TimeWindow window, TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // if the watermark is already past the window fire immediately
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() || time == window.maxTimestamp() + cleanupTime ?
            TriggerResult.FIRE_AND_PURGE :
            TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window,
            TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public TriggerResult onMerge(TimeWindow window, OnMergeContext ctx) {
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public String toString() {
        return "EventTimeTrigger()";
    }
}
```
and `PassThroughFunction` like this:
```
private class PassThroughFunction implements
        WindowFunction<Tuple2<String, Integer>, String, String, TimeWindow> {
    private static final long serialVersionUID = 1L;

    @Override
    public void apply(String k, TimeWindow window,
            Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {
        out.collect("GOT: " + Joiner.on(",").join(input));
    }
}
```
Essentially, we want the window to fire at cleanup time only when new
data has arrived after the purge. Here no new data arrives, but the window
function is called anyway.
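To make the requirement concrete, here is a minimal, self-contained sketch of "fire only if something arrived since the last purge". It does not use Flink at all: `SketchListState` and `fireAndPurge` are hypothetical stand-ins invented for illustration, and the only assumption is that the state's `get()` returns `null` (rather than an empty iterable) when nothing is stored.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustration only: hypothetical stand-ins, not Flink's ListState or WindowOperator. */
class NullStateSketch {

    /** Tiny list state whose get() returns null when nothing is stored. */
    static final class SketchListState<T> {
        private List<T> contents; // null means "no state"

        void add(T value) {
            if (contents == null) {
                contents = new ArrayList<>();
            }
            contents.add(value);
        }

        Iterable<T> get() {
            return contents;
        }

        void clear() {
            contents = null;
        }
    }

    /** Simulates a timer firing: emits and purges only if state is present. */
    static <T> boolean fireAndPurge(SketchListState<T> state, List<String> out) {
        Iterable<T> contents = state.get();
        if (contents == null) {
            return false; // nothing arrived since the last purge: skip the window function
        }
        StringBuilder sb = new StringBuilder("GOT: ");
        String sep = "";
        for (T element : contents) {
            sb.append(sep).append(element);
            sep = ",";
        }
        out.add(sb.toString());
        state.clear();
        return true;
    }

    public static void main(String[] args) {
        SketchListState<String> state = new SketchListState<>();
        List<String> out = new ArrayList<>();

        state.add("(key2,1)");
        fireAndPurge(state, out); // end-of-window timer: emits and purges
        fireAndPurge(state, out); // cleanup timer: state is null, emits nothing

        System.out.println(out); // prints [GOT: (key2,1)]
    }
}
```

With an "empty iterable" contract instead, the second call could not cheaply tell "purged, nothing new" apart from "state present", which is the situation the test above runs into.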
I'm afraid the solution to this really is to change `FoldingState` and
`ListState` to return `null` when there is no state available. (Instead of
"default value" and "empty iterable" as it is now).
> WindowOperator becomes very slow with allowed lateness
> ------------------------------------------------------
>
> Key: FLINK-4207
> URL: https://issues.apache.org/jira/browse/FLINK-4207
> Project: Flink
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 1.1.0
> Reporter: Aljoscha Krettek
> Assignee: Kostas Kloudas
> Priority: Blocker
>
> In this simple example the throughput (as measured by the count the window
> emits) becomes very low when an allowed lateness is set:
> {code}
> public class WindowWordCount {
>     public static void main(String[] args) throws Exception {
>         final StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>
>         env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>         env.setParallelism(1);
>         env.addSource(new InfiniteTupleSource(100_000))
>             .keyBy(0)
>             .timeWindow(Time.seconds(3))
>             .allowedLateness(Time.seconds(1))
>             .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
>                 @Override
>                 public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
>                         Tuple2<String, Integer> value2) throws Exception {
>                     return Tuple2.of(value1.f0, value1.f1 + value2.f1);
>                 }
>             })
>             .filter(new FilterFunction<Tuple2<String, Integer>>() {
>                 private static final long serialVersionUID = 1L;
>                 @Override
>                 public boolean filter(Tuple2<String, Integer> value) throws Exception {
>                     return value.f0.startsWith("Tuple 0");
>                 }
>             })
>             .print();
>         // execute program
>         env.execute("WindowWordCount");
>     }
>     public static class InfiniteTupleSource implements
>             ParallelSourceFunction<Tuple2<String, Integer>> {
>         private static final long serialVersionUID = 1L;
>         private int numGroups;
>         public InfiniteTupleSource(int numGroups) {
>             this.numGroups = numGroups;
>         }
>         @Override
>         public void run(SourceContext<Tuple2<String, Integer>> out)
>                 throws Exception {
>             long index = 0;
>             while (true) {
>                 Tuple2<String, Integer> tuple =
>                     new Tuple2<>("Tuple " + (index % numGroups), 1);
>                 out.collect(tuple);
>                 index++;
>             }
>         }
>         @Override
>         public void cancel() {
>         }
>     }
> }
> {code}