[TRAFODION-2917] Refactor Trafodion implementation of HDFS scan for text-formatted Hive tables
Fix for seabase/TEST031 failure with USE_LIBHDFS_SCAN turned 'OFF' Project: http://git-wip-us.apache.org/repos/asf/trafodion/repo Commit: http://git-wip-us.apache.org/repos/asf/trafodion/commit/37ab3c03 Tree: http://git-wip-us.apache.org/repos/asf/trafodion/tree/37ab3c03 Diff: http://git-wip-us.apache.org/repos/asf/trafodion/diff/37ab3c03 Branch: refs/heads/master Commit: 37ab3c0331db8d8cd1ab026e1833bd492cea0e76 Parents: 060bfc6 Author: selvaganesang <[email protected]> Authored: Wed Apr 18 20:03:24 2018 +0000 Committer: selvaganesang <[email protected]> Committed: Wed Apr 18 20:03:24 2018 +0000 ---------------------------------------------------------------------- core/sql/executor/ExHdfsScan.cpp | 55 +++++++++++++++++++++-------------- core/sql/executor/ExHdfsScan.h | 2 +- 2 files changed, 34 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/trafodion/blob/37ab3c03/core/sql/executor/ExHdfsScan.cpp ---------------------------------------------------------------------- diff --git a/core/sql/executor/ExHdfsScan.cpp b/core/sql/executor/ExHdfsScan.cpp index d09f6cd..dcf0d07 100644 --- a/core/sql/executor/ExHdfsScan.cpp +++ b/core/sql/executor/ExHdfsScan.cpp @@ -124,8 +124,7 @@ ExHdfsScanTcb::ExHdfsScanTcb( , hdfsScan_(NULL) , hdfsStats_(NULL) , hdfsFileInfoListAsArray_(glob->getDefaultHeap(), hdfsScanTdb.getHdfsFileInfoList()->numEntries()) - , errBuf_(NULL) - + , numFiles_(0) { Space * space = (glob ? glob->getSpace() : 0); CollHeap * heap = (glob ? 
glob->getDefaultHeap() : 0); @@ -320,8 +319,6 @@ void ExHdfsScanTcb::freeResources() NADELETE(logFileHdfsClient_, HdfsClient, getHeap()); if (hdfsScan_ != NULL) NADELETE(hdfsScan_, HdfsScan, getHeap()); - if (errBuf_ != NULL) - NADELETEBASIC(errBuf_, getHeap()); } NABoolean ExHdfsScanTcb::needStatsEntry() @@ -628,14 +625,23 @@ ExWorkProcRetcode ExHdfsScanTcb::work() else extraBytesRead_ = 0; // headRoom_ is the number of extra bytes to be read (rangeTailIOSize) - // If EOF is reached while reading the range and the extraBytes read - // is less than headRoom_ then process all the data till EOF - // TODO: If the whole range fits in one buffer, it is need too to process rows till EOF for the last range alone - // No easy way to identify that last range read, but can identify that it is not the first range. - // The rows could be read more than once if there are more than 2 ranges. - // Fix optimizer not to have more than 2 ranges in that case - if (retArray_[IS_EOF] && extraBytesRead_ < headRoom_ && hdfo->getStartOffset() != 0) - extraBytesRead_ = 0; + // If the whole range fits in one buffer, it is needed to process rows till EOF for the last range alone. 
+/* + if (retArray_[IS_EOF] && (extraBytesRead_ < headRoom_) + && (retArray_[RANGE_NO] == (hdfsFileInfoListAsArray_.entries()-1))) + extraBytesRead_ = 0; +*/ + if (numFiles_ <= 1) { + if (retArray_[IS_EOF] && extraBytesRead_ < headRoom_ && (retArray_[RANGE_NO] == (hdfsFileInfoListAsArray_.entries()-1))) + extraBytesRead_ = 0; + } + else { + // If EOF is reached while reading the range and the extraBytes read + // is less than headRoom_ then process all the data till EOF + if (retArray_[IS_EOF] && extraBytesRead_ < headRoom_ ) + extraBytesRead_ = 0; + } + bufLogicalEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + retArray_[BYTES_COMPLETED] - extraBytesRead_; prevRangeNum_ = retArray_[RANGE_NO]; headRoomCopied_ = 0; @@ -656,6 +662,13 @@ ExWorkProcRetcode ExHdfsScanTcb::work() } else hdfsBufNextRow_ = (char *)bufBegin_; + QRLogger::log(CAT_SQL_EXE, LL_DEBUG, "FileName %s Offset %ld BytesToRead %ld BytesRead %ld RangeNo %d IsEOF %d BufBegin: 0x%lx BufEnd: 0x%lx BufLogicalEnd: 0x%lx headRoom %d extraBytes %d recordSkip %d ", + hdfo->fileName(), hdfo->getStartOffset(), hdfo->bytesToRead_, retArray_[BYTES_COMPLETED], retArray_[RANGE_NO], retArray_[IS_EOF], bufBegin_ , bufEnd_, bufLogicalEnd_, headRoom_, extraBytesRead_, recordSkip_); + // If the first record starts after the logical end, this record should have been processed by other ESPs + if ((BYTE *)hdfsBufNextRow_ > bufLogicalEnd_) { + headRoomCopied_ = 0; + hdfsBufNextRow_ = NULL; + } step_ = PROCESS_HDFS_ROW; } break; @@ -784,16 +797,14 @@ ExWorkProcRetcode ExHdfsScanTcb::work() ComDiagsArea * diagsArea = NULL; if (hdfsErrorDetail == ENOENT) { - if (errBuf_ != NULL) - NADELETEBASIC(errBuf_, getHeap()); Lng32 len = strlen(hdfsScanTdb().tableName()) + strlen(hdfsFileName_) + 100; - errBuf_ = new (getHeap()) char[len]; - snprintf(errBuf_, len, "%s (fileLoc: %s)", + char errBuf[len]; + snprintf(errBuf, len, "%s (fileLoc: %s)", hdfsScanTdb().tableName(), hdfsFileName_); ExRaiseSqlError(getHeap(), &diagsArea, 
(ExeErrorCode)(EXE_TABLE_NOT_FOUND), NULL, NULL, NULL, NULL, - errBuf_); + errBuf); } else ExRaiseSqlError(getHeap(), &diagsArea, @@ -1091,7 +1102,8 @@ ExWorkProcRetcode ExHdfsScanTcb::work() int err = 0; char *startOfNextRow = extractAndTransformAsciiSourceToSqlRow(err, transformDiags, hdfsScanTdb().getHiveScanMode()); - + QRLogger::log(CAT_SQL_EXE, LL_DEBUG, "HdfsBufRow 0x%lx StartOfNextRow 0x%lx RowLength %ld ", hdfsBufNextRow_, startOfNextRow, + startOfNextRow-hdfsBufNextRow_); bool rowWillBeSelected = true; lastErrorCnd_ = NULL; if(err) @@ -1936,7 +1948,6 @@ char * ExHdfsScanTcb::extractAndTransformAsciiSourceToSqlRow(int &err, void ExHdfsScanTcb::computeRangesAtRuntime() { - int numFiles = 0; Int64 totalSize = 0; Int64 myShare = 0; Int64 runningSum = 0; @@ -1949,13 +1960,13 @@ void ExHdfsScanTcb::computeRangesAtRuntime() HDFS_FileInfo *fileInfos; HDFS_Client_RetCode hdfsClientRetcode; - hdfsClientRetcode = hdfsClient_->hdfsListDirectory(hdfsScanTdb().hdfsRootDir_, &fileInfos, &numFiles); + hdfsClientRetcode = hdfsClient_->hdfsListDirectory(hdfsScanTdb().hdfsRootDir_, &fileInfos, &numFiles_); ex_assert(hdfsClientRetcode == HDFS_CLIENT_OK, "Internal error:hdfsClient->hdfsListDirectory returned an error") deallocateRuntimeRanges(); // in a first round, count the total number of bytes - for (int f=0; f<numFiles; f++) + for (int f=0; f<numFiles_; f++) { ex_assert(fileInfos[f].mKind == HDFS_FILE_KIND, "subdirectories not supported with runtime HDFS ranges"); @@ -1980,7 +1991,7 @@ void ExHdfsScanTcb::computeRangesAtRuntime() // second round, find out the range of files I need to read for (int g=0; - g < numFiles && runningSum < myEndPositionInBytes; + g < numFiles_ && runningSum < myEndPositionInBytes; g++) { Int64 prevSum = runningSum; http://git-wip-us.apache.org/repos/asf/trafodion/blob/37ab3c03/core/sql/executor/ExHdfsScan.h ---------------------------------------------------------------------- diff --git a/core/sql/executor/ExHdfsScan.h 
b/core/sql/executor/ExHdfsScan.h index c11329f..371b6b5 100644 --- a/core/sql/executor/ExHdfsScan.h +++ b/core/sql/executor/ExHdfsScan.h @@ -376,7 +376,7 @@ protected: int prevRangeNum_; int extraBytesRead_; NABoolean recordSkip_; - char *errBuf_; + int numFiles_; }; class ExOrcScanTcb : public ExHdfsScanTcb
