[ https://issues.apache.org/jira/browse/TRAFODION-2917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360334#comment-16360334 ]

ASF GitHub Bot commented on TRAFODION-2917:
-------------------------------------------

Github user selvaganesang commented on a diff in the pull request:

    https://github.com/apache/trafodion/pull/1417#discussion_r167473572
  
    --- Diff: core/sql/executor/ExHdfsScan.cpp ---
    @@ -514,11 +541,108 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
     
                 if (step_ == CHECK_FOR_DATA_MOD_AND_DONE)
                   step_ = DONE;
    -            else
    -              step_ = INIT_HDFS_CURSOR;
    +            else {
    +              if (useLibhdfsScan_)
    +                 step_ = INIT_HDFS_CURSOR;
    +              else
    +                 step_ = SETUP_HDFS_SCAN;
    +            }
               }        
               break;
    -
    +        case SETUP_HDFS_SCAN:
    +          {   
    +             if (hdfsScan_ != NULL)
    +                NADELETE(hdfsScan_, HdfsScan, getHeap());
    +             hdfsScan_ = HdfsScan::newInstance((NAHeap *)getHeap(), hdfsScanBuf_, hdfsScanBufMaxSize_,
    +                            &hdfsFileInfoListAsArray_, beginRangeNum_, numRanges_, hdfsScanTdb().rangeTailIOSize_,
    +                            hdfsStats_, hdfsScanRetCode);
    +             if (hdfsScanRetCode != HDFS_SCAN_OK)
    +             {
    +                setupError(EXE_ERROR_HDFS_SCAN, hdfsScanRetCode, "SETUP_HDFS_SCAN",
    +                              currContext->getJniErrorStr(), NULL);
    +                step_ = HANDLE_ERROR_AND_DONE;
    +                break;
    +             } 
    +             bufBegin_ = NULL;
    +             bufEnd_ = NULL;
    +             bufLogicalEnd_ = NULL;
    +             headRoomCopied_ = 0;
    +             prevRangeNum_ = -1;                         
    +             currRangeBytesRead_ = 0;                   
    +             recordSkip_ = FALSE;
    +             extraBytesRead_ = 0;
    +             step_ = TRAF_HDFS_READ;
    +          } 
    +          break;
    +        case TRAF_HDFS_READ:
    +          {
    +             hdfsScanRetCode = hdfsScan_->trafHdfsRead((NAHeap *)getHeap(), hdfsStats_, retArray_, sizeof(retArray_)/sizeof(int));
    +             if (hdfsScanRetCode == HDFS_SCAN_EOR) {
    +                step_ = DONE;
    +                break;
    +             }
    +             else if (hdfsScanRetCode != HDFS_SCAN_OK) {
    +                setupError(EXE_ERROR_HDFS_SCAN, hdfsScanRetCode, "TRAF_HDFS_READ",
    +                              currContext->getJniErrorStr(), NULL);
    +                step_ = HANDLE_ERROR_AND_DONE;
    +                break;
    +             } 
    +             hdfo = hdfsFileInfoListAsArray_.at(retArray_[RANGE_NO]);
    +             bufEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + retArray_[BYTES_COMPLETED];
    +             if (retArray_[RANGE_NO] != prevRangeNum_) {  
    +                currRangeBytesRead_ = retArray_[BYTES_COMPLETED];
    +                bufBegin_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_;
    +                if (hdfo->getStartOffset() == 0)
    +                   recordSkip_ = FALSE;
    +                else
    +                   recordSkip_ = TRUE; 
    +             } else {
    +                currRangeBytesRead_ += retArray_[BYTES_COMPLETED];
    +                bufBegin_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ - headRoomCopied_;
    +                recordSkip_ = FALSE;
    +             }
    +             if (currRangeBytesRead_ > hdfo->getBytesToRead())
    +                extraBytesRead_ = currRangeBytesRead_ - hdfo->getBytesToRead();
    +             else
    +                extraBytesRead_ = 0;
    +             // headRoom_ is the number of extra bytes read (rangeTailIOSize).
    +             // If EOF is reached while reading the range and the extraBytes read
    +             // is less than headRoom_, then process all the data till EOF.
    +             if (retArray_[IS_EOF] && extraBytesRead_ < headRoom_)
    +                extraBytesRead_ = 0;
    +             bufLogicalEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + retArray_[BYTES_COMPLETED] - extraBytesRead_;
    +             prevRangeNum_ = retArray_[RANGE_NO];
    +             headRoomCopied_ = 0;
    +             if (recordSkip_) {
    +                hdfsBufNextRow_ = hdfs_strchr((char *)bufBegin_,
    +                                     hdfsScanTdb().recordDelimiter_,
    +                                     (char *)bufEnd_,
    +                                     checkRangeDelimiter_,
    +                                     hdfsScanTdb().getHiveScanMode(), &changedLen);
    +                if (hdfsBufNextRow_ == NULL) {
    --- End diff ---
    
    Yes


> Refactor Trafodion implementation of hdfs scan for text formatted hive tables
> -----------------------------------------------------------------------------
>
>                 Key: TRAFODION-2917
>                 URL: https://issues.apache.org/jira/browse/TRAFODION-2917
>             Project: Apache Trafodion
>          Issue Type: New Feature
>          Components: sql-general
>            Reporter: Selvaganesan Govindarajan
>            Priority: Major
>             Fix For: 2.3
>
>
> Find below the general outline of hdfs scan for text formatted hive tables.
> The compiler returns, in the TDB, a list of scan ranges plus the begin range
> and the number of ranges to be done by each TCB instance. This list of scan
> ranges is also re-computed at run time, possibly based on a CQD.
> The scan ranges for a TCB can come from the same or from different hdfs
> files. The TCB creates two threads to read these ranges. Two of the TCB's
> ranges are initially assigned to these threads; as each range is completed,
> the thread picks up the next range assigned to the TCB. Ranges are read in
> multiples of the hdfs scan buffer size at the TCB level; the default hdfs
> scan buffer size is 64 MB. Rows from the hdfs scan buffer are processed and
> moved into the up queue. If a range ends in a record split, the range is
> extended by up to the range tail IO size to read the full row. The range that
> gets the latter part of the row ignores it, because the former range
> processes it (see the record-split sketch after this description). A record
> split across hdfs files is not possible and/or not supported.
> For compression, the compiler returns the range info such that the hdfs scan
> buffer can hold the fully uncompressed data.
> Cons:
>   - The reader-threads feature is too complex to maintain in C++.
>   - Error handling at the layer below the TCB is missing, or errors are not
>     propagated to the work method, causing incorrect results.
>   - Possible multiple copying of data.
>   - Libhdfs calls are not optimized. It was observed that the JNI method IDs
>     are being obtained many times (a method-ID caching sketch follows this
>     description). Need to check if this problem still exists.
> Now that we clearly know what is expected, it could be optimized better:
>   - Reduced scan buffer size for smoother data flow
>   - Better thread utilization
>   - Avoid multiple copying of data
> It is hard to see the need for two pre-fetch threads, especially when one
> range is completed fully before the data from the next range is processed.
> Following are the libhdfs calls used by programs in the exp and executor
> directories ("U" marks an undefined symbol in the object-file listing):
>                  U hdfsCloseFile
>                  U hdfsConnect
>                  U hdfsDelete
>                  U hdfsExists
>                  U hdfsFlush
>                  U hdfsFreeFileInfo
>                  U hdfsGetPathInfo
>                  U hdfsListDirectory
>                  U hdfsOpenFile
>                  U hdfsPread
>                  U hdfsRename
>                  U hdfsWrite
>                  U hdfsCreateDirectory
> New implementation
> Make changes to use direct Java APIs for these calls. However, come up with a
> better mechanism to move the data between Java and JNI, avoid unnecessary
> copying of data, and manage threads better via Executor concepts in Java.
> Hence it won't be a direct mapping of these calls to the hdfs Java API;
> instead, use an abstraction like what is being done for HBase access. (A
> condensed sketch of the resulting read loop follows this description.)
> I believe the newer implementation will be better optimized and hence improve
> performance (though not by many folds).
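
To illustrate the record-split rule described in the outline above, here is a
minimal C++ sketch with hypothetical names (skipPartialRow, firstRowInRange);
the actual code uses hdfs_strchr() and the recordSkip_/headRoom_ bookkeeping
shown in the diff:

    #include <cstring>

    // Return the first byte past the next record delimiter, or bufEnd if no
    // delimiter occurs in [buf, bufEnd).
    static const char *skipPartialRow(const char *buf, const char *bufEnd,
                                      char recordDelimiter)
    {
      const char *p = static_cast<const char *>(
          std::memchr(buf, recordDelimiter, bufEnd - buf));
      return (p != NULL) ? p + 1 : bufEnd;
    }

    // Decide where row processing starts within a range buffer.
    const char *firstRowInRange(const char *buf, const char *bufEnd,
                                long startOffset, char recordDelimiter)
    {
      if (startOffset == 0)      // range begins the file: no partial first row
        return buf;
      // Otherwise the partial first row belongs to the previous range, which
      // extends its read by up to rangeTailIOSize bytes to complete that row.
      return skipPartialRow(buf, bufEnd, recordDelimiter);
    }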
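
On the libhdfs point in the cons list: the inefficiency is re-fetching JNI
method IDs on every call instead of caching them once. A minimal sketch of the
caching pattern (the class and method names here are hypothetical
placeholders, not Trafodion's actual JNI layer):

    #include <jni.h>

    static jmethodID gReadMethod = NULL;  // looked up once, reused afterwards

    // Call once at initialization to cache the method ID.
    void cacheMethodIds(JNIEnv *env)
    {
      jclass cls = env->FindClass("org/example/HdfsScanHelper");  // hypothetical
      if (cls != NULL)
        gReadMethod = env->GetMethodID(cls, "read", "([BII)I");   // hypothetical
    }

    // Subsequent calls avoid the FindClass/GetMethodID round trip entirely.
    jint readViaJni(JNIEnv *env, jobject scanner,
                    jbyteArray buf, jint off, jint len)
    {
      return env->CallIntMethod(scanner, gReadMethod, buf, off, len);
    }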
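
Finally, a condensed sketch of the read protocol the new implementation
exposes. The HdfsScan and trafHdfsRead names appear in the diff above; the
stand-in declarations, stub body, and simplified signature (heap and stats
parameters elided) are assumptions made for illustration:

    enum HDFS_Scan_RetCode { HDFS_SCAN_OK, HDFS_SCAN_EOR, HDFS_SCAN_ERROR };
    enum { RANGE_NO = 0, BUF_NO, BYTES_COMPLETED, IS_EOF, RET_ARRAY_SIZE };

    class HdfsScan {  // stand-in for the class the PR introduces
    public:
      // Fills ret[]: completed range number, buffer number, bytes read, EOF flag.
      HDFS_Scan_RetCode trafHdfsRead(int ret[], int retSize)
      { (void)ret; (void)retSize; return HDFS_SCAN_EOR; }  // stub so this compiles
    };

    // The TCB work() loop then reduces to: wait for a completed buffer, locate
    // the row boundaries, process the rows, repeat until end-of-ranges.
    void drainRanges(HdfsScan *scan)
    {
      int ret[RET_ARRAY_SIZE];
      for (;;) {
        HDFS_Scan_RetCode rc = scan->trafHdfsRead(ret, RET_ARRAY_SIZE);
        if (rc == HDFS_SCAN_EOR)     // all assigned ranges consumed
          break;
        if (rc != HDFS_SCAN_OK) {    // surface the error to the work method
          // raise EXE_ERROR_HDFS_SCAN here
          break;
        }
        // Process up to ret[BYTES_COMPLETED] bytes in scan buffer ret[BUF_NO]
        // for range ret[RANGE_NO]; ret[IS_EOF] marks end of the hdfs file.
      }
    }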


