[ 
https://issues.apache.org/jira/browse/FLINK-2314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14654674#comment-14654674
 ] 

Sheetal Parade commented on FLINK-2314:
---------------------------------------

Thanks Stephan,

The idea is to snapshot the number of records/elements read from the file source.

This is possible by adding a counter for records read in the run method of
org.apache.flink.streaming.api.functions.source.FileSourceFunction

{code}
// Emit and count under the checkpoint lock so a snapshot never sees one without the other.
synchronized (ctx.getCheckpointLock()) {
  ctx.collect(nextElement);
  currSplit++;
}
{code}

During recovery, the file source has to skip the records it had already read. 
This can be done in one of two places: the *run* method or the *open* method.

Doing it in the run method keeps the code in a single place and preserves the 
current null checks.

{code}
// Skip records that were already emitted before the restored snapshot.
if (currSplit < splitCount) {
  currSplit++;
  continue;
}
{code}

where currSplit is the count of records read so far
and splitCount is the position provided by the snapshot.
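
Putting the two snippets together, a minimal sketch of the counting and skipping logic could look like the following. It is plain Java and does not use the Flink API: the class RecordCountingSource, its snapshotState/restoreState methods, and the returned list (standing in for ctx.collect) are hypothetical names chosen for illustration, and recordsRead and recordsToSkip play the roles of currSplit and splitCount above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-in for a checkpointed file source; not the Flink API. */
class RecordCountingSource {
    private final Object checkpointLock = new Object();
    private long recordsRead = 0;    // plays the role of currSplit
    private long recordsToSkip = 0;  // plays the role of splitCount

    /** Returns the number of records read so far (the state to checkpoint). */
    long snapshotState() {
        synchronized (checkpointLock) {
            return recordsRead;
        }
    }

    /** Called on recovery with the count stored in the last snapshot. */
    void restoreState(long snapshot) {
        recordsToSkip = snapshot;
    }

    /** Reads all records, skipping those already emitted before the snapshot. */
    List<String> run(Iterator<String> records) {
        List<String> emitted = new ArrayList<>();  // stands in for ctx.collect
        while (records.hasNext()) {
            String next = records.next();
            if (recordsRead < recordsToSkip) {
                // Already emitted before the failure: count it, do not emit it.
                recordsRead++;
                continue;
            }
            synchronized (checkpointLock) {
                // Emit and count atomically with respect to snapshots.
                emitted.add(next);
                recordsRead++;
            }
        }
        return emitted;
    }
}
```

This assumes the file is re-read from the beginning in the same order after recovery, so that a record count identifies a position unambiguously.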


> Make Streaming File Sources Persistent
> --------------------------------------
>
>                 Key: FLINK-2314
>                 URL: https://issues.apache.org/jira/browse/FLINK-2314
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming
>    Affects Versions: 0.9
>            Reporter: Stephan Ewen
>            Assignee: Sheetal Parade
>              Labels: easyfix, starter
>
> Streaming File sources should participate in the checkpointing. They should 
> track the bytes they read from the file and checkpoint it.
> One can look at the sequence generating source function for an example of a 
> checkpointed source.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
