[
https://issues.apache.org/jira/browse/TRAFODION-2917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360296#comment-16360296
]
ASF GitHub Bot commented on TRAFODION-2917:
-------------------------------------------
Github user sureshsubbiah commented on a diff in the pull request:
https://github.com/apache/trafodion/pull/1417#discussion_r167412036
--- Diff: core/sql/executor/ExHdfsScan.cpp ---
@@ -514,11 +541,108 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
if (step_ == CHECK_FOR_DATA_MOD_AND_DONE)
step_ = DONE;
- else
- step_ = INIT_HDFS_CURSOR;
+ else {
+ if (useLibhdfsScan_)
+ step_ = INIT_HDFS_CURSOR;
+ else
+ step_ = SETUP_HDFS_SCAN;
+ }
}
break;
-
+ case SETUP_HDFS_SCAN:
+ {
+ if (hdfsScan_ != NULL)
+ NADELETE(hdfsScan_, HdfsScan, getHeap());
+ hdfsScan_ = HdfsScan::newInstance((NAHeap *)getHeap(), hdfsScanBuf_, hdfsScanBufMaxSize_,
+ &hdfsFileInfoListAsArray_, beginRangeNum_, numRanges_, hdfsScanTdb().rangeTailIOSize_,
+ hdfsStats_, hdfsScanRetCode);
+ if (hdfsScanRetCode != HDFS_SCAN_OK)
+ {
+ setupError(EXE_ERROR_HDFS_SCAN, hdfsScanRetCode, "SETUP_HDFS_SCAN",
+ currContext->getJniErrorStr(), NULL);
+ step_ = HANDLE_ERROR_AND_DONE;
+ break;
+ }
+ bufBegin_ = NULL;
+ bufEnd_ = NULL;
+ bufLogicalEnd_ = NULL;
+ headRoomCopied_ = 0;
+ prevRangeNum_ = -1;
+ currRangeBytesRead_ = 0;
+ recordSkip_ = FALSE;
+ extraBytesRead_ = 0;
+ step_ = TRAF_HDFS_READ;
+ }
+ break;
+ case TRAF_HDFS_READ:
+ {
+ hdfsScanRetCode = hdfsScan_->trafHdfsRead((NAHeap *)getHeap(), hdfsStats_, retArray_, sizeof(retArray_)/sizeof(int));
+ if (hdfsScanRetCode == HDFS_SCAN_EOR) {
+ step_ = DONE;
+ break;
+ }
+ else if (hdfsScanRetCode != HDFS_SCAN_OK) {
+ setupError(EXE_ERROR_HDFS_SCAN, hdfsScanRetCode, "TRAF_HDFS_READ",
+ currContext->getJniErrorStr(), NULL);
+ step_ = HANDLE_ERROR_AND_DONE;
+ break;
+ }
+ hdfo = hdfsFileInfoListAsArray_.at(retArray_[RANGE_NO]);
+ bufEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + retArray_[BYTES_COMPLETED];
+ if (retArray_[RANGE_NO] != prevRangeNum_) {
+ currRangeBytesRead_ = retArray_[BYTES_COMPLETED];
+ bufBegin_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_;
+ if (hdfo->getStartOffset() == 0)
+ recordSkip_ = FALSE;
+ else
+ recordSkip_ = TRUE;
+ } else {
+ currRangeBytesRead_ += retArray_[BYTES_COMPLETED];
+ bufBegin_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ - headRoomCopied_;
+ recordSkip_ = FALSE;
+ }
+ if (currRangeBytesRead_ > hdfo->getBytesToRead())
+ extraBytesRead_ = currRangeBytesRead_ - hdfo->getBytesToRead();
+ else
+ extraBytesRead_ = 0;
+ // headRoom_ is the number of extra bytes read (rangeTailIOSize).
+ // If EOF is reached while reading the range and the extra bytes read
+ // are less than headRoom_, then process all the data till EOF.
+ if (retArray_[IS_EOF] && extraBytesRead_ < headRoom_)
+ extraBytesRead_ = 0;
+ bufLogicalEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + retArray_[BYTES_COMPLETED] - extraBytesRead_;
+ prevRangeNum_ = retArray_[RANGE_NO];
+ headRoomCopied_ = 0;
+ if (recordSkip_) {
+ hdfsBufNextRow_ = hdfs_strchr((char *)bufBegin_,
+ hdfsScanTdb().recordDelimiter_,
+ (char *)bufEnd_,
+ checkRangeDelimiter_,
+ hdfsScanTdb().getHiveScanMode(), &changedLen);
+ if (hdfsBufNextRow_ == NULL) {
--- End diff ---
The last record in a file sometimes has no trailing recordDelimiter; Hive
accepts this, and after some trial and error the libhdfs approach does too.
Are we handling that case correctly here? I cannot tell; this is a question.
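For reference, a minimal sketch of one way the missing-delimiter case could be
handled, assuming the end of data at EOF acts as an implicit delimiter. The
helper below is hypothetical (not part of this PR); only the names it mirrors
(bufLogicalEnd_, the IS_EOF flag) come from the diff.

  #include <cstring>

  // Hypothetical helper, sketch only; assumes pos <= bufLogicalEnd. If no
  // delimiter is found and the read ended at EOF, the logical end of the
  // buffer is treated as an implicit delimiter so the final record is still
  // emitted; otherwise NULL means the row is partial and completes on the
  // next read.
  static const char *findRowEnd(const char *pos, const char *bufLogicalEnd,
                                char recordDelimiter, bool isEof)
  {
    const char *eol = static_cast<const char *>(
        std::memchr(pos, recordDelimiter, bufLogicalEnd - pos));
    if (eol != NULL)
      return eol;            // normal case: delimiter inside the buffer
    if (isEof)
      return bufLogicalEnd;  // last record has no trailing delimiter
    return NULL;             // partial row: wait for the next read
  }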
> Refactor Trafodion implementation of hdfs scan for text formatted hive tables
> -----------------------------------------------------------------------------
>
> Key: TRAFODION-2917
> URL: https://issues.apache.org/jira/browse/TRAFODION-2917
> Project: Apache Trafodion
> Issue Type: New Feature
> Components: sql-general
> Reporter: Selvaganesan Govindarajan
> Priority: Major
> Fix For: 2.3
>
>
> Find below the general outline of hdfs scan for text formatted hive tables.
> The compiler returns, in the TDB, a list of scan ranges plus the begin range
> and the number of ranges to be done by each TCB instance. This list of scan
> ranges can also be re-computed at run time, possibly based on a CQD.
> The scan ranges for a TCB can come from the same or different hdfs files.
> The TCB creates two threads to read these ranges. Two ranges (for the TCB)
> are initially assigned to these threads; as and when a range is completed,
> the next range (assigned to the TCB) is picked up by a thread. Ranges are
> read in multiples of the hdfs scan buffer size at the TCB level; the default
> hdfs scan buffer size is 64 MB. Rows from the hdfs scan buffer are processed
> and moved into the up queue. If the range contains a record split, the range
> is extended by up to the range tail IO size to read the full row (see the
> sketch below). The range that had the latter part of the row ignores it,
> because the former range processes it. A record split at the file level is
> not possible and/or not supported.
> For compression, the compiler returns the range info such that the hdfs scan
> buffer can hold the full uncompressed data.
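> A hedged sketch of that range-tail arithmetic, assuming a range owns every
> row that starts inside it. The function and variable names here are
> illustrative, not the actual TCB members (which the diff above calls
> extraBytesRead_, headRoom_, and so on).
>
>   #include <cstdint>
>
>   // Bytes read past the range's own boundary belong to the next range,
>   // unless EOF was hit within the tail: then there is no next range to
>   // finish the row, so all remaining bytes are processed here.
>   int64_t extraBytesRead(int64_t currRangeBytesRead, int64_t bytesToRead,
>                          int64_t rangeTailIOSize, bool isEof)
>   {
>     int64_t extra = (currRangeBytesRead > bytesToRead)
>                         ? currRangeBytesRead - bytesToRead : 0;
>     if (isEof && extra < rangeTailIOSize)
>       extra = 0;
>     return extra;
>   }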
> Cons:
> The reader-threads feature is too complex to maintain in C++.
> Error handling at the layer below the TCB is missing, or errors are not
> propagated to the work method, causing incorrect results.
> Possible multiple copying of data.
> Libhdfs calls are not optimized. It was observed that the method IDs are
> obtained many times; need to check whether this problem still exists.
> Now that we clearly know what is expected, it could be optimized better:
> - Reduced scan buffer size for smoother data flow
> - Better thread utilization
> - Avoiding multiple copies of data
> It is hard to comprehend the need for two threads for pre-fetch, especially
> when one range is completed fully before the data from the next range is
> processed (a double-buffering sketch follows).
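> As a hedged illustration of the double-buffering alternative this implies:
> one buffer is parsed while the next chunk is read into the other, so a
> single reader (here std::async) suffices. readNextChunk and processRows are
> invented placeholders, not Trafodion functions.
>
>   #include <cstddef>
>   #include <functional>
>   #include <future>
>   #include <vector>
>
>   // Hypothetical reader: fills buf with the next chunk and returns bytes
>   // read (0 at EOF); processRows parses len bytes of rows out of buf.
>   size_t readNextChunk(std::vector<char> &buf);
>   void processRows(const std::vector<char> &buf, size_t len);
>
>   void doubleBufferedScan()
>   {
>     std::vector<char> buf[2] = { std::vector<char>(64 << 20),
>                                  std::vector<char>(64 << 20) };
>     int cur = 0;
>     size_t len = readNextChunk(buf[cur]);   // prime the first buffer
>     while (len > 0) {
>       // Read ahead into the other buffer while this one is parsed.
>       auto next = std::async(std::launch::async, readNextChunk,
>                              std::ref(buf[cur ^ 1]));
>       processRows(buf[cur], len);
>       len = next.get();
>       cur ^= 1;
>     }
>   }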
> Following are the hdfs calls used by programs in the exp and executor
> directories.
> U hdfsCloseFile
> U hdfsConnect
> U hdfsDelete
> U hdfsExists
> U hdfsFlush
> U hdfsFreeFileInfo
> U hdfsGetPathInfo
> U hdfsListDirectory
> U hdfsOpenFile
> U hdfsPread
> U hdfsRename
> U hdfsWrite
> U hdfsCreateDirectory
> New implementation
> Make changes to use direct Java APIs for these calls. However, come up with
> a better mechanism to move the data between Java and JNI, avoid unnecessary
> copying of data, and get better thread management via Executor concepts in
> Java. Hence it won't be a direct mapping of these calls to the hdfs Java
> API; instead, use an abstraction like what is being done for HBase access.
> I believe the newer implementation will be better optimized and hence give
> improved performance (though not by many fold).
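> The diff earlier in this message already shows the rough shape of that
> abstraction on the C++ side. As a sketch only, with forward-declared types
> assumed (the real declarations live in the executor sources):
>
>   class NAHeap;            // executor heap (real Trafodion type)
>   class ExHdfsScanStats;   // per-scan stats; name assumed from hdfsStats_
>   struct BufInfo;          // scan buffer descriptor (assumed)
>   class HdfsFileInfoArray; // array of range descriptors (assumed)
>   enum HDFS_Scan_RetCode { HDFS_SCAN_OK, HDFS_SCAN_EOR }; // subset, per diff
>
>   // Illustrative C++ face of the Java-backed scanner, modeled loosely on
>   // the HdfsScan calls visible in the diff above.
>   class HdfsScan
>   {
>   public:
>     static HdfsScan *newInstance(NAHeap *heap, BufInfo scanBuf[],
>                                  int bufMaxSize, HdfsFileInfoArray *ranges,
>                                  int beginRangeNum, int numRanges,
>                                  int rangeTailIOSize, ExHdfsScanStats *stats,
>                                  HDFS_Scan_RetCode &rc);
>
>     // Blocks until a buffer is filled; retArray reports buffer number,
>     // range number, bytes completed, and an EOF flag, the fields consumed
>     // in the TRAF_HDFS_READ step of the diff.
>     HDFS_Scan_RetCode trafHdfsRead(NAHeap *heap, ExHdfsScanStats *stats,
>                                    int retArray[], int retArraySize);
>   };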