eeroel opened a new issue, #38664:
URL: https://github.com/apache/arrow/issues/38664
### Describe the enhancement requested

Hi,

When reading Parquet concurrently from S3 using Arrow's S3 filesystem implementation, I observe that some threads often download data much more slowly than others. The slow downloads also don't get any faster when the other threads complete, which is what I would expect if the cause were just network saturation (but networking is not my strong suit).

As a result, reading a table is often slow and has high variability. Using `pre_buffer=True` can sometimes hamper performance because individual threads then download larger chunks.

I plotted cumulative data fetched versus time for the nyc-taxi data, with four equal-sized chunks downloaded by four threads: two of the downloads take more than 2x as long as the two faster ones. The data is extracted from the lines in the S3 debug output that contain "bytes written".

I'm running on macOS 14.1, Apple Silicon.

Here's the code that reproduces this issue on my system and writes the logs:

```
import time
import pyarrow._s3fs

# Trace-level AWS SDK logging, so per-request "bytes written" lines appear in the output
pyarrow._s3fs.initialize_s3(pyarrow._s3fs.S3LogLevel.Trace)
# Allow many concurrent I/O requests
pyarrow.set_io_thread_count(100)

from pyarrow.dataset import FileSystemDataset, ParquetFileFormat, ParquetFragmentScanOptions
import pyarrow
import pyarrow.fs
import pyarrow.parquet
import pyarrow.dataset as ds

fs = pyarrow.fs.S3FileSystem(region="us-east-2")
fs = pyarrow.fs.SubTreeFileSystem("ursa-labs-taxi-data", fs)

format = ParquetFileFormat(
    default_fragment_scan_options=ParquetFragmentScanOptions(pre_buffer=True),
)

# one fragment is enough to reproduce
fragments = [format.make_fragment(x, filesystem=fs) for x in ["2018/01/data.parquet"]]
# use different file for schema
schema = pyarrow.parquet.read_schema("2019/01/data.parquet", filesystem=fs)
fsd = FileSystemDataset(fragments, schema, format, fs)

t = time.time()
# read all data so the pre-buffered chunks are large
fsd.to_table(
    #columns=["passenger_count"],
    #filter=ds.field('passenger_count') >= 5,
    batch_readahead=100,
    fragment_readahead=1
)
print(time.time() - t)
```

With that script saved as `perf_testing.py`, I filter the logs like so:

```
python perf_testing.py > out.log && cat out.log | grep "bytes written" > out_filtered.log
```

And here is the code that produces the plot from the filtered log:

```
import pandas as pd
from matplotlib import pyplot as plt

# Space-separated fields: 1 = date, 2 = time, 4 = thread id, 5 = byte count
df = pd.read_csv("out_filtered.log", sep=' ', header=None)
df = df.assign(
    ts=lambda x: pd.to_datetime(x[1] + "T" + x[2]),
    bytes=lambda x: x[5],
    thread=lambda x: x[4],
    mb=lambda x: x["bytes"] / 1024 / 1024,
)[["ts", "thread", "mb"]]
df = df.sort_values("ts")
# Running total of MB downloaded by each thread
df["cum_mb"] = df.groupby("thread")["mb"].cumsum()

fig, ax = plt.subplots(figsize=(8, 6))
for thread, group in df.groupby("thread"):
    group.plot(x="ts", y="cum_mb", ax=ax, label=thread, alpha=0.5)
# Set labels after plotting; DataFrame.plot would otherwise replace the x label with "ts"
ax.set_xlabel("Time")
ax.set_ylabel("MB")
plt.savefig("out.png")
```

### Component(s)

Parquet, Python
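To put numbers on the gap between fast and slow threads, the filtered log could also be summarized per thread (total MB, elapsed time, MB/s) without pandas. This is a minimal sketch: `summarize_threads` is a hypothetical helper, and it assumes the same field layout the plotting script relies on (date in field 1, time with fractional seconds in field 2, thread id in field 4, byte count in field 5). The timestamp format string is also an assumption and may need adjusting to the actual AWS SDK log lines.

```python
from collections import defaultdict
from datetime import datetime


def summarize_threads(lines):
    """Per-thread download summary from filtered "bytes written" log lines.

    Hypothetical helper. Assumes whitespace-separated fields where field 1 is
    the date, field 2 the time, field 4 the thread id and field 5 the byte
    count, mirroring the column indices used by the plotting script.
    """
    per_thread = defaultdict(list)  # thread id -> list of (timestamp, nbytes)
    for line in lines:
        fields = line.split()
        if len(fields) < 6:
            continue  # skip lines that don't match the assumed layout
        ts = datetime.strptime(f"{fields[1]} {fields[2]}", "%Y-%m-%d %H:%M:%S.%f")
        per_thread[fields[4]].append((ts, int(fields[5])))

    summary = {}
    for thread, events in per_thread.items():
        events.sort()
        total_mb = sum(n for _, n in events) / 1024 / 1024
        # Elapsed time is measured from the first to the last logged chunk
        elapsed = (events[-1][0] - events[0][0]).total_seconds()
        summary[thread] = {
            "mb": total_mb,
            "seconds": elapsed,
            # Throughput is undefined when a thread logged only one chunk
            "mb_per_s": total_mb / elapsed if elapsed > 0 else None,
        }
    return summary
```

For example, `summarize_threads(open("out_filtered.log"))` returns a dict keyed by thread id; sorting its items by `"seconds"` should put the slow downloads last.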
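The point that `pre_buffer=True` can hurt because each thread downloads a larger chunk can be illustrated with a toy model. It assumes, as the plot suggests, that each connection has a fixed throughput that does not improve when others finish. This is not how Arrow's pre-buffering or I/O thread pool actually schedules reads; `wall_time` and the throughput numbers are hypothetical. The model splits the same bytes into pieces that connections pull from a shared queue, so one large piece per connection behaves like a static assignment, while small pieces let fast connections take over the remaining work.

```python
import heapq


def wall_time(total_mb, piece_mb, throughputs_mb_s):
    """Toy model: time to fetch `total_mb` MB split into `piece_mb`-sized pieces.

    Each entry of `throughputs_mb_s` is one connection with a fixed rate (MB/s)
    that does not change when other connections finish. Whenever a connection
    becomes free it takes the next piece from a shared queue (greedy list
    scheduling). With one piece per connection this reduces to a static
    assignment, where the slowest connection sets the wall time.
    """
    n_pieces = int(round(total_mb / piece_mb))
    # Min-heap of (time at which the connection becomes free, its rate in MB/s)
    free_at = [(0.0, rate) for rate in throughputs_mb_s]
    heapq.heapify(free_at)
    finish = 0.0
    for _ in range(n_pieces):
        start, rate = heapq.heappop(free_at)  # earliest-free connection
        done = start + piece_mb / rate
        finish = max(finish, done)
        heapq.heappush(free_at, (done, rate))
    return finish
```

With made-up rates of 20, 20, 8 and 8 MB/s (two connections more than 2x slower, as in the plot) and 400 MB in total, `wall_time(400, 100, [20, 20, 8, 8])` (one 100 MB chunk per connection) returns 12.5 seconds, while `wall_time(400, 10, [20, 20, 8, 8])` returns 7.5 seconds, because the fast connections end up fetching most of the 10 MB pieces.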
