[ 
https://issues.apache.org/jira/browse/FLINK-3669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15260171#comment-15260171
 ] 

Aljoscha Krettek commented on FLINK-3669:
-----------------------------------------

I think we have a winner now :-) Could you also test whether it solves your 
problem when running your job?

It's also cool that you added a test for this. I changed it a bit to go through 
the complete checkpointing and operator lifecycle:
{code}
@Test
public void testRestoreAndSnapshotAreInSync() throws Exception {

    final int WINDOW_SIZE = 3;
    final int WINDOW_SLIDE = 1;

    TypeInformation<Tuple2<String, Integer>> inputType =
            TypeInfoParser.parse("Tuple2<String, Integer>");

    ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc =
            new ReducingStateDescriptor<>("window-contents",
                    new SumReducer(),
                    inputType.createSerializer(new ExecutionConfig()));

    WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator =
            new WindowOperator<>(
                    SlidingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
                    new TimeWindow.Serializer(),
                    new TupleKeySelector(),
                    BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                    stateDesc,
                    new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
                    EventTimeTrigger.create());

    OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
            new OneInputStreamOperatorTestHarness<>(operator);

    testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);

    operator.setInputType(inputType, new ExecutionConfig());
    testHarness.open();

    WindowOperator.Timer<String, TimeWindow> timer1 = new WindowOperator.Timer<>(1L, "key1", new TimeWindow(1L, 2L));
    WindowOperator.Timer<String, TimeWindow> timer2 = new WindowOperator.Timer<>(3L, "key1", new TimeWindow(1L, 2L));
    WindowOperator.Timer<String, TimeWindow> timer3 = new WindowOperator.Timer<>(2L, "key1", new TimeWindow(1L, 2L));
    operator.processingTimeTimers.add(timer1);
    operator.processingTimeTimers.add(timer2);
    operator.processingTimeTimers.add(timer3);
    operator.processingTimeTimersQueue.add(timer1);
    operator.processingTimeTimersQueue.add(timer2);
    operator.processingTimeTimersQueue.add(timer3);

    operator.processingTimeTimerTimestamps.add(1L, 10);
    operator.processingTimeTimerTimestamps.add(2L, 5);
    operator.processingTimeTimerTimestamps.add(3L, 1);

    StreamTaskState snapshot = testHarness.snapshot(0, 0);

    WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> otherOperator =
            new WindowOperator<>(
                    SlidingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
                    new TimeWindow.Serializer(),
                    new TupleKeySelector(),
                    BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                    stateDesc,
                    new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
                    EventTimeTrigger.create());

    OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> otherTestHarness =
            new OneInputStreamOperatorTestHarness<>(otherOperator);

    otherTestHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
    otherOperator.setInputType(inputType, new ExecutionConfig());

    otherTestHarness.setup();
    otherTestHarness.restore(snapshot, 0);
    otherTestHarness.open();

    Assert.assertEquals(operator.processingTimeTimers, otherOperator.processingTimeTimers);
    Assert.assertArrayEquals(operator.processingTimeTimersQueue.toArray(), otherOperator.processingTimeTimersQueue.toArray());
    Assert.assertEquals(operator.processingTimeTimerTimestamps, otherOperator.processingTimeTimerTimestamps);
}
{code}

The test was already good; going through the harness is just something I 
happen to know because I wrote the test harnesses and the other operator 
tests. :-) This way, we also don't need the special {{restoreStateFrom}} and 
{{snapshotStateTo}} methods.
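
The property the test checks, that state written by a snapshot is read back 
into an equal state on restore, can be illustrated outside of Flink. The 
following is only a minimal sketch under assumptions: {{TimerCountsState}}, 
its methods, and the byte layout are hypothetical and are not the 
WindowOperator's actual serialization format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for operator state; not the WindowOperator itself. */
final class TimerCountsState {
    /** Timestamp -> number of timers registered for that timestamp. */
    final Map<Long, Integer> counts = new HashMap<>();

    /** Writes the number of entries, then one (timestamp, count) pair per entry. */
    byte[] snapshot() {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(counts.size());
            for (Map.Entry<Long, Integer> entry : counts.entrySet()) {
                out.writeLong(entry.getKey());
                out.writeInt(entry.getValue());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads the fields in exactly the order in which snapshot() wrote them. */
    static TimerCountsState restore(byte[] data) {
        TimerCountsState restored = new TimerCountsState();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int entries = in.readInt();
            for (int i = 0; i < entries; i++) {
                long timestamp = in.readLong();
                int count = in.readInt();
                restored.counts.put(timestamp, count);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return restored;
    }
}
```

If the read order in {{restore}} ever drifts from the write order in 
{{snapshot}}, comparing the original and the restored state fails, which is 
the kind of regression a round-trip test is meant to catch.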

Will you open a PR?

> WindowOperator registers a lot of timers at StreamTask
> ------------------------------------------------------
>
>                 Key: FLINK-3669
>                 URL: https://issues.apache.org/jira/browse/FLINK-3669
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.0.1
>            Reporter: Aljoscha Krettek
>            Assignee: Konstantin Knauf
>            Priority: Blocker
>
> Right now, the WindowOperator registers a timer at the StreamTask for every 
> processing-time timer that a Trigger registers. We should combine several 
> registered trigger timers to only register one low-level timer (timer 
> coalescing).
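
The coalescing idea in the description can be sketched in plain Java, 
independent of Flink. This is a minimal sketch under assumptions: 
{{CoalescingTimerService}}, its methods, and the list that stands in for the 
low-level timers at the StreamTask are hypothetical names for illustration, 
not Flink's API and not necessarily how the actual fix is implemented.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of timer coalescing; not a Flink class. */
final class CoalescingTimerService {
    /** Number of trigger timers currently registered per timestamp. */
    private final Map<Long, Integer> timersPerTimestamp = new HashMap<>();

    /** Stand-in for the low-level timers that would be registered at the task. */
    private final List<Long> lowLevelRegistrations = new ArrayList<>();

    /** Registers a trigger timer; only the first timer for a timestamp reaches the task. */
    void registerTimer(long timestamp) {
        int count = timersPerTimestamp.merge(timestamp, 1, Integer::sum);
        if (count == 1) {
            lowLevelRegistrations.add(timestamp);
        }
    }

    /** Removes one trigger timer for the timestamp, dropping the entry when none remain. */
    void deleteTimer(long timestamp) {
        timersPerTimestamp.computeIfPresent(timestamp, (ts, c) -> c > 1 ? c - 1 : null);
    }

    /** Returns how many trigger timers are pending for the timestamp. */
    int pendingTimers(long timestamp) {
        return timersPerTimestamp.getOrDefault(timestamp, 0);
    }

    /** Returns the timestamps for which a low-level timer was requested. */
    List<Long> lowLevelRegistrations() {
        return lowLevelRegistrations;
    }
}
```

With this bookkeeping, registering two trigger timers for the same timestamp 
and one for a different timestamp leads to two low-level registrations rather 
than three.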



