[ 
https://issues.apache.org/jira/browse/HADOOP-16241?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16820420#comment-16820420
 ] 

Steve Loughran commented on HADOOP-16241:
-----------------------------------------

Sahil, this is a fun topic. The current random policy is based on traces of 
Hive+ORC (HADOOP-13203), at a time when we considered Impala+Parquet *someone 
else's problem*. This behaviour was (there's a rough FS API sketch after the 
list):

1. open file
2. seek to near end, read footer
3. seek to stripe, maybe read footer (and then go back)
4. or, read two blocks back to back through 2+ positioned reads
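
In FS API terms that trace looks roughly like this; a sketch only, the offsets 
and sizes are placeholders, not real ORC layout logic:

{code:java}
import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch only: the access pattern above expressed against the FS API.
// Offsets and lengths are placeholders, not real ORC layout logic.
void orcLikeTrace(FileSystem fs, Path path) throws IOException {
  long len = fs.getFileStatus(path).getLen();
  try (FSDataInputStream in = fs.open(path)) {               // 1. open file
    byte[] fileFooter = new byte[16 * 1024];
    in.readFully(len - fileFooter.length, fileFooter);       // 2. read near EOF
    long stripeOffset = 64L * 1024 * 1024;                   //    (placeholder)
    byte[] stripeFooter = new byte[8 * 1024];
    in.readFully(stripeOffset, stripeFooter);                // 3. seek back to a stripe
    byte[] block1 = new byte[256 * 1024];
    byte[] block2 = new byte[256 * 1024];
    in.readFully(stripeOffset, block1);                      // 4. two positioned reads
    in.readFully(stripeOffset + block1.length, block2);      //    back to back
  }
}
{code}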


The back-to-back pair of positioned reads is why random IO uses a slightly 
larger read size: all GET requests against AWS go through the load balancer 
and have a tangible setup time. *It is faster to read ahead and discard data 
than it is to open a new stream*. With connection pooling, if there is 
capacity, you don't wait for a full TLS negotiation, but there is still a gap 
in the IO. I'll leave you to experiment; for me, doing remote tests, a 
readahead range of 768K seems best (the settings involved are sketched after 
the list below).

1. If you ask for a 64KB read, you get the readahead range.
2. If you ask for more, you get the size of your read.
3. If you try to read a .gz or CSV file in random IO, performance collapses, 
hence "normal" is critical there.
4. Switching from normal to random is potentially expensive, but given you 
are reading off the footer, it's less expensive than aborting the connection, 
at least if the readahead is enough.
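
For reference, the two settings in play look like this; values are purely 
illustrative, tune them for your own workload:

{code:java}
import org.apache.hadoop.conf.Configuration;

// Illustrative only: the S3A options discussed above, set before the
// FileSystem instance is created.
Configuration conf = new Configuration();
// seek policy: "normal" (default), "sequential" or "random"
conf.set("fs.s3a.experimental.input.fadvise", "random");
// readahead range in bytes (768K here, from my remote tests); tune it yourself
conf.set("fs.s3a.readahead.range", "786432");
{code}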

To comment on your specific complaints:

bq. The current implementation of PositionReadable in S3AInputStream is pretty 
close to the default implementation in FsInputStream.

no.

bq.  Data is only read in 64 kb chunks, so for a large read, several GET 
requests must be issued to S3 to fetch the data; while the 64 kb chunk value is 
configurable, it is hard to set a reasonable value for variable length preads

no.

bq. If the readahead value is too big, closing the input stream can take 
considerable time because the stream has to be drained of data before it can be 
closed

Generally very low cost; way less expensive than an abort, and it keeps the 
cost of two back-to-back preads low. If you have issues with this, make the 
readahead smaller.

bq. Unless the client explicitly sets fadvise to RANDOM, they will get at least 
one connection reset when the backwards seek is issued (after which fadvise 
automatically switches to RANDOM)

Except if it's a footer and the tail of it is close enough to the EOF to 
trigger the drain.

As for the improvements:

bq. The new implementation of PositionReadable would issue a GetObjectRequest 
with the range specified by position and the size of the given buffer. The data 
would be read from the S3ObjectInputStream and then closed at the end of the 
method. This stream would be independent of the wrappedStream currently 
maintained by S3A.

As you get today.

This brings the following benefits:

1. The PositionedReadable methods can be thread-safe without a synchronized 
block, which allows clients to concurrently call pread methods on the same 
S3AInputStream instance

Potentially beneficial if > 1 thread is trying to read the same input stream. 
The Java API spec of InputStream says "not thread safe", but because of HBase's 
expectations we say for FSDataInputStream that the positioned reads should be 
usable from multiple threads.

2. preads will request all the data at once rather than requesting it in chunks 
via the readahead logic

already implemented in random mode

3. Avoids performing potentially expensive seeks when performing preads

Iff the stream was opened in normal mode and then switched to sequential. If 
you open it in random, all is good, at least for single-thread IO.

There's a limitation in Hadoop <= 3.2, where you have to declare the seek 
policy before an FS instance is created. However, HADOOP-14332 now lets the 
openFile() builder set the seek policy when the file is opened.

The main benefit of your proposal, once you've switched your client to 
openFile(), is that you can do parallel preads.
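
Roughly like this; a sketch only, assuming the openFile() builder is available 
and, for the parallel part, the thread-safe preads your proposal would add:

{code:java}
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch: open with the random seek policy via the openFile() builder.
FSDataInputStream openForRandomIO(FileSystem fs, Path path) throws Exception {
  return fs.openFile(path)
      .opt("fs.s3a.experimental.input.fadvise", "random")   // per-stream policy
      .build()
      .get();
}

// Sketch of what thread-safe preads on one stream would then allow; only
// valid once the synchronization change proposed in this JIRA is in place.
void parallelPreads(FSDataInputStream in, long footerOff, long stripeOff)
    throws Exception {
  byte[] footer = new byte[16 * 1024];
  byte[] stripe = new byte[256 * 1024];
  CompletableFuture<Void> f1 = CompletableFuture.runAsync(() -> {
    try { in.readFully(footerOff, footer); }
    catch (Exception e) { throw new RuntimeException(e); }
  });
  CompletableFuture<Void> f2 = CompletableFuture.runAsync(() -> {
    try { in.readFully(stripeOff, stripe); }
    catch (Exception e) { throw new RuntimeException(e); }
  });
  CompletableFuture.allOf(f1, f2).join();
}
{code}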

# I need trace data from your application reading columnar data (Parquet and 
ORC) to show the benefits. This would be good.
# If pread performance is down to some quirks of your application's read 
patterns, consider whether they can be optimised today.
# You should move to openFile() and then explicitly turn on random IO when the 
input is Parquet or ORC and the compression format is seekable.
# Vectored IO, HADOOP-11867, is something we've been thinking about. If the 
column format can predict read patterns it can use this, and in the stack we 
can push this down to the connector, which can choose when to merge reads for 
max perf, when to reorder based on stream state, and, once HTTP/2 is used, 
multiplex the reads. This offers the best long-term potential, and as we'd only 
need to modify ORC and Parquet for this, possibly large benefits. [~gopalv] 
thinks this could be good.

FWIW, I've been thinking of how to optimise for the backward reads by actually 
having a notion of a cache: blocks are read in, say, 4MB aligned chunks, and if 
a read goes backwards some of the cached chunks will contain some, maybe all, 
of the data. This assumes that backward seeks from stripe footers to stripe 
headers are common. AbfsInputStream does this, as does GCS. Also 
[~owen.omalley] has been talking about adding the stripe footer data as a 
header, to reduce the # of backward seeks. When combined with Hive passing down 
a filtered version of the footer to all query parts (as it now does), you could 
eliminate backward seeks entirely. But if we can make the jump to vectored IO, 
this caching may not be needed. There's a rough sketch of the chunk alignment 
below.
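
Purely to make the alignment idea concrete; hypothetical, nothing like this 
exists in S3AInputStream today:

{code:java}
// Hypothetical sketch of the 4MB aligned-chunk idea; not existing S3A code.
class AlignedChunks {
  static final long CHUNK_SIZE = 4L * 1024 * 1024;    // 4MB aligned chunks

  // index of the chunk holding a given byte offset
  static long chunkIndex(long offset) {
    return offset / CHUNK_SIZE;
  }

  // aligned start of that chunk; a backwards read is "free" when it lands in
  // a chunk that an earlier read has already pulled into the cache
  static long chunkStart(long offset) {
    return chunkIndex(offset) * CHUNK_SIZE;
  }
}
{code}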


To close: get traces, and we can then discuss this patch, vectored IO, etc.

Now, secret trick: if you call toString() on an S3A input stream, open or 
closed, it gives you all the per-stream statistics we collect, including # of 
aborts, bytes discarded, backward seeks, etc. And as 
FSDataInputStream.toString() calls the wrapped stream, logging the FS data 
input stream is enough to collect the stats, without you having to get the 
wrapped stream, cast it to S3AInputStream and call getS3AStreamStatistics() 
for the structured stream statistics. Those we collect per-stream and then 
push back to the filesystem on close, and they are visible through 
S3AFileSystem's getStorageStatistics().
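
In other words, something as small as this is enough; a sketch, where fs, path 
and the SLF4J logger are whatever you already have:

{code:java}
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;

// Sketch: log the FSDataInputStream and its toString() pulls in the wrapped
// S3AInputStream's per-stream statistics.
void readAndLogStats(FileSystem fs, Path path, Logger log) throws Exception {
  try (FSDataInputStream in = fs.open(path)) {
    byte[] buf = new byte[64 * 1024];
    in.readFully(0, buf);                     // ...whatever reads you do...
    log.info("stream statistics: {}", in);    // toString() includes the stats
  }
}
{code}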

I really look forward to those traces, and will, sadly, veto all seek changes 
until you share them. Sorry.

ps: bet you wish you hadn't asked me for my opinions :)

> S3AInputStream PositionReadable should perform ranged read on dedicated 
> stream 
> -------------------------------------------------------------------------------
>
>                 Key: HADOOP-16241
>                 URL: https://issues.apache.org/jira/browse/HADOOP-16241
>             Project: Hadoop Common
>          Issue Type: Improvement
>          Components: fs/s3
>            Reporter: Sahil Takiar
>            Assignee: Sahil Takiar
>            Priority: Major
>
> The current implementation of {{PositionReadable}} in {{S3AInputStream}} is 
> pretty close to the default implementation in {{FsInputStream}}.
> This JIRA proposes overriding the {{read(long position, byte[] buffer, int 
> offset, int length)}} method and re-implementing the {{readFully(long 
> position, byte[] buffer, int offset, int length)}} method in S3A.
> The new implementation would perform a "ranged read" on a dedicated object 
> stream (rather than the shared one). Prototypes have shown this to bring a 
> considerable performance improvement to readers who are only interested in 
> reading a random chunk of the file at a time (e.g. Impala, although I would 
> assume HBase would benefit from this as well).
> Setting {{fs.s3a.experimental.input.fadvise}} to {{RANDOM}} is helpful for 
> clients that rely on pread, but has a few drawbacks:
>  * Unless the client explicitly sets fadvise to RANDOM, they will get at 
> least one connection reset when the backwards seek is issued (after which 
> fadvise automatically switches to RANDOM)
>  * Data is only read in 64 kb chunks, so for a large read, several GET 
> requests must be issued to S3 to fetch the data; while the 64 kb chunk value 
> is configurable, it is hard to set a reasonable value for variable length 
> preads
>  * If the readahead value is too big, closing the input stream can take 
> considerable time because the stream has to be drained of data before it can 
> be closed
> The new implementation of {{PositionReadable}} would issue a 
> {{GetObjectRequest}} with the range specified by {{position}} and the size of 
> the given buffer. The data would be read from the {{S3ObjectInputStream}} and 
> then closed at the end of the method. This stream would be independent of the 
> {{wrappedStream}} currently maintained by S3A.
> This brings the following benefits:
>  * The {{PositionedReadable}} methods can be thread-safe without a 
> {{synchronized}} block, which allows clients to concurrently call pread 
> methods on the same {{S3AInputStream}} instance
>  * preads will request all the data at once rather than requesting it in 
> chunks via the readahead logic
>  * Avoids performing potentially expensive seeks when performing preads


