[TRAFODION-2917] Refactor Trafodion implementation of HDFS scan
With this change, the hive regressions pass, except hive/TEST003, when the CQD use_libhdfs_scan is set to 'off'. Project: http://git-wip-us.apache.org/repos/asf/trafodion/repo Commit: http://git-wip-us.apache.org/repos/asf/trafodion/commit/a187b03b Tree: http://git-wip-us.apache.org/repos/asf/trafodion/tree/a187b03b Diff: http://git-wip-us.apache.org/repos/asf/trafodion/diff/a187b03b Branch: refs/heads/master Commit: a187b03bc37d62d00278d21eef519496ea6ce1aa Parents: 7066e3e Author: selvaganesang <[email protected]> Authored: Fri Feb 9 00:56:48 2018 +0000 Committer: selvaganesang <[email protected]> Committed: Fri Feb 9 00:56:48 2018 +0000 ---------------------------------------------------------------------- core/sql/executor/ExHdfsScan.cpp | 2 ++ .../src/main/java/org/trafodion/sql/HDFSClient.java | 12 ++++++++++-- core/sql/src/main/java/org/trafodion/sql/HdfsScan.java | 13 ++++++++++--- 3 files changed, 22 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/trafodion/blob/a187b03b/core/sql/executor/ExHdfsScan.cpp ---------------------------------------------------------------------- diff --git a/core/sql/executor/ExHdfsScan.cpp b/core/sql/executor/ExHdfsScan.cpp index f8ec9a1..cd95899 100644 --- a/core/sql/executor/ExHdfsScan.cpp +++ b/core/sql/executor/ExHdfsScan.cpp @@ -127,6 +127,8 @@ ExHdfsScanTcb::ExHdfsScanTcb( Space * space = (glob ? glob->getSpace() : 0); CollHeap * heap = (glob ? 
glob->getDefaultHeap() : 0); useLibhdfsScan_ = hdfsScanTdb.getUseLibhdfsScan(); + if (isSequenceFile()) + useLibhdfsScan_ = TRUE; lobGlob_ = NULL; hdfsScanBufMaxSize_ = hdfsScanTdb.hdfsBufSize_; headRoom_ = (Int32)hdfsScanTdb.rangeTailIOSize_; http://git-wip-us.apache.org/repos/asf/trafodion/blob/a187b03b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java ---------------------------------------------------------------------- diff --git a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java index 3b83c8f..5c8c487 100644 --- a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java +++ b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java @@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.conf.Configuration; import java.nio.ByteBuffer; import java.io.IOException; +import java.io.EOFException; import java.io.OutputStream; import java.util.concurrent.Callable; import java.util.concurrent.Future; @@ -95,8 +96,14 @@ public class HDFSClient { int bytesRead; int totalBytesRead = 0; - if (! buf_.hasArray()) - fsdis_.seek(pos_); + if (! 
buf_.hasArray()) { + try { + fsdis_.seek(pos_); + } catch (EOFException e) { + isEOF_ = 1; + return new Integer(totalBytesRead); + } + } do { if (buf_.hasArray()) @@ -157,6 +164,7 @@ public class HDFSClient int bytesRead; retObject = (Integer)future_.get(); bytesRead = retObject.intValue(); + fsdis_.close(); return bytesRead; } http://git-wip-us.apache.org/repos/asf/trafodion/blob/a187b03b/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java ---------------------------------------------------------------------- diff --git a/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java b/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java index f3d505d..e216555 100644 --- a/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java +++ b/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java @@ -188,9 +188,16 @@ public class HdfsScan if (logger_.isDebugEnabled()) logger_.debug(" Range No " + retArray[2] + " Buffer No " + retArray[1] + " Bytes Read " + retArray[0] + " isEOF " + retArray[3]); lastBufCompleted_ = bufNo; - if ((isEOF == 1) && (currRange_ == (hdfsScanRanges_.length-1))) { - scanCompleted_ = true; - return retArray; + if (isEOF == 1) { + if (currRange_ == (hdfsScanRanges_.length-1)) { + scanCompleted_ = true; + return retArray; + } else { + currRange_++; + currPos_ = hdfsScanRanges_[currRange_].pos_; + lenRemain_ = hdfsScanRanges_[currRange_].len_; + bytesRead = 0; + } } switch (lastBufCompleted_) {
