MLHR-1721: Convert NullPointerException to IOException during seek to avoid operator failure.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/bd9cd5c9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/bd9cd5c9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/bd9cd5c9 Branch: refs/heads/devel-3 Commit: bd9cd5c932e44c401b8a334e0d4e7736c2980ea7 Parents: 818c683 Author: Tushar Gosavi <[email protected]> Authored: Thu Apr 30 11:11:41 2015 +0530 Committer: Chandni Singh <[email protected]> Committed: Mon Nov 23 21:37:02 2015 -0800 ---------------------------------------------------------------------- tfile/TFileReader.java | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/bd9cd5c9/tfile/TFileReader.java ---------------------------------------------------------------------- diff --git a/tfile/TFileReader.java b/tfile/TFileReader.java index 972b4f9..2f06da9 100644 --- a/tfile/TFileReader.java +++ b/tfile/TFileReader.java @@ -18,6 +18,7 @@ package com.datatorrent.contrib.hdht.tfile; import java.io.IOException; import java.util.TreeMap; +import com.datatorrent.common.util.DTThrowable; import com.datatorrent.common.util.Slice; import org.apache.hadoop.conf.Configuration; @@ -39,6 +40,7 @@ public class TFileReader implements HDSFileReader private final Reader reader; private final Scanner scanner; private final FSDataInputStream fsdis; + private boolean closed = false; public TFileReader(FSDataInputStream fsdis, long fileLength, Configuration conf) throws IOException { @@ -54,6 +56,7 @@ public class TFileReader implements HDSFileReader @Override public void close() throws IOException { + closed = true; scanner.close(); reader.close(); fsdis.close(); @@ -85,7 +88,14 @@ public class TFileReader implements HDSFileReader @Override public boolean seek(Slice key) throws IOException { - return scanner.seekTo(key.buffer, key.offset, key.length); + try { + return scanner.seekTo(key.buffer, key.offset, key.length); + } catch (NullPointerException ex) { + if (closed) + throw new IOException("Stream was closed"); + else + throw ex; + } } @Override
