[ 
https://issues.apache.org/jira/browse/FLINK-4207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15391923#comment-15391923
 ] 

ASF GitHub Bot commented on FLINK-4207:
---------------------------------------

Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/2277
  
    This doesn't work in all cases. Consider what happens when you change `testCleanupTimerWithEmptyListStateForTumblingWindows()` as follows:
    
    ```
        @Test
        public void testCleanupTimerWithEmptyListStateForTumblingWindows() 
throws Exception {
                final int WINDOW_SIZE = 2;
                final long LATENESS = 100;
    
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
    
                ListStateDescriptor<Tuple2<String, Integer>> windowStateDesc =
                        new ListStateDescriptor<>("window-contents", 
inputType.createSerializer(new ExecutionConfig()));
    
                WindowOperator<String, Tuple2<String, Integer>, 
Iterable<Tuple2<String, Integer>>, String, TimeWindow> operator =
                        new WindowOperator<>(
                                
TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
                                new TimeWindow.Serializer(),
                                new TupleKeySelector(),
                                
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                                windowStateDesc,
                                new InternalIterableWindowFunction<>(new 
PassThroughFunction()),
                                new EventTimeTriggerAccum(LATENESS),
                                LATENESS);
    
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
String> testHarness =
                        new OneInputStreamOperatorTestHarness<>(operator);
    
                testHarness.configureForKeyedStream(new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
    
                operator.setInputType(inputType, new ExecutionConfig());
                testHarness.open();
    
                ConcurrentLinkedQueue<Object> expected = new 
ConcurrentLinkedQueue<>();
    
                // normal element
                testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), 1000));
                testHarness.processWatermark(new Watermark(1599));
                testHarness.processWatermark(new Watermark(1999));
                testHarness.processWatermark(new Watermark(2100));
                testHarness.processWatermark(new Watermark(5000));
    
                expected.add(new Watermark(1599));
                expected.add(new StreamRecord<>("GOT: (key2,1)", 1999));
                expected.add(new Watermark(1999)); // here it fires and purges
                expected.add(new Watermark(2100)); // here is the cleanup timer
                expected.add(new Watermark(5000));
    
                System.out.println("OUTPUT" + testHarness.getOutput());
    
                TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expected, testHarness.getOutput(), new Tuple2ResultSortComparator());
                testHarness.close();
        }
    ```
    
    with `EventTimeTriggerAccum` like this:
    ```
        /**
         * A trigger that fires at the end of the window but does not
         * purge the state of the fired window. This is to test the state
         * garbage collection mechanism.
         */
        public class EventTimeTriggerAccum extends Trigger<Object, TimeWindow> {
                private static final long serialVersionUID = 1L;
    
                private long cleanupTime;
    
                private EventTimeTriggerAccum() {
                        cleanupTime = 0L;
                }
    
                public EventTimeTriggerAccum(long cleanupTime) {
                        this.cleanupTime = cleanupTime;
                }
    
                @Override
                public TriggerResult onElement(Object element, long timestamp, 
TimeWindow window, TriggerContext ctx) throws Exception {
                        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) 
{
                                // if the watermark is already past the window 
fire immediately
                                return TriggerResult.FIRE;
                        } else {
                                
ctx.registerEventTimeTimer(window.maxTimestamp());
                                return TriggerResult.CONTINUE;
                        }
                }
    
                @Override
                public TriggerResult onEventTime(long time, TimeWindow window, 
TriggerContext ctx) {
                        return time == window.maxTimestamp() || time == 
window.maxTimestamp() + cleanupTime ?
                                TriggerResult.FIRE_AND_PURGE :
                                TriggerResult.CONTINUE;
                }
    
                @Override
                public TriggerResult onProcessingTime(long time, TimeWindow 
window, TriggerContext ctx) throws Exception {
                        return TriggerResult.CONTINUE;
                }
    
                @Override
                public void clear(TimeWindow window, TriggerContext ctx) throws 
Exception {
                        ctx.deleteEventTimeTimer(window.maxTimestamp());
                }
    
                @Override
                public boolean canMerge() {
                        return true;
                }
    
                @Override
                public TriggerResult onMerge(TimeWindow window,
                                                                         
OnMergeContext ctx) {
                        ctx.registerEventTimeTimer(window.maxTimestamp());
                        return TriggerResult.CONTINUE;
                }
    
                @Override
                public String toString() {
                        return "EventTimeTrigger()";
                }
        }
    ```
    
    and `PassThroughFunction` like this:
    ```
        private class PassThroughFunction implements 
WindowFunction<Tuple2<String, Integer>, String, String, TimeWindow> {
                private static final long serialVersionUID = 1L;
    
                @Override
                public void apply(String k, TimeWindow window, 
Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws 
Exception {
                        out.collect("GOT: " + Joiner.on(",").join(input));
                }
        }
    ```
    
    Essentially, we want the window to fire at cleanup time only if new data arrived after the purge. In this test no new data arrives, but the window function is called anyway.
    
    I'm afraid the real solution is to change `FoldingState` and `ListState` to return `null` when no state is available (instead of the default value and an empty iterable, as they do now). Only then can the window operator tell that a window was purged and has received no new data since.


> WindowOperator becomes very slow with allowed lateness
> ------------------------------------------------------
>
>                 Key: FLINK-4207
>                 URL: https://issues.apache.org/jira/browse/FLINK-4207
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.1.0
>            Reporter: Aljoscha Krettek
>            Assignee: Kostas Kloudas
>            Priority: Blocker
>
> In this simple example, the throughput (as measured by the count that the window emits) becomes very low when an allowed lateness is set:
> {code}
> public class WindowWordCount {
>       public static void main(String[] args) throws Exception {
>               final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>               
> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>               env.setParallelism(1);
>               env.addSource(new InfiniteTupleSource(100_000))
>                               .keyBy(0)
>                               .timeWindow(Time.seconds(3))
>                               .allowedLateness(Time.seconds(1))
>                               .reduce(new ReduceFunction<Tuple2<String, 
> Integer>>() {
>                                       @Override
>                                       public Tuple2<String, Integer> 
> reduce(Tuple2<String, Integer> value1,
>                                                       Tuple2<String, Integer> 
> value2) throws Exception {
>                                               return Tuple2.of(value1.f0, 
> value1.f1 + value2.f1);
>                                       }
>                               })
>                               .filter(new FilterFunction<Tuple2<String, 
> Integer>>() {
>                                       private static final long 
> serialVersionUID = 1L;
>                                       @Override
>                                       public boolean filter(Tuple2<String, 
> Integer> value) throws Exception {
>                                               return 
> value.f0.startsWith("Tuple 0");
>                                       }
>                               })
>                               .print();
>               // execute program
>               env.execute("WindowWordCount");
>       }
>       public static class InfiniteTupleSource implements 
> ParallelSourceFunction<Tuple2<String, Integer>> {
>               private static final long serialVersionUID = 1L;
>               private int numGroups;
>               public InfiniteTupleSource(int numGroups) {
>                       this.numGroups = numGroups;
>               }
>               @Override
>               public void run(SourceContext<Tuple2<String, Integer>> out) 
> throws Exception {
>                       long index = 0;
>                       while (true) {
>                               Tuple2<String, Integer> tuple = new 
> Tuple2<>("Tuple " + (index % numGroups), 1);
>                               out.collect(tuple);
>                               index++;
>                       }
>               }
>               @Override
>               public void cancel() {
>               }
>       }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to