Github user rtudoran commented on the issue:

    https://github.com/apache/flink/pull/4380
  
    @fhueske
    Also - i do not understand why you would need to keep the counters for 
offset/fetch as states?
    
    Assuming we have in the buffer state with events for proctime T values (1, 
2, 3, 4, 5)
    You want to emit them with offset 2 and fetch 2 (hence values 3 and 4)
    
    So you will have the onTimer function when proctime moved and you can 
trigger computation for time T (i.e. at T+1)
    
    The basic logic after you sort is that you go through the 5 elements and 
count the offset and then the fetch
    
    for(int i=0; i< inputs.size; i++) {
    offsetCounter++;
    if(offsetCounter > offset && fetchCounter<fetch) {
       out.collect(inputs(i))
       fetchCounter++;
    }
    }
    
    ...you would then update the states at the end
    What is the point here to memorize the fetchCounter and offsetCounter?
    If a failure happen meanwhile you would anyway restore the whole list of 
the 5 elements and restart the logic (i.e., from the beginning of the function).
    It is not like you do a state update at every iteration to pick in case of 
a failure from let's say line 5 when the value of counter was at a certain value


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to