Repository: flink Updated Branches: refs/heads/master ec6d97528 -> fc4abd7ff
[FLINK-4075] ContinuousFileProcessingCheckpointITCase failed on Travis Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fc4abd7f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fc4abd7f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fc4abd7f Branch: refs/heads/master Commit: fc4abd7fff5fa9bbfbd2196e61bf696a1dd57ad7 Parents: ec6d975 Author: kl0u <[email protected]> Authored: Thu Jun 16 18:18:28 2016 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Fri Jun 17 13:32:02 2016 +0200 ---------------------------------------------------------------------- .../flink/api/java/io/AvroInputFormat.java | 23 +++++++++++--------- .../flink/api/common/io/BinaryInputFormat.java | 17 +++++++++------ .../api/common/io/DelimitedInputFormat.java | 12 +++++++--- .../source/ContinuousFileReaderOperator.java | 16 +++++++++++--- 4 files changed, 45 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/fc4abd7f/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java index a920275..73067c1 100644 --- a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java +++ b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java @@ -68,7 +68,7 @@ public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultType private transient long recordsReadSinceLastSync; - private transient long lastSync = -1l; + private long lastSync = -1l; public AvroInputFormat(Path filePath, Class<E> type) { super(filePath); @@ -186,18 +186,21 @@ public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultType Preconditions.checkNotNull(split, "reopen() cannot be called on a null split."); Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state."); - this.open(split); - if (state.f0 != -1) { + try { + this.open(split); + } finally { + if (state.f0 != -1) { + lastSync = state.f0; + recordsReadSinceLastSync = state.f1; + } + } - // go to the block we stopped - lastSync = state.f0; + if (lastSync != -1) { + // open and read until the record we were before + // the checkpoint and discard the values dataFileReader.seek(lastSync); - - // read until the record we were before the checkpoint and discard the values - long recordsToDiscard = state.f1; - for(int i = 0; i < recordsToDiscard; i++) { + for(int i = 0; i < recordsReadSinceLastSync; i++) { dataFileReader.next(null); - recordsReadSinceLastSync++; } } } http://git-wip-us.apache.org/repos/asf/flink/blob/fc4abd7f/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java index eb83bda..96e0e0d 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java @@ -390,14 +390,17 @@ public abstract class BinaryInputFormat<T> extends FileInputFormat<T> Preconditions.checkNotNull(split, "reopen() cannot be called on a null split."); Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state."); - this.open(split); - this.blockInfo = this.createAndReadBlockInfo(); + try { + this.open(split); + } finally { + this.blockInfo = this.createAndReadBlockInfo(); - long blockPos = state.f0; - this.readRecords = state.f1; + long blockPos = state.f0; + this.readRecords = state.f1; - this.stream.seek(this.splitStart + blockPos); - this.blockBasedInput = new BlockBasedInput(this.stream, (int) blockPos, this.splitLength); - this.dataInputStream = new DataInputViewStreamWrapper(blockBasedInput); + this.stream.seek(this.splitStart + blockPos); + this.blockBasedInput = new BlockBasedInput(this.stream, (int) blockPos, this.splitLength); + this.dataInputStream = new DataInputViewStreamWrapper(blockBasedInput); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/fc4abd7f/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java index 3a77200..4cd200d 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java @@ -144,7 +144,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple private transient boolean end; - private transient long offset = -1; + private long offset = -1; // -------------------------------------------------------------------------------------------- // The configuration parameters. Configured on the instance and serialized to be shipped. @@ -638,9 +638,15 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple public void reopen(FileInputSplit split, Long state) throws IOException { Preconditions.checkNotNull(split, "reopen() cannot be called on a null split."); Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state."); + Preconditions.checkArgument(state == -1 || state >= split.getStart(), + " Illegal offset "+ state +", smaller than the splits start=" + split.getStart()); + + try { + this.open(split); + } finally { + this.offset = state; + } - this.open(split); - this.offset = state; if (state > this.splitStart + split.getLength()) { this.end = true; } else if (state > split.getStart()) { http://git-wip-us.apache.org/repos/asf/flink/blob/fc4abd7f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java index e26c534..9319338 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java @@ -188,6 +188,8 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A private S restoredFormatState = null; + private volatile boolean isSplitOpen = false; + SplitReader(FileInputFormat<OT> format, TypeSerializer<OT> serializer, TimestampedCollector<OT> collector, @@ -271,6 +273,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A } this.format.open(currentSplit); } + this.isSplitOpen = true; } LOG.info("Reading split: " + currentSplit); @@ -290,8 +293,11 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A } finally { // close and prepare for the next iteration - this.format.close(); - this.currentSplit = null; + synchronized (checkpointLock) { + this.format.close(); + this.isSplitOpen = false; + this.currentSplit = null; + } } } @@ -303,8 +309,12 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A } finally { synchronized (checkpointLock) { LOG.info("Reader terminated, and exiting..."); + this.format.closeInputFormat(); + this.isSplitOpen = false; + this.currentSplit = null; this.isRunning = false; + checkpointLock.notifyAll(); } } @@ -321,7 +331,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A this.pendingSplits.remove(); } - if (this.format instanceof CheckpointableInputFormat) { + if (this.format instanceof CheckpointableInputFormat && this.isSplitOpen) { S formatState = (S) ((CheckpointableInputFormat) format).getCurrentState(); return new Tuple3<>(snapshot, currentSplit, currentSplit == null ? null : formatState); } else {
