[ 
https://issues.apache.org/jira/browse/FLINK-15160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17272685#comment-17272685
 ] 

Dawid Wysakowicz commented on FLINK-15160:
------------------------------------------

One idea for a fix: we could add a method to the {{TimerService}} for 
registering a timeout timer:
{code}
public interface TimerService {

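    /**
     * Current processing time as returned from
     * {@link org.apache.flink.streaming.api.TimerService}.
     */
    long currentProcessingTime();

    /**
     * Proposed addition: registers a timer that fires at {@code timeoutTime},
     * when a partial match could time out.
     */
    void registerTimeoutTimer(long timeoutTime);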
}
{code}

and then in {{NFA#computeNextState}} use this method to register a timeout 
timer when putting an event for a start state:

{code}
                    if (isStartState(computationState)) {
                        startTimestamp = event.getTimestamp();
                        startEventId = event.getEventId();
                        // register timer for timeout in case no more events
                        // come for that key
                        timerService.registerTimeoutTimer(startTimestamp + windowTime);
                    } else {
                        startTimestamp = computationState.getStartTimestamp();
                        startEventId = computationState.getStartEventID();
                    }
{code}
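
To illustrate the intended effect, here is a minimal, self-contained sketch. It is not the actual CEP code: {{FakeOperator}}, {{onStartEvent}}, {{advanceTimeTo}} and {{partialMatchStarts}} are hypothetical names, and only {{registerTimeoutTimer}} comes from the proposal above. It assumes a partial match counts as timed out once the fired time reaches its start timestamp plus the window time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

public class TimeoutTimerSketch {

    /** Mirrors the interface proposed above. */
    interface TimerService {
        long currentProcessingTime();

        void registerTimeoutTimer(long timeoutTime);
    }

    /** Hypothetical stand-in for the operator: owns the clock, the timers and the partial matches. */
    static class FakeOperator implements TimerService {
        private long now;
        private final TreeSet<Long> timers = new TreeSet<>();
        private final long windowTime;
        final List<Long> partialMatchStarts = new ArrayList<>();

        FakeOperator(long windowTime) {
            this.windowTime = windowTime;
        }

        @Override
        public long currentProcessingTime() {
            return now;
        }

        @Override
        public void registerTimeoutTimer(long timeoutTime) {
            timers.add(timeoutTime);
        }

        /** An event entering the start state opens a partial match and registers its timeout. */
        void onStartEvent(long timestamp) {
            partialMatchStarts.add(timestamp);
            registerTimeoutTimer(timestamp + windowTime);
        }

        /** Advances the clock and fires all due timers; no incoming event is needed to prune. */
        void advanceTimeTo(long time) {
            now = time;
            while (!timers.isEmpty() && timers.first() <= now) {
                long fired = timers.pollFirst();
                partialMatchStarts.removeIf(start -> start + windowTime <= fired);
            }
        }
    }

    public static void main(String[] args) {
        FakeOperator op = new FakeOperator(10);
        op.onStartEvent(5);          // timeout timer registered for 15
        op.advanceTimeTo(14);        // timer not due yet
        System.out.println(op.partialMatchStarts.size()); // prints 1
        op.advanceTimeTo(15);        // timer fires, partial match is pruned
        System.out.println(op.partialMatchStarts.size()); // prints 0
    }
}
```

The point of the sketch is that pruning is driven by the timer rather than by the next event for the key, so an idle key is still cleaned up.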

> Clean up is not applied if there are no incoming events for a key.
> ------------------------------------------------------------------
>
>                 Key: FLINK-15160
>                 URL: https://issues.apache.org/jira/browse/FLINK-15160
>             Project: Flink
>          Issue Type: Bug
>          Components: Library / CEP
>    Affects Versions: 1.8.2, 1.9.1, 1.10.0
>            Reporter: Dawid Wysakowicz
>            Priority: Major
>
> In CepOperator the pruning of timed out partial matches happens along with 
> feeding events into the NFA, either when unbuffering on a watermark or 
> according to the processing time.
> 1. Processing time
> The state is pruned only with the timestamps of incoming events. If there are 
> no incoming events, no pruning happens.
> 2. Event time
> It is slightly more complicated, but the outcome is similar. We register 
> timers that pop events from the buffer, but we do not register any timers for 
> when the timeout of a partial match could happen. Therefore, if no more 
> events arrive, we never prune the matches.
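
The processing-time case described in the issue can be reduced to the following simplified model. This is only a sketch under assumptions: {{KeyState}} and {{processEvent}} are hypothetical names, not the real {{CepOperator}} code, and every event is treated as starting a partial match to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;

public class EventDrivenPruningSketch {

    /** Hypothetical, simplified model of the per-key state; not the actual CepOperator. */
    static class KeyState {
        final List<Long> partialMatchStarts = new ArrayList<>();
        private final long windowTime;

        KeyState(long windowTime) {
            this.windowTime = windowTime;
        }

        /** Pruning is reachable only from here, i.e. only when an event arrives for the key. */
        void processEvent(long timestamp) {
            partialMatchStarts.removeIf(start -> timestamp - start >= windowTime);
            partialMatchStarts.add(timestamp);
        }
    }

    public static void main(String[] args) {
        KeyState idleKey = new KeyState(10);
        idleKey.processEvent(5);
        // Time passes, but no further event arrives for this key, so nothing
        // invokes the pruning logic and the partial match stays in state.
        System.out.println(idleKey.partialMatchStarts.size()); // prints 1

        KeyState activeKey = new KeyState(10);
        activeKey.processEvent(5);
        activeKey.processEvent(20); // prunes the match started at 5, then starts a new one
        System.out.println(activeKey.partialMatchStarts); // prints [20]
    }
}
```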



--
This message was sent by Atlassian Jira
(v8.3.4#803005)