[
https://issues.apache.org/jira/browse/FLINK-2314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14654674#comment-14654674
]
Sheetal Parade commented on FLINK-2314:
---------------------------------------
Thanks Stephan,
I propose to snapshot the number of records/elements read from the file source.
This is possible by adding a counter for records read in the run method of
org.apache.flink.streaming.api.functions.source.FileSourceFunction:
{code}
// emit the record and count it while holding the checkpoint lock
synchronized (ctx.getCheckpointLock()) {
    ctx.collect(nextElement);
    currSplit++;
}
{code}
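A minimal, self-contained sketch of the counting step, assuming a simplified stand-in for the source context (`FakeSourceContext` and `CountingFileSource` are hypothetical names, not Flink classes; the real source would hand the counter to Flink's checkpointing hooks, which are not modelled here). It shows why the counter is incremented inside the checkpoint lock: a snapshot taken while holding the same lock always sees a count that matches exactly the records already emitted.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for Flink's source context: only the two calls used
// in the comment (collect and getCheckpointLock) are modelled.
class FakeSourceContext<T> {
    private final Object lock = new Object();
    final List<T> emitted = new ArrayList<>();

    Object getCheckpointLock() { return lock; }

    void collect(T element) { emitted.add(element); }
}

// Illustrative source that counts emitted records under the checkpoint lock.
// "CountingFileSource" and its methods are hypothetical, not Flink API.
class CountingFileSource {
    private long currSplit = 0; // number of records emitted so far

    void run(Iterator<String> lines, FakeSourceContext<String> ctx) {
        while (lines.hasNext()) {
            String nextElement = lines.next();
            // Emitting and counting happen together, so a snapshot holding
            // the same lock never sees one without the other.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(nextElement);
                currSplit++;
            }
        }
    }

    // Would be called by the checkpointing thread; takes the same lock.
    long snapshotState(FakeSourceContext<String> ctx) {
        synchronized (ctx.getCheckpointLock()) {
            return currSplit;
        }
    }
}
```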
During recovery, the file source has to skip the records it read before the
failure. This can be done in one of two places: the *run* method or the *open* method.
Doing it in the run method keeps the code in a single place and preserves the
current null checks:
{code}
// skip records that were already emitted before the failure
if (currSplit < splitCount) {
    currSplit++;
    continue;
}
{code}
where currSplit is the count of records read so far and splitCount is the
position restored from the snapshot.
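A sketch of the recovery path under the same assumptions (`ResumableLineSource` and its `restoreState` method are hypothetical names; in Flink the restored value would come from the checkpointing mechanism). Because the null check runs before the skip test, skipped records are counted the same way emitted records were, so the restored position lines up with the snapshot.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Illustrative recovery logic: skip the records emitted before the failure.
// "ResumableLineSource" and "restoreState" are hypothetical, not Flink API.
class ResumableLineSource {
    private long currSplit = 0;  // records read so far, including skipped ones
    private long splitCount = 0; // position restored from the snapshot

    void restoreState(long restoredCount) {
        this.splitCount = restoredCount;
    }

    List<String> run(Iterator<String> lines) {
        List<String> out = new ArrayList<>();
        while (lines.hasNext()) {
            String nextElement = lines.next();
            if (nextElement == null) {
                continue; // existing null check: nulls are never counted
            }
            // Records before the restored position were already emitted.
            if (currSplit < splitCount) {
                currSplit++;
                continue;
            }
            out.add(nextElement); // stands in for ctx.collect(nextElement)
            currSplit++;
        }
        return out;
    }
}
```

Restoring a count of 2 and replaying the lines "a", "b", "c", "d" emits only "c" and "d". Skipping by record count means the prefix of the file is re-read on every recovery; checkpointing byte offsets, as the issue description proposes, would let the source seek directly instead.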
> Make Streaming File Sources Persistent
> --------------------------------------
>
> Key: FLINK-2314
> URL: https://issues.apache.org/jira/browse/FLINK-2314
> Project: Flink
> Issue Type: Improvement
> Components: Streaming
> Affects Versions: 0.9
> Reporter: Stephan Ewen
> Assignee: Sheetal Parade
> Labels: easyfix, starter
>
> Streaming File sources should participate in the checkpointing. They should
> track the bytes they read from the file and checkpoint it.
> One can look at the sequence generating source function for an example of a
> checkpointed source.