Repository: tez Updated Branches: refs/heads/branch-0.6 8fbd0ba84 -> 7fe2dc592
TEZ-2348. EOF exception during UnorderedKVReader.next() (rbalamohan) (cherry picked from commit ca83804f9814532a9714aab58e7787578833ef2a) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/7fe2dc59 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/7fe2dc59 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/7fe2dc59 Branch: refs/heads/branch-0.6 Commit: 7fe2dc59253f2f362db899c51f29a1a3f9568173 Parents: 8fbd0ba Author: Rajesh Balamohan <[email protected]> Authored: Thu Apr 30 05:04:18 2015 +0530 Committer: Rajesh Balamohan <[email protected]> Committed: Thu Apr 30 05:05:23 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../library/common/readers/UnorderedKVReader.java | 17 +++++++++++++++++ 2 files changed, 18 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/7fe2dc59/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 129f0f7..af7612e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -184,6 +184,7 @@ TEZ-UI CHANGES (TEZ-8): Release 0.5.4: Unreleased ALL CHANGES: + TEZ-2348. EOF exception during UnorderedKVReader.next(). TEZ-1560. Invalid state machine handling for V_SOURCE_VERTEX_RECOVERED in recovery. TEZ-2305. MR compatibility sleep job fails with IOException: Undefined job output-path TEZ-2303. ConcurrentModificationException while processing recovery. 
http://git-wip-us.apache.org/repos/asf/tez/blob/7fe2dc59/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java index c2947c2..5955f2f 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java @@ -70,6 +70,8 @@ public class UnorderedKVReader<K, V> extends KeyValueReader { // TODO Remove this once per I/O counters are separated properly. Relying on // the counter at the moment will generate aggregate numbers. private int numRecordsRead = 0; + + private boolean completedProcessing; public UnorderedKVReader(ShuffleManager shuffleManager, Configuration conf, CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLength, int ifileBufferSize, @@ -124,10 +126,19 @@ public class UnorderedKVReader<K, V> extends KeyValueReader { nextInputExists = moveToNextInput(); } LOG.info("Num Records read: " + numRecordsRead); + completedProcessing = true; return false; } } + private void hasCompletedProcessing() throws IOException { + if (completedProcessing) { + throw new IOException("Reader has already processed all the inputs. Please check if you are" + + " invoking next() even after it returned false. For usage, please refer to " + + "KeyValueReader javadocs"); + } + } + @Override public Object getCurrentKey() throws IOException { return (Object) key; @@ -170,6 +181,11 @@ public class UnorderedKVReader<K, V> extends KeyValueReader { private boolean moveToNextInput() throws IOException { if (currentReader != null) { // Close the current reader. 
currentReader.close(); + /** + * clear reader explicitly. Otherwise this could point to stale reference when next() is + * called and end up throwing EOF exception from IFile. Ref: TEZ-2348 + */ + currentReader = null; currentFetchedInput.free(); } try { @@ -179,6 +195,7 @@ public class UnorderedKVReader<K, V> extends KeyValueReader { throw new IOException(e); } if (currentFetchedInput == null) { + hasCompletedProcessing(); return false; // No more inputs } else { currentReader = openIFileReader(currentFetchedInput);
