Github user selvaganesang commented on a diff in the pull request:

    https://github.com/apache/trafodion/pull/1417#discussion_r167473572
  
    --- Diff: core/sql/executor/ExHdfsScan.cpp ---
    @@ -514,11 +541,108 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
     
                 if (step_ == CHECK_FOR_DATA_MOD_AND_DONE)
                   step_ = DONE;
    -            else
    -              step_ = INIT_HDFS_CURSOR;
    +            else {
    +              if (useLibhdfsScan_)
    +                 step_ = INIT_HDFS_CURSOR;
    +              else
    +                 step_ = SETUP_HDFS_SCAN;
    +            }
               }        
               break;
    -
    +        case SETUP_HDFS_SCAN:
    +          {   
    +             if (hdfsScan_ != NULL)
    +                NADELETE(hdfsScan_, HdfsScan, getHeap());
    +             hdfsScan_ = HdfsScan::newInstance((NAHeap *)getHeap(), 
hdfsScanBuf_, hdfsScanBufMaxSize_, 
    +                            &hdfsFileInfoListAsArray_, beginRangeNum_, 
numRanges_, hdfsScanTdb().rangeTailIOSize_, 
    +                            hdfsStats_, hdfsScanRetCode);
    +             if (hdfsScanRetCode != HDFS_SCAN_OK)
    +             {
    +                setupError(EXE_ERROR_HDFS_SCAN, hdfsScanRetCode, 
"SETUP_HDFS_SCAN", 
    +                              currContext->getJniErrorStr(), NULL);        
      
    +                step_ = HANDLE_ERROR_AND_DONE;
    +                break;
    +             } 
    +             bufBegin_ = NULL;
    +             bufEnd_ = NULL;
    +             bufLogicalEnd_ = NULL;
    +             headRoomCopied_ = 0;
    +             prevRangeNum_ = -1;                         
    +             currRangeBytesRead_ = 0;                   
    +             recordSkip_ = FALSE;
    +             extraBytesRead_ = 0;
    +             step_ = TRAF_HDFS_READ;
    +          } 
    +          break;
    +        case TRAF_HDFS_READ:
    +          {
    +             hdfsScanRetCode = hdfsScan_->trafHdfsRead((NAHeap 
*)getHeap(), hdfsStats_, retArray_, sizeof(retArray_)/sizeof(int));
    +             if (hdfsScanRetCode == HDFS_SCAN_EOR) {
    +                step_ = DONE;
    +                break;
    +             }
    +             else if (hdfsScanRetCode != HDFS_SCAN_OK) {
    +                setupError(EXE_ERROR_HDFS_SCAN, hdfsScanRetCode, 
"SETUP_HDFS_SCAN", 
    +                              currContext->getJniErrorStr(), NULL);        
      
    +                step_ = HANDLE_ERROR_AND_DONE;
    +                break;
    +             } 
    +             hdfo = hdfsFileInfoListAsArray_.at(retArray_[RANGE_NO]);
    +             bufEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + 
retArray_[BYTES_COMPLETED];
    +             if (retArray_[RANGE_NO] != prevRangeNum_) {  
    +                currRangeBytesRead_ = retArray_[BYTES_COMPLETED];
    +                bufBegin_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_;
    +                if (hdfo->getStartOffset() == 0)
    +                   recordSkip_ = FALSE;
    +                else
    +                   recordSkip_ = TRUE; 
    +             } else {
    +                currRangeBytesRead_ += retArray_[BYTES_COMPLETED];
    +                bufBegin_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ - 
headRoomCopied_;
    +                recordSkip_ = FALSE;
    +             }
    +             if (currRangeBytesRead_ > hdfo->getBytesToRead())
    +                extraBytesRead_ = currRangeBytesRead_ - 
hdfo->getBytesToRead(); 
    +             else
    +                extraBytesRead_ = 0;
    +             // headRoom_ is the number of extra bytes read 
(rangeTailIOSize)
    +             // If EOF is reached while reading the range and the 
extraBytes read
    +             // is less than headRoom_, then process all the data till EOF 
    +             if (retArray_[IS_EOF] && extraBytesRead_ < headRoom_)
    +                extraBytesRead_ = 0;
    +             bufLogicalEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + 
retArray_[BYTES_COMPLETED] - extraBytesRead_;
    +             prevRangeNum_ = retArray_[RANGE_NO];
    +             headRoomCopied_ = 0;
    +             if (recordSkip_) {
    +           hdfsBufNextRow_ = hdfs_strchr((char *)bufBegin_,
    +                         hdfsScanTdb().recordDelimiter_, 
    +                              (char *)bufEnd_,
    +                         checkRangeDelimiter_, 
    +                         hdfsScanTdb().getHiveScanMode(), &changedLen);
    +                if (hdfsBufNextRow_ == NULL) {
    --- End diff --
    
    Yes


---

Reply via email to