[ https://issues.apache.org/jira/browse/TRAFODION-2917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16331099#comment-16331099 ]

Sandhya Sundaresan commented on TRAFODION-2917:
-----------------------------------------------

Description of the current flow is below. As you can see, the opens, reads, and 
main-thread processing are highly synchronized, so there is no waiting anywhere. 
If the new implementation can retain this level of parallel reading while being 
a lot simpler, it is worth investing the time in getting rid of this code, since 
it is pretty difficult to debug even though we have ironed out many hangs and 
bugs. So I think we should retain the old code behind a toggle for a while, and 
switch over to the new code after some exposure and extensive testing.

 

Here is how the flow currently goes:

1) The main thread opens one range (64MB). This open sets up some initial 
structures, opens the hdfs file, and kicks off one worker thread whose mission 
is to start fetching, say, 16KB buffers. It does not wait for the worker thread 
to complete the fetches; it just returns.

 

2) The main thread determines that there will be one more range to work on 
after the current one completes, so it calls pre-open on the next range (i.e., 
the next 128MB, a new file). This pre-open request just puts a new pre-open 
object in the pre-open list; it does nothing else at this point.

 

3) The main thread then issues a read. Since the worker thread already began 
fetching 16KB buffers in (1), the main thread most likely will not need to 
wait; the data will already be ready. It keeps consuming the buffers and 
recycling them back into postFetchBufList.

 

4) When done, the main thread closes the cursor.
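The four main-thread steps above can be sketched as a minimal producer/consumer 
in C++. This is a simulation under stated assumptions: `RangeScanner`, 
`scanRange`, and the `substr`-based stand-in for hdfsPread are illustrative 
names, not the actual Trafodion code, and the pre-open step (2) is left out to 
keep the sketch small.

```cpp
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <string>
#include <thread>
#include <utility>

// One range of a (simulated) hdfs file, prefetched chunk by chunk.
class RangeScanner {
public:
    RangeScanner(std::string data, std::size_t chunk)
        : data_(std::move(data)), chunk_(chunk) {}

    // Step 1: "open" sets up state and kicks off the worker thread.
    // It does not wait for any fetch to complete; it just returns.
    void open() {
        worker_ = std::thread([this] { prefetchLoop(); });
    }

    // Step 3: the main thread consumes buffers in offset order. It blocks
    // only if the worker has not yet produced the next buffer.
    bool read(std::string& out) {
        std::unique_lock<std::mutex> lk(mu_);
        cv_.wait(lk, [this] { return !prefetchBufList_.empty() || done_; });
        if (prefetchBufList_.empty()) return false;   // range exhausted
        out = std::move(prefetchBufList_.front());
        prefetchBufList_.pop_front();
        return true;
    }

    // Step 4: close joins the worker.
    void close() { if (worker_.joinable()) worker_.join(); }

private:
    void prefetchLoop() {
        for (std::size_t off = 0; off < data_.size(); off += chunk_) {
            std::string buf = data_.substr(off, chunk_); // simulated hdfsPread
            std::lock_guard<std::mutex> lk(mu_);
            prefetchBufList_.push_back(std::move(buf));
            cv_.notify_one();
        }
        std::lock_guard<std::mutex> lk(mu_);
        done_ = true;
        cv_.notify_one();
    }

    std::string data_;
    std::size_t chunk_;
    std::deque<std::string> prefetchBufList_;  // ordered by file offset
    bool done_ = false;
    std::mutex mu_;
    std::condition_variable cv_;
    std::thread worker_;
};

// Drives steps 1, 3, and 4 for a single range and reassembles the data.
std::string scanRange(const std::string& data, std::size_t chunk) {
    RangeScanner s(data, chunk);
    s.open();                           // (1) kick off worker, return at once
    std::string result, buf;
    while (s.read(buf)) result += buf;  // (3) consume in offset order
    s.close();                          // (4)
    return result;
}
```

Because a single worker pushes buffers in file-offset order, reassembling the 
consumed buffers reproduces the range's bytes exactly.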

 

Here is what goes on in the worker thread when it gets kicked off in (1):

 

1) The worker thread reads, say, 16KB of data at a time and buffers it in 
prefetchBufList. It continues doing this until the buffer limit is reached, the 
range is completely fetched, or it is asked prematurely to stop reading.

 

2) After the entire range is completely read, the worker thread processes the 
pre-open request that was queued in (2) above. Pre-open is simply another open, 
which kicks off a second worker thread to start fetching the next range. The 
first worker thread then goes back to sleep. The pre-open is necessary to avoid 
a delay when going from one range to another; the cost otherwise would be the 
I/O time to open the new hdfs file, position to the correct offset within that 
file, and then kick off the next worker thread. With pre-open, the main thread 
sees a continuous stream of data already fetched when it does the next open.

 

So, with these two parallel activities, data is always prefetched up to one 
range in advance, not more, because a new pre-open request is only scheduled at 
the time of a new open in the main thread.

 

Only one worker thread reads the data and puts it in prefetchBufList, because 
the buffers in prefetchBufList are sequentially ordered by the offset of the 
16KB of data read from the hdfs file. It cannot have out-of-order buffers, 
because the main thread assumes that the next buffer it reads begins where the 
previous buffer ends. The second worker thread is needed so that when the first 
one processes a pre-open, it can kick off the second one to start prefetching.
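The ordering invariant the main thread relies on can be stated as a small 
check (illustrative names; the actual code does not necessarily carry explicit 
offsets this way):

```cpp
#include <cstddef>
#include <vector>

// Each prefetched buffer carries the file offset it was read from.
struct Buffer {
    std::size_t offset;
    std::size_t len;
};

// The main thread's assumption: buffer i+1 starts exactly where buffer i
// ends. A single producing worker per range makes this hold by construction;
// two concurrent producers on one range could violate it.
bool isContiguous(const std::vector<Buffer>& bufs) {
    for (std::size_t i = 1; i < bufs.size(); ++i)
        if (bufs[i].offset != bufs[i - 1].offset + bufs[i - 1].len)
            return false;
    return true;
}
```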

 

Worker threads go away when a shutdown request is scheduled. This happens in 
the LobGlobals destructor, which gets called as part of the TCB destructor.
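The destructor-driven shutdown can be sketched as an RAII owner that sets a 
stop flag and joins its worker. This mirrors the shape of the mechanism only; 
`WorkerOwner` is a hypothetical name, not the LobGlobals code itself.

```cpp
#include <atomic>
#include <thread>

// RAII sketch: the destructor schedules the shutdown request and joins the
// worker, so worker threads go away when the owner (here standing in for
// LobGlobals, destroyed from the TCB destructor) is torn down.
class WorkerOwner {
public:
    WorkerOwner() {
        worker_ = std::thread([this] {
            // Stand-in for the worker's sleep/fetch loop.
            while (!shutdown_.load()) std::this_thread::yield();
        });
    }

    ~WorkerOwner() {
        shutdown_.store(true);            // schedule the shutdown request
        if (worker_.joinable()) worker_.join();
    }

    bool running() const { return worker_.joinable(); }

private:
    std::atomic<bool> shutdown_{false};
    std::thread worker_;
};
```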

> Refactor Trafodion implementation of hdfs scan for text formatted hive tables
> -----------------------------------------------------------------------------
>
>                 Key: TRAFODION-2917
>                 URL: https://issues.apache.org/jira/browse/TRAFODION-2917
>             Project: Apache Trafodion
>          Issue Type: New Feature
>          Components: sql-general
>            Reporter: Selvaganesan Govindarajan
>            Priority: Major
>             Fix For: 2.3
>
>
> Find below the general outline of hdfs scan for text formatted hive tables.
> Compiler returns a list of scan ranges and the begin range and number of 
> ranges to be done by each instance of TCB in TDB. This list of scan ranges is 
> also re-computed at run time possibly based on a CQD
> The scan range for a TCB can come from the same or different hdfs files. TCB 
> creates two threads to read these ranges. Two ranges (for the TCB) are 
> initially assigned to these threads. As and when a range is completed, the 
> next range (assigned for the TCB) is picked up by the thread. Ranges are read 
> in multiples of hdfs scan buffer size at the TCB level. Default hdfs scan 
> buffer size is 64 MB. Rows from the hdfs scan buffer are processed and moved into the 
> up queue. If the range contains a record split, then the range is extended to 
> read up to range tail IO size to get the full row. The range that had the 
> latter part of the row ignores it because the former range processes it. 
> Record split at the file level is not possible and/or not supported.
>  For compression, the compiler returns the range info such that the hdfs scan 
> buffer can hold the full uncompressed buffer.
>  Cons:
> Reader threads feature too complex to maintain in C++
> Error handling at the layer below the TCB is missing or errors are not 
> propagated to work method causing incorrect results
> Possible multiple copying of data
> Libhdfs calls are not optimized. It was observed that the method Ids are 
> being obtained many times. Need to check if this problem still exists.
> Now that we clearly know what is expected, it could be optimized better
>   - Reduced scan buffer size for smoother data flow
>   - Better thread utilization
>   - Avoid multiple copying of data.
> Unable to comprehend the need for two threads for pre-fetch especially when 
> one range is completed fully before the data from next range is processed.
>  Following are the hdfsCalls used by programs at exp and executor directory.
>                   U hdfsCloseFile
>                  U hdfsConnect
>                  U hdfsDelete
>                  U hdfsExists
>                  U hdfsFlush
>                  U hdfsFreeFileInfo
>                  U hdfsGetPathInfo
>                  U hdfsListDirectory
>                  U hdfsOpenFile
>                  U hdfsPread
>                  U hdfsRename
>                  U hdfsWrite
>                  U hdfsCreateDirectory
>  New implementation
>  Make changes to use direct Java APIs for these calls. However, come up with 
> better mechanism to move the data from Java and JNI, avoid unnecessary 
> copying of data, better thread management via Executor concepts in Java. 
> Hence it won’t be a direct mapping of these calls to the hdfs Java API. Instead, 
> use the abstraction like what is being done for HBase access.
> I believe the newer implementation will be better optimized and hence give 
> improved performance (though not by many folds).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)