[ https://issues.apache.org/jira/browse/TRAFODION-2917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360334#comment-16360334 ]
ASF GitHub Bot commented on TRAFODION-2917:
-------------------------------------------

Github user selvaganesang commented on a diff in the pull request:

    https://github.com/apache/trafodion/pull/1417#discussion_r167473572

    --- Diff: core/sql/executor/ExHdfsScan.cpp ---
    @@ -514,11 +541,108 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
                 if (step_ == CHECK_FOR_DATA_MOD_AND_DONE)
                   step_ = DONE;
    -            else
    -              step_ = INIT_HDFS_CURSOR;
    +            else {
    +              if (useLibhdfsScan_)
    +                step_ = INIT_HDFS_CURSOR;
    +              else
    +                step_ = SETUP_HDFS_SCAN;
    +            }
               }
               break;
    -
    +        case SETUP_HDFS_SCAN:
    +          {
    +            if (hdfsScan_ != NULL)
    +              NADELETE(hdfsScan_, HdfsScan, getHeap());
    +            hdfsScan_ = HdfsScan::newInstance((NAHeap *)getHeap(), hdfsScanBuf_, hdfsScanBufMaxSize_,
    +                  &hdfsFileInfoListAsArray_, beginRangeNum_, numRanges_, hdfsScanTdb().rangeTailIOSize_,
    +                  hdfsStats_, hdfsScanRetCode);
    +            if (hdfsScanRetCode != HDFS_SCAN_OK)
    +            {
    +              setupError(EXE_ERROR_HDFS_SCAN, hdfsScanRetCode, "SETUP_HDFS_SCAN",
    +                    currContext->getJniErrorStr(), NULL);
    +              step_ = HANDLE_ERROR_AND_DONE;
    +              break;
    +            }
    +            bufBegin_ = NULL;
    +            bufEnd_ = NULL;
    +            bufLogicalEnd_ = NULL;
    +            headRoomCopied_ = 0;
    +            prevRangeNum_ = -1;
    +            currRangeBytesRead_ = 0;
    +            recordSkip_ = FALSE;
    +            extraBytesRead_ = 0;
    +            step_ = TRAF_HDFS_READ;
    +          }
    +          break;
    +        case TRAF_HDFS_READ:
    +          {
    +            hdfsScanRetCode = hdfsScan_->trafHdfsRead((NAHeap *)getHeap(), hdfsStats_, retArray_, sizeof(retArray_)/sizeof(int));
    +            if (hdfsScanRetCode == HDFS_SCAN_EOR) {
    +              step_ = DONE;
    +              break;
    +            }
    +            else if (hdfsScanRetCode != HDFS_SCAN_OK) {
    +              setupError(EXE_ERROR_HDFS_SCAN, hdfsScanRetCode, "SETUP_HDFS_SCAN",
    +                    currContext->getJniErrorStr(), NULL);
    +              step_ = HANDLE_ERROR_AND_DONE;
    +              break;
    +            }
    +            hdfo = hdfsFileInfoListAsArray_.at(retArray_[RANGE_NO]);
    +            bufEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + retArray_[BYTES_COMPLETED];
    +            if (retArray_[RANGE_NO] != prevRangeNum_) {
    +              currRangeBytesRead_ = retArray_[BYTES_COMPLETED];
    +              bufBegin_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_;
    +              if (hdfo->getStartOffset() == 0)
    +                recordSkip_ = FALSE;
    +              else
    +                recordSkip_ = TRUE;
    +            } else {
    +              currRangeBytesRead_ += retArray_[BYTES_COMPLETED];
    +              bufBegin_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ - headRoomCopied_;
    +              recordSkip_ = FALSE;
    +            }
    +            if (currRangeBytesRead_ > hdfo->getBytesToRead())
    +              extraBytesRead_ = currRangeBytesRead_ - hdfo->getBytesToRead();
    +            else
    +              extraBytesRead_ = 0;
    +            // headRoom_ is the number of extra bytes read (rangeTailIOSize)
    +            // If EOF is reached while reading the range and the extraBytes read
    +            // is less than headRoom_, then process all the data till EOF
    +            if (retArray_[IS_EOF] && extraBytesRead_ < headRoom_)
    +              extraBytesRead_ = 0;
    +            bufLogicalEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + retArray_[BYTES_COMPLETED] - extraBytesRead_;
    +            prevRangeNum_ = retArray_[RANGE_NO];
    +            headRoomCopied_ = 0;
    +            if (recordSkip_) {
    +              hdfsBufNextRow_ = hdfs_strchr((char *)bufBegin_,
    +                    hdfsScanTdb().recordDelimiter_,
    +                    (char *)bufEnd_,
    +                    checkRangeDelimiter_,
    +                    hdfsScanTdb().getHiveScanMode(), &changedLen);
    +              if (hdfsBufNextRow_ == NULL) {
    --- End diff --

    Yes


> Refactor Trafodion implementation of hdfs scan for text formatted hive tables
> -----------------------------------------------------------------------------
>
>                 Key: TRAFODION-2917
>                 URL: https://issues.apache.org/jira/browse/TRAFODION-2917
>             Project: Apache Trafodion
>          Issue Type: New Feature
>          Components: sql-general
>            Reporter: Selvaganesan Govindarajan
>            Priority: Major
>             Fix For: 2.3
>
>
> Find below the general outline of hdfs scan for text formatted hive tables.
> Compiler returns a list of scan ranges and the begin range and number of
> ranges to be done by each instance of TCB in TDB. This list of scan ranges is
> also re-computed at run time possibly based on a CQD.
> The scan range for a TCB can come from the same or different hdfs files. TCB
> creates two threads to read these ranges. Two ranges (for the TCB) are
> initially assigned to these threads.
> As and when a range is completed, the
> next range (assigned for the TCB) is picked up by the thread. Ranges are read
> in multiples of hdfs scan buffer size at the TCB level. Default hdfs scan
> buffer size is 64 MB. Rows from the hdfs scan buffer are processed and moved
> into the up queue. If the range contains a record split, then the range is
> extended to read up to range tail IO size to get the full row. The range that
> had the latter part of the row ignores it because the former range processes it.
> Record split at the file level is not possible and/or not supported.
> For compression, the compiler returns the range info such that the hdfs scan
> buffer can hold the full uncompressed buffer.
> Cons:
> Reader threads feature too complex to maintain in C++
> Error handling at the layer below the TCB is missing or errors are not
> propagated to work method causing incorrect results
> Possible multiple copying of data
> Libhdfs calls are not optimized. It was observed that the method Ids are
> being obtained many times. Need to check if this problem still exists.
> Now that we clearly know what is expected, it could be optimized better
> - Reduced scan buffer size for smoother data flow
> - Better thread utilization
> - Avoid multiple copying of data.
> Unable to comprehend the need for two threads for pre-fetch especially when
> one range is completed fully before the data from next range is processed.
> Following are the hdfsCalls used by programs at exp and executor directory.
> U hdfsCloseFile
> U hdfsConnect
> U hdfsDelete
> U hdfsExists
> U hdfsFlush
> U hdfsFreeFileInfo
> U hdfsGetPathInfo
> U hdfsListDirectory
> U hdfsOpenFile
> U hdfsPread
> U hdfsRename
> U hdfsWrite
> U hdfsCreateDirectory
> New implementation
> Make changes to use direct Java APIs for these calls. However, come up with
> better mechanism to move the data from Java and JNI, avoid unnecessary
> copying of data, better thread management via Executor concepts in Java.
> Hence it won’t be a direct mapping of these calls to the hdfs Java API.
> Instead, use an abstraction like the one used for HBase access.
> I believe the newer implementation will be better optimized and hence perform
> better (but not by many fold).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
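
The range-tail handling in the quoted TRAF_HDFS_READ step can be read in isolation as a small computation. The sketch below is only an illustration modeled on the extraBytesRead_ / bufLogicalEnd_ lines of the diff; the struct RangeTail and the function computeRangeTail are hypothetical names that do not appear in the pull request, and plain byte offsets stand in for the buffer pointers used there.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical result type (not from the pull request).
struct RangeTail {
  int64_t extraBytesRead;    // bytes read past the end of the assigned range
  int64_t logicalEndOffset;  // offset in the current buffer where row processing stops
};

// Hypothetical helper mirroring the arithmetic in the quoted diff.
//   currRangeBytesRead: total bytes read so far for this range, including this buffer
//   bytesToRead:        size of the range assigned by the compiler
//   bytesCompleted:     bytes delivered in the current buffer
//   headRoom:           range tail IO size, read past the range to finish a split record
//   isEof:              true if the read reached the end of the file
RangeTail computeRangeTail(int64_t currRangeBytesRead, int64_t bytesToRead,
                           int64_t bytesCompleted, int64_t headRoom, bool isEof)
{
  assert(bytesCompleted >= 0 && headRoom >= 0);

  int64_t extra = 0;
  if (currRangeBytesRead > bytesToRead)
    extra = currRangeBytesRead - bytesToRead;

  // If EOF was hit before a full tail could be read, everything up to EOF
  // belongs to this range, so no bytes are treated as tail.
  if (isEof && extra < headRoom)
    extra = 0;

  return RangeTail{extra, bytesCompleted - extra};
}
```

For example, with a 100-byte range, a 20-byte tail and a single 120-byte read that does not hit EOF, the sketch reports 20 extra bytes and a logical end at offset 100; rows that start before that offset are what the quoted description says the "former range" should finish.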