Repository: tez
Updated Branches:
  refs/heads/branch-0.6 8fbd0ba84 -> 7fe2dc592


TEZ-2348. EOF exception during UnorderedKVReader.next() (rbalamohan)

(cherry picked from commit ca83804f9814532a9714aab58e7787578833ef2a)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/7fe2dc59
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/7fe2dc59
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/7fe2dc59

Branch: refs/heads/branch-0.6
Commit: 7fe2dc59253f2f362db899c51f29a1a3f9568173
Parents: 8fbd0ba
Author: Rajesh Balamohan <[email protected]>
Authored: Thu Apr 30 05:04:18 2015 +0530
Committer: Rajesh Balamohan <[email protected]>
Committed: Thu Apr 30 05:05:23 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                        |  1 +
 .../library/common/readers/UnorderedKVReader.java  | 17 +++++++++++++++++
 2 files changed, 18 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/7fe2dc59/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 129f0f7..af7612e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -184,6 +184,7 @@ TEZ-UI CHANGES (TEZ-8):
 Release 0.5.4: Unreleased
 
 ALL CHANGES:
+  TEZ-2348. EOF exception during UnorderedKVReader.next().
   TEZ-1560. Invalid state machine handling for V_SOURCE_VERTEX_RECOVERED in recovery.
   TEZ-2305. MR compatibility sleep job fails with IOException: Undefined job output-path
   TEZ-2303. ConcurrentModificationException while processing recovery.

http://git-wip-us.apache.org/repos/asf/tez/blob/7fe2dc59/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
index c2947c2..5955f2f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
@@ -70,6 +70,8 @@ public class UnorderedKVReader<K, V> extends KeyValueReader {
   // TODO Remove this once per I/O counters are separated properly. Relying on
   // the counter at the moment will generate aggregate numbers. 
   private int numRecordsRead = 0;
+
+  private boolean completedProcessing;
   
   public UnorderedKVReader(ShuffleManager shuffleManager, Configuration conf,
       CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLength, int ifileBufferSize,
@@ -124,10 +126,19 @@ public class UnorderedKVReader<K, V> extends KeyValueReader {
         nextInputExists = moveToNextInput();
       }
       LOG.info("Num Records read: " + numRecordsRead);
+      completedProcessing = true;
       return false;
     }
   }
 
+  private void hasCompletedProcessing() throws IOException {
+    if (completedProcessing) {
+      throw new IOException("Reader has already processed all the inputs. Please check if you are"
+          + " invoking next() even after it returned false. For usage, please refer to "
+          + "KeyValueReader javadocs");
+    }
+  }
+
   @Override
   public Object getCurrentKey() throws IOException {
     return (Object) key;
@@ -170,6 +181,11 @@ public class UnorderedKVReader<K, V> extends KeyValueReader {
   private boolean moveToNextInput() throws IOException {
     if (currentReader != null) { // Close the current reader.
       currentReader.close();
+      /**
+       * clear reader explicitly. Otherwise this could point to stale reference when next() is
+       * called and end up throwing EOF exception from IFIle. Ref: TEZ-2348
+       */
+      currentReader = null;
       currentFetchedInput.free();
     }
     try {
@@ -179,6 +195,7 @@ public class UnorderedKVReader<K, V> extends KeyValueReader {
       throw new IOException(e);
     }
     if (currentFetchedInput == null) {
+      hasCompletedProcessing();
       return false; // No more inputs
     } else {
       currentReader = openIFileReader(currentFetchedInput);

Reply via email to