Repository: flink
Updated Branches:
  refs/heads/master ec6d97528 -> fc4abd7ff


[FLINK-4075] ContinuousFileProcessingCheckpointITCase failed on Travis


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fc4abd7f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fc4abd7f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fc4abd7f

Branch: refs/heads/master
Commit: fc4abd7fff5fa9bbfbd2196e61bf696a1dd57ad7
Parents: ec6d975
Author: kl0u <[email protected]>
Authored: Thu Jun 16 18:18:28 2016 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Fri Jun 17 13:32:02 2016 +0200

----------------------------------------------------------------------
 .../flink/api/java/io/AvroInputFormat.java      | 23 +++++++++++---------
 .../flink/api/common/io/BinaryInputFormat.java  | 17 +++++++++------
 .../api/common/io/DelimitedInputFormat.java     | 12 +++++++---
 .../source/ContinuousFileReaderOperator.java    | 16 +++++++++++---
 4 files changed, 45 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fc4abd7f/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
index a920275..73067c1 100644
--- a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
+++ b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
@@ -68,7 +68,7 @@ public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultType
 
        private transient long recordsReadSinceLastSync;
 
-       private transient long lastSync = -1l;
+       private long lastSync = -1l;
 
        public AvroInputFormat(Path filePath, Class<E> type) {
                super(filePath);
@@ -186,18 +186,21 @@ public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultType
                Preconditions.checkNotNull(split, "reopen() cannot be called on a null split.");
                Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state.");
 
-               this.open(split);
-               if (state.f0 != -1) {
+               try {
+                       this.open(split);
+               } finally {
+                       if (state.f0 != -1) {
+                               lastSync = state.f0;
+                               recordsReadSinceLastSync = state.f1;
+                       }
+               }
 
-                       // go to the block we stopped
-                       lastSync = state.f0;
+               if (lastSync != -1) {
+                       // open and read until the record we were before
+                       // the checkpoint and discard the values
                        dataFileReader.seek(lastSync);
-
-                       // read until the record we were before the checkpoint and discard the values
-                       long recordsToDiscard = state.f1;
-                       for(int i = 0; i < recordsToDiscard; i++) {
+                       for(int i = 0; i < recordsReadSinceLastSync; i++) {
                                dataFileReader.next(null);
-                               recordsReadSinceLastSync++;
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/fc4abd7f/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
index eb83bda..96e0e0d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
@@ -390,14 +390,17 @@ public abstract class BinaryInputFormat<T> extends FileInputFormat<T>
                Preconditions.checkNotNull(split, "reopen() cannot be called on a null split.");
                Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state.");
 
-               this.open(split);
-               this.blockInfo = this.createAndReadBlockInfo();
+               try {
+                       this.open(split);
+               } finally {
+                       this.blockInfo = this.createAndReadBlockInfo();
 
-               long blockPos = state.f0;
-               this.readRecords = state.f1;
+                       long blockPos = state.f0;
+                       this.readRecords = state.f1;
 
-               this.stream.seek(this.splitStart + blockPos);
-               this.blockBasedInput = new BlockBasedInput(this.stream, (int) blockPos, this.splitLength);
-               this.dataInputStream = new DataInputViewStreamWrapper(blockBasedInput);
+                       this.stream.seek(this.splitStart + blockPos);
+                       this.blockBasedInput = new BlockBasedInput(this.stream, (int) blockPos, this.splitLength);
+                       this.dataInputStream = new DataInputViewStreamWrapper(blockBasedInput);
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fc4abd7f/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
index 3a77200..4cd200d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
@@ -144,7 +144,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
 
        private transient boolean end;
 
-       private transient long offset = -1;
+       private long offset = -1;
 
        // --------------------------------------------------------------------------------------------
        //  The configuration parameters. Configured on the instance and serialized to be shipped.
@@ -638,9 +638,15 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
        public void reopen(FileInputSplit split, Long state) throws IOException {
                Preconditions.checkNotNull(split, "reopen() cannot be called on a null split.");
                Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state.");
+               Preconditions.checkArgument(state == -1 || state >= split.getStart(),
+                       " Illegal offset "+ state +", smaller than the splits start=" + split.getStart());
+
+               try {
+                       this.open(split);
+               } finally {
+                       this.offset = state;
+               }
 
-               this.open(split);
-               this.offset = state;
                if (state > this.splitStart + split.getLength()) {
                        this.end = true;
                } else if (state > split.getStart()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/fc4abd7f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index e26c534..9319338 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -188,6 +188,8 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 
                private S restoredFormatState = null;
 
+               private volatile boolean isSplitOpen = false;
+
                SplitReader(FileInputFormat<OT> format,
                                        TypeSerializer<OT> serializer,
                                        TimestampedCollector<OT> collector,
@@ -271,6 +273,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
                                                        }
                                                        this.format.open(currentSplit);
                                                }
+                                               this.isSplitOpen = true;
                                        }
 
                                        LOG.info("Reading split: " + currentSplit);
@@ -290,8 +293,11 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 
                                        } finally {
                                                // close and prepare for the next iteration
-                                               this.format.close();
-                                               this.currentSplit = null;
+                                               synchronized (checkpointLock) {
+                                                       this.format.close();
+                                                       this.isSplitOpen = false;
+                                                       this.currentSplit = null;
+                                               }
+                                               }
                                        }
                                }
 
@@ -303,8 +309,12 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
                        } finally {
                                synchronized (checkpointLock) {
                                        LOG.info("Reader terminated, and exiting...");
+
                                        this.format.closeInputFormat();
+                                       this.isSplitOpen = false;
+                                       this.currentSplit = null;
                                        this.isRunning = false;
+
                                        checkpointLock.notifyAll();
                                }
                        }
@@ -321,7 +331,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
                                this.pendingSplits.remove();
                        }
 
-                       if (this.format instanceof CheckpointableInputFormat) {
+                       if (this.format instanceof CheckpointableInputFormat && this.isSplitOpen) {
                                S formatState = (S) ((CheckpointableInputFormat) format).getCurrentState();
                                return new Tuple3<>(snapshot, currentSplit, currentSplit == null ? null : formatState);
                        } else {

Reply via email to