GitHub user sureshsubbiah commented on a diff in the pull request:
https://github.com/apache/trafodion/pull/1417#discussion_r167412036
--- Diff: core/sql/executor/ExHdfsScan.cpp ---
@@ -514,11 +541,108 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
if (step_ == CHECK_FOR_DATA_MOD_AND_DONE)
step_ = DONE;
- else
- step_ = INIT_HDFS_CURSOR;
+ else {
+ if (useLibhdfsScan_)
+ step_ = INIT_HDFS_CURSOR;
+ else
+ step_ = SETUP_HDFS_SCAN;
+ }
}
break;
-
+ case SETUP_HDFS_SCAN:
+ {
+ if (hdfsScan_ != NULL)
+ NADELETE(hdfsScan_, HdfsScan, getHeap());
+        hdfsScan_ = HdfsScan::newInstance((NAHeap *)getHeap(), hdfsScanBuf_, hdfsScanBufMaxSize_,
+                            &hdfsFileInfoListAsArray_, beginRangeNum_, numRanges_, hdfsScanTdb().rangeTailIOSize_,
+ hdfsStats_, hdfsScanRetCode);
+ if (hdfsScanRetCode != HDFS_SCAN_OK)
+ {
+           setupError(EXE_ERROR_HDFS_SCAN, hdfsScanRetCode, "SETUP_HDFS_SCAN",
+ currContext->getJniErrorStr(), NULL);
+ step_ = HANDLE_ERROR_AND_DONE;
+ break;
+ }
+ bufBegin_ = NULL;
+ bufEnd_ = NULL;
+ bufLogicalEnd_ = NULL;
+ headRoomCopied_ = 0;
+ prevRangeNum_ = -1;
+ currRangeBytesRead_ = 0;
+ recordSkip_ = FALSE;
+ extraBytesRead_ = 0;
+ step_ = TRAF_HDFS_READ;
+ }
+ break;
+ case TRAF_HDFS_READ:
+ {
+        hdfsScanRetCode = hdfsScan_->trafHdfsRead((NAHeap *)getHeap(), hdfsStats_, retArray_, sizeof(retArray_)/sizeof(int));
+ if (hdfsScanRetCode == HDFS_SCAN_EOR) {
+ step_ = DONE;
+ break;
+ }
+ else if (hdfsScanRetCode != HDFS_SCAN_OK) {
+           setupError(EXE_ERROR_HDFS_SCAN, hdfsScanRetCode, "SETUP_HDFS_SCAN",
+ currContext->getJniErrorStr(), NULL);
+ step_ = HANDLE_ERROR_AND_DONE;
+ break;
+ }
+ hdfo = hdfsFileInfoListAsArray_.at(retArray_[RANGE_NO]);
+        bufEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + retArray_[BYTES_COMPLETED];
+ if (retArray_[RANGE_NO] != prevRangeNum_) {
+ currRangeBytesRead_ = retArray_[BYTES_COMPLETED];
+ bufBegin_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_;
+ if (hdfo->getStartOffset() == 0)
+ recordSkip_ = FALSE;
+ else
+ recordSkip_ = TRUE;
+ } else {
+ currRangeBytesRead_ += retArray_[BYTES_COMPLETED];
+           bufBegin_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ - headRoomCopied_;
+ recordSkip_ = FALSE;
+ }
+ if (currRangeBytesRead_ > hdfo->getBytesToRead())
+           extraBytesRead_ = currRangeBytesRead_ - hdfo->getBytesToRead();
+ else
+ extraBytesRead_ = 0;
+        // headRoom_ is the number of extra bytes read (rangeTailIOSize)
+        // If EOF is reached while reading the range and the extraBytes read
+ // is less than headRoom_, then process all the data till EOF
+ if (retArray_[IS_EOF] && extraBytesRead_ < headRoom_)
+ extraBytesRead_ = 0;
+        bufLogicalEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + retArray_[BYTES_COMPLETED] - extraBytesRead_;
+ prevRangeNum_ = retArray_[RANGE_NO];
+ headRoomCopied_ = 0;
+ if (recordSkip_) {
+ hdfsBufNextRow_ = hdfs_strchr((char *)bufBegin_,
+ hdfsScanTdb().recordDelimiter_,
+ (char *)bufEnd_,
+ checkRangeDelimiter_,
+ hdfsScanTdb().getHiveScanMode(), &changedLen);
+ if (hdfsBufNextRow_ == NULL) {
--- End diff --
The last record in a file sometimes has no trailing recordDelimiter. Hive
accepts this, and after some trial and error the libhdfs approach does too.
Are we handling that case correctly here? I cannot tell from the diff; this
is a question.
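
For reference, here is a minimal standalone sketch of the behavior I am
asking about. It is not the actual ExHdfsScanTcb code: `splitRecords`, the
`reachedEof` flag, and the '\n' delimiter are hypothetical names assumed for
illustration. The point is only that when no delimiter is found but EOF has
been reached, the remaining bytes are emitted as the final record instead of
being dropped or treated as an error.

```cpp
// Hypothetical, self-contained sketch of delimiter handling at EOF.
// Assumes '\n' as the record delimiter; none of these names come from Trafodion.
#include <cstring>
#include <string>
#include <vector>

std::vector<std::string> splitRecords(const char *bufBegin,
                                      const char *bufLogicalEnd,
                                      bool reachedEof,
                                      char delimiter = '\n')
{
  std::vector<std::string> records;
  const char *rowStart = bufBegin;
  while (rowStart < bufLogicalEnd) {
    const char *delim = static_cast<const char *>(
        std::memchr(rowStart, delimiter, bufLogicalEnd - rowStart));
    if (delim != NULL) {
      // Complete record terminated by the delimiter.
      records.emplace_back(rowStart, delim - rowStart);
      rowStart = delim + 1;
    } else if (reachedEof) {
      // Last record of the file with no trailing delimiter: Hive accepts
      // this, so emit the remaining bytes as a record rather than drop them.
      records.emplace_back(rowStart, bufLogicalEnd - rowStart);
      break;
    } else {
      // Not at EOF: the partial row should be carried over into the next
      // read (head room) instead of being emitted here.
      break;
    }
  }
  return records;
}
```

If the new scan path does something like this when the delimiter search
returns NULL at the end of the last range, then the undelimited final record
should be handled correctly.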
---