Repository: tajo Updated Branches: refs/heads/branch-0.11.1 415e24831 -> 47e57a40c
TAJO-2001: DirectRawFileScanner.getProgress occasionally fails. Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/47e57a40 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/47e57a40 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/47e57a40 Branch: refs/heads/branch-0.11.1 Commit: 47e57a40ced41e30606176de1d1ed5ab075dd180 Parents: 415e248 Author: Jinho Kim <[email protected]> Authored: Thu Dec 3 14:41:14 2015 +0900 Committer: Jinho Kim <[email protected]> Committed: Thu Dec 3 14:41:14 2015 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../java/org/apache/tajo/storage/RawFile.java | 5 ++- .../storage/rawfile/DirectRawFileScanner.java | 46 ++++++++------------ 3 files changed, 24 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/47e57a40/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 297799c..f2ce6f1 100644 --- a/CHANGES +++ b/CHANGES @@ -18,6 +18,8 @@ Release 0.11.1 - unreleased BUG FIXES + TAJO-2001: DirectRawFileScanner.getProgress occasionally fails. (jinho) + TAJO-1753: GlobalEngine causes NPE occurs occasionally. (jinho) TAJO-1980: Printout the usage of TajoShellCommand. http://git-wip-us.apache.org/repos/asf/tajo/blob/47e57a40/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java index 26bd135..f31b85c 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java @@ -453,10 +453,11 @@ public class RawFile { return 1.0f; } - if (filePosition - startOffset == 0) { + long readBytes = filePosition - startOffset; + if (readBytes == 0) { return 0.0f; } else { - return Math.min(1.0f, ((float) filePosition / endOffset)); + return Math.min(1.0f, ((float) readBytes / fragment.getLength())); } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/47e57a40/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java index 550de63..1e2380e 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java @@ -27,6 +27,7 @@ import org.apache.hadoop.io.IOUtils; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.SchemaUtil; import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.plan.expr.EvalNode; import org.apache.tajo.storage.*; @@ -50,7 +51,6 @@ public class DirectRawFileScanner extends FileScanner implements SeekableScanner private SeekableInputChannel channel; private boolean eos = false; - private long fileSize; private long recordCount; private long filePosition; private long endOffset; @@ -95,10 +95,8 @@ public class DirectRawFileScanner extends FileScanner implements SeekableScanner } channel = new LocalFileInputChannel(new FileInputStream(file)); - fileSize = channel.size(); } else { channel = new FSDataInputChannel(fs.open(fragment.getPath())); - fileSize = channel.size(); } // initial set position @@ -106,10 +104,6 @@ public class DirectRawFileScanner extends FileScanner implements SeekableScanner channel.seek(fragment.getStartKey()); } - if (tableStats != null) { - tableStats.setNumBytes(fileSize); - } - filePosition = fragment.getStartKey(); endOffset = fragment.getStartKey() + fragment.getLength(); if (LOG.isDebugEnabled()) { @@ -178,7 +172,7 @@ public class DirectRawFileScanner extends FileScanner implements SeekableScanner @Override public void close() throws IOException { if (tableStats != null) { - tableStats.setReadBytes(fileSize); + tableStats.setReadBytes(filePosition - fragment.getStartKey()); tableStats.setNumRows(recordCount); } if(tupleBuffer != null) { @@ -211,30 +205,28 @@ public class DirectRawFileScanner extends FileScanner implements SeekableScanner } @Override + public TableStats getInputStats() { + if(tableStats != null){ + tableStats.setNumRows(recordCount); + tableStats.setReadBytes(filePosition - fragment.getStartKey()); // actual read bytes (scan + rescan * n) + tableStats.setNumBytes(fragment.getLength()); + } + return tableStats; + } + + @Override public float getProgress() { if(!inited) return 0.0f; - try { - tableStats.setNumRows(recordCount); - long filePos = 0; - if (channel != null) { - filePos = channel.position(); - tableStats.setReadBytes(filePos); - } - - if(eos || channel == null) { - tableStats.setReadBytes(fileSize); - return 1.0f; - } + if(eos) { + return 1.0f; + } - if (filePos == 0) { - return 0.0f; - } else { - return Math.min(1.0f, ((float)filePos / (float)fileSize)); - } - } catch (IOException e) { - LOG.error(e.getMessage(), e); + long readBytes = filePosition - fragment.getStartKey(); + if (readBytes == 0) { return 0.0f; + } else { + return Math.min(1.0f, ((float) readBytes / fragment.getLength())); } } }
