[ https://issues.apache.org/jira/browse/FLINK-3669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260171#comment-15260171 ]
Aljoscha Krettek commented on FLINK-3669:
-----------------------------------------
I think we have a winner now :-) Could you also test whether it solves your
problem when running your job?
It's also cool that you added a test for this. I changed it a bit to go through
the complete checkpointing and operator lifecycle:
{code}
@Test
public void testRestoreAndSnapshotAreInSync() throws Exception {
    final int WINDOW_SIZE = 3;
    final int WINDOW_SLIDE = 1;

    TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");

    ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents",
            new SumReducer(),
            inputType.createSerializer(new ExecutionConfig()));

    WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
            SlidingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
            new TimeWindow.Serializer(),
            new TupleKeySelector(),
            BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
            stateDesc,
            new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
            EventTimeTrigger.create());

    OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
            new OneInputStreamOperatorTestHarness<>(operator);

    testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);

    operator.setInputType(inputType, new ExecutionConfig());
    testHarness.open();

    // Put timer state directly into the operator: three processing-time timers
    // for the same key and window, with timestamps 1, 3 and 2.
    WindowOperator.Timer<String, TimeWindow> timer1 = new WindowOperator.Timer<>(1L, "key1", new TimeWindow(1L, 2L));
    WindowOperator.Timer<String, TimeWindow> timer2 = new WindowOperator.Timer<>(3L, "key1", new TimeWindow(1L, 2L));
    WindowOperator.Timer<String, TimeWindow> timer3 = new WindowOperator.Timer<>(2L, "key1", new TimeWindow(1L, 2L));

    operator.processingTimeTimers.add(timer1);
    operator.processingTimeTimers.add(timer2);
    operator.processingTimeTimers.add(timer3);

    operator.processingTimeTimersQueue.add(timer1);
    operator.processingTimeTimersQueue.add(timer2);
    operator.processingTimeTimersQueue.add(timer3);

    // Per-timestamp counts, added as (timestamp, occurrences).
    operator.processingTimeTimerTimestamps.add(1L, 10);
    operator.processingTimeTimerTimestamps.add(2L, 5);
    operator.processingTimeTimerTimestamps.add(3L, 1);

    // Take a checkpoint through the harness.
    StreamTaskState snapshot = testHarness.snapshot(0, 0);

    // Restore the checkpoint into a second, identically configured operator.
    WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> otherOperator = new WindowOperator<>(
            SlidingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
            new TimeWindow.Serializer(),
            new TupleKeySelector(),
            BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
            stateDesc,
            new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
            EventTimeTrigger.create());

    OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> otherTestHarness =
            new OneInputStreamOperatorTestHarness<>(otherOperator);

    otherTestHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);

    otherOperator.setInputType(inputType, new ExecutionConfig());
    otherTestHarness.setup();
    otherTestHarness.restore(snapshot, 0);
    otherTestHarness.open();

    // The restored operator must hold exactly the same timer state.
    Assert.assertEquals(operator.processingTimeTimers, otherOperator.processingTimeTimers);
    Assert.assertArrayEquals(operator.processingTimeTimersQueue.toArray(), otherOperator.processingTimeTimersQueue.toArray());
    Assert.assertEquals(operator.processingTimeTimerTimestamps, otherOperator.processingTimeTimerTimestamps);
}
{code}
The test was already good; this version is just something I would know to do since I
wrote the test harnesses and the other operator tests. :-) Going through the harness
also means we no longer need the special {{restoreStateFrom}} and {{snapshotStateTo}} methods.
Will you open a PR?
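The snapshot/restore property that the harness-based test checks can be illustrated without Flink. This is a minimal sketch, assuming a hypothetical {{TimerCounts}} class (not part of Flink) that stands in for the per-timestamp counts the test stores in {{processingTimeTimerTimestamps}}. It serializes the counts with {{java.io.DataOutputStream}}, restores them into a fresh instance and compares the two, which mirrors the final assertions of the test:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in (not Flink API) for per-timestamp timer counts,
// i.e. the role processingTimeTimerTimestamps plays in the test.
class TimerCounts {
    // timestamp -> number of trigger timers registered for it
    final TreeMap<Long, Integer> counts = new TreeMap<>();

    void add(long timestamp, int occurrences) {
        counts.merge(timestamp, occurrences, Integer::sum);
    }

    // Serializes the entry count followed by each (timestamp, count) pair.
    byte[] snapshot() throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(counts.size());
            for (Map.Entry<Long, Integer> entry : counts.entrySet()) {
                out.writeLong(entry.getKey());
                out.writeInt(entry.getValue());
            }
        }
        return bytes.toByteArray();
    }

    // Builds a fresh instance from bytes written by snapshot().
    static TimerCounts restore(byte[] data) throws IOException {
        TimerCounts restored = new TimerCounts();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int size = in.readInt();
            for (int i = 0; i < size; i++) {
                long timestamp = in.readLong();
                int occurrences = in.readInt();
                restored.add(timestamp, occurrences);
            }
        }
        return restored;
    }
}

class RoundTripDemo {
    public static void main(String[] args) throws IOException {
        TimerCounts original = new TimerCounts();
        original.add(1L, 10);
        original.add(2L, 5);
        original.add(3L, 1);

        // Snapshot, restore into a new instance, then compare, like the test's assertions.
        TimerCounts restored = TimerCounts.restore(original.snapshot());
        System.out.println(original.counts.equals(restored.counts)); // true
    }
}
```

The point of driving the real operator through the harness is the same as here: the state is checked after an actual serialize/deserialize cycle, not after copying in-memory objects through special-purpose methods.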
> WindowOperator registers a lot of timers at StreamTask
> ------------------------------------------------------
>
> Key: FLINK-3669
> URL: https://issues.apache.org/jira/browse/FLINK-3669
> Project: Flink
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 1.0.1
> Reporter: Aljoscha Krettek
> Assignee: Konstantin Knauf
> Priority: Blocker
>
> Right now, the WindowOperator registers a timer at the StreamTask for every
> processing-time timer that a Trigger registers. We should combine several
> registered trigger timers to only register one low-level timer (timer
> coalescing).
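Timer coalescing as described in the issue quoted above could look roughly like this. This is a minimal sketch, assuming a hypothetical {{CoalescingTimers}} class and a {{LongConsumer}} that stands in for the StreamTask's low-level timer registration (neither is Flink's actual API). It keeps a plain {{HashMap}} of per-timestamp counts, similar in spirit to {{processingTimeTimerTimestamps}} in the test, and only the first trigger timer for a given timestamp causes a low-level registration:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongConsumer;

// Hypothetical sketch (not Flink API): many trigger timers share at most one
// low-level timer per distinct timestamp.
class CoalescingTimers {
    // timestamp -> number of trigger timers currently waiting on it
    private final Map<Long, Integer> triggerTimersPerTimestamp = new HashMap<>();
    // Assumed stand-in for registering a timer at the StreamTask.
    private final LongConsumer registerLowLevelTimer;

    CoalescingTimers(LongConsumer registerLowLevelTimer) {
        this.registerLowLevelTimer = registerLowLevelTimer;
    }

    void registerTriggerTimer(long timestamp) {
        int count = triggerTimersPerTimestamp.merge(timestamp, 1, Integer::sum);
        // Only the first trigger timer for a timestamp reaches the StreamTask.
        if (count == 1) {
            registerLowLevelTimer.accept(timestamp);
        }
    }

    // Called when the low-level timer fires; returns how many trigger timers were due.
    int onLowLevelTimer(long timestamp) {
        Integer due = triggerTimersPerTimestamp.remove(timestamp);
        return due == null ? 0 : due;
    }
}

class CoalescingDemo {
    public static void main(String[] args) {
        List<Long> lowLevelTimers = new ArrayList<>();
        CoalescingTimers timers = new CoalescingTimers(ts -> lowLevelTimers.add(ts));

        // 1000 trigger timers (e.g. one per key) spread over two timestamps.
        for (int key = 0; key < 1000; key++) {
            timers.registerTriggerTimer(key % 2 == 0 ? 5_000L : 6_000L);
        }
        System.out.println(lowLevelTimers);                 // [5000, 6000]
        System.out.println(timers.onLowLevelTimer(5_000L)); // 500
    }
}
```

With 1000 trigger timers over two timestamps, only two low-level timers get registered. The sketch leaves out timer deletion and the per-key/per-window bookkeeping, which the operator in the test appears to keep in {{processingTimeTimers}} and {{processingTimeTimersQueue}}.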
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)