[ https://issues.apache.org/jira/browse/FLINK-3717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15262302#comment-15262302 ]

ASF GitHub Bot commented on FLINK-3717:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1895#discussion_r61445422
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java ---
    @@ -427,18 +434,37 @@ public void open(FileInputSplit split) throws IOException {
                this.overLimit = false;
                this.end = false;
     
    -           if (this.splitStart != 0) {
    +           if (this.splitStart != 0 && this.restoredOffset == null) {
                        this.stream.seek(this.splitStart);
    +                   this.offset = this.splitStart;
                        readLine();
    -                   
    +
                        // if the first partial record already pushes the stream over the limit of our split, then no
    -                   // record starts within this split 
    +                   // record starts within this split
                        if (this.overLimit) {
                                this.end = true;
                        }
    +           } else if (this.restoredOffset != null) {
    +
    +                   if (!this.restoredSplit.equals(split)) {
    +                           throw new RuntimeException("Tried to open at the wrong split after recovery.");
    +                   }
    +
    +                   // this is the case where we restart from a specific offset within a split (e.g. after a node failure)
    +                   this.stream.seek(this.restoredOffset);
    +                   this.currSplit = this.restoredSplit;
    +                   this.offset = this.restoredOffset;
    +                   this.splitLength = this.splitStart + this.splitLength - this.offset;
    --- End diff --
    
    `splitLength` is used to keep track of how many bytes are left to read,
    correct? In that case the name is not very good, but that is not your
    fault, since it was there before.
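As a rough illustration of the reviewer's reading, the restore branch computes the bytes still to read as the split's absolute end (`splitStart + splitLength`) minus the restored offset. The sketch below assumes that interpretation of `splitLength`; the class `SplitRestoreSketch` and its method `remainingAfterRestore` are hypothetical names, not part of Flink.

```java
/** Hypothetical helper, not Flink code: mirrors splitLength = splitStart + splitLength - offset. */
public final class SplitRestoreSketch {

    /**
     * Returns how many bytes of the split remain when reading resumes at
     * restoredOffset, an absolute position in the file. A non-positive result
     * means reading resumed at or past the split end, so nothing of it is left.
     */
    static long remainingAfterRestore(long splitStart, long splitLength, long restoredOffset) {
        if (restoredOffset < splitStart) {
            // An offset before the split cannot belong to this split.
            throw new IllegalArgumentException(
                    "restored offset " + restoredOffset + " lies before split start " + splitStart);
        }
        long splitEnd = splitStart + splitLength; // absolute end of the split
        return splitEnd - restoredOffset;
    }

    public static void main(String[] args) {
        // Split covering bytes [100, 164); resuming at byte 130 leaves 34 bytes.
        System.out.println(remainingAfterRestore(100, 64, 130)); // prints 34
    }
}
```

Giving the computation an explicit name such as `remainingAfterRestore` is one way to make the "bytes left" meaning visible without renaming the existing field.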


> Add functionality to be able to restore from a specific point in a FileInputFormat
> ----------------------------------------------------------------------------------
>
>                 Key: FLINK-3717
>                 URL: https://issues.apache.org/jira/browse/FLINK-3717
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Streaming
>            Reporter: Kostas Kloudas
>            Assignee: Kostas Kloudas
>
> This is the first step in order to make the File Sources fault-tolerant. We
> have to be able to start reading from a specific point in a file despite any
> caching performed during reading. This will guarantee that the task that
> takes over the execution of the failed one will be able to start from the
> correct point in the file.
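The caching requirement can be sketched as follows: a buffered reader fetches bytes ahead of the records it has emitted, so the stream position overshoots the point a replacement task should resume from. This minimal sketch, assuming newline-delimited ASCII records held in memory, checkpoints the offset just past the last emitted record instead of the stream position. All names (`OffsetTrackingSketch`, `nextRecord`, `checkpointOffset`, `streamPosition`) are hypothetical and do not reflect how `DelimitedInputFormat` is actually implemented.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch, not Flink code: buffers ahead but checkpoints record boundaries. */
public final class OffsetTrackingSketch {

    private final byte[] file;       // stands in for the file contents
    private final int bufferSize;    // bytes fetched per simulated read
    private long streamPos;          // position of the underlying stream (runs ahead)
    private long recordOffset;       // position just after the last emitted record
    private final StringBuilder buffered = new StringBuilder(); // fetched, not yet emitted

    OffsetTrackingSketch(byte[] file, int bufferSize, long startOffset) {
        this.file = file;
        this.bufferSize = bufferSize;
        this.streamPos = startOffset;    // "seek" to the start (or restored) offset
        this.recordOffset = startOffset;
    }

    /** Returns the next '\n'-terminated record, or null at end of input. */
    String nextRecord() {
        while (true) {
            int nl = buffered.indexOf("\n");
            if (nl >= 0) {
                String record = buffered.substring(0, nl);
                buffered.delete(0, nl + 1);
                // ASCII assumption: one char per byte, so this advances by bytes,
                // including the delimiter.
                recordOffset += nl + 1;
                return record;
            }
            if (streamPos >= file.length) {
                return null; // for brevity, a trailing record without '\n' is dropped
            }
            int n = (int) Math.min(bufferSize, file.length - streamPos);
            buffered.append(new String(file, (int) streamPos, n, StandardCharsets.US_ASCII));
            streamPos += n;
        }
    }

    long checkpointOffset() { return recordOffset; }

    long streamPosition() { return streamPos; }

    public static void main(String[] args) {
        byte[] file = "alpha\nbeta\ngamma\ndelta\n".getBytes(StandardCharsets.US_ASCII);
        OffsetTrackingSketch reader = new OffsetTrackingSketch(file, 8, 0);
        reader.nextRecord(); // "alpha"
        reader.nextRecord(); // "beta"
        // The stream has read ahead, but the checkpoint points just past "beta\n".
        System.out.println(reader.streamPosition() + " " + reader.checkpointOffset()); // prints 16 11

        // A replacement task restores from the checkpointed offset.
        OffsetTrackingSketch restored = new OffsetTrackingSketch(file, 8, reader.checkpointOffset());
        List<String> rest = new ArrayList<>();
        String r;
        while ((r = restored.nextRecord()) != null) {
            rest.add(r);
        }
        System.out.println(rest); // prints [gamma, delta]
    }
}
```

In the example the stream has read up to byte 16 while the checkpoint is byte 11, the start of "gamma"; restoring from 11 neither re-emits "beta" nor skips "gamma".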



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
