eeroel opened a new issue, #38664:
URL: https://github.com/apache/arrow/issues/38664

   ### Describe the enhancement requested
   
   Hi,
   
   When reading Parquet concurrently from S3 using Arrow's S3 filesystem 
implementation, I often observe that some threads download data much more 
slowly than others. The slow downloads also don't seem to speed up when other 
threads complete, which I would expect if the cause were simply network 
saturation (but networking is not my strong suit). As a result, reading a table 
is often slow with high variability, and using `pre_buffer=True` can sometimes 
hurt performance because individual threads download larger chunks.
   
   Here's an example of cumulative data fetched versus time for the nyc-taxi 
data, with four equal-sized chunks being downloaded by four threads. Two of the 
downloads take more than 2x as long as the two faster ones. The data is 
extracted from the lines in the S3 debug output that contain "bytes written".
   
![output](https://github.com/apache/arrow/assets/10564706/e4a70288-83be-4b21-a6de-e1b88f7966d8)
   
   I'm running on macOS 14.1, Apple Silicon.
   
   Here's the code that reproduces this issue on my system, and writes the logs:
   ```
   import time
   import pyarrow._s3fs
   
   pyarrow._s3fs.initialize_s3(pyarrow._s3fs.S3LogLevel.Trace)
   pyarrow.set_io_thread_count(100)
   
   from pyarrow.dataset import FileSystemDataset, ParquetFileFormat, ParquetFragmentScanOptions
   
   import pyarrow
   import pyarrow.fs
   import pyarrow.parquet
   import pyarrow.dataset as ds
   
   fs = pyarrow.fs.S3FileSystem(region="us-east-2")
   fs = pyarrow.fs.SubTreeFileSystem("ursa-labs-taxi-data", fs)
   
   format = ParquetFileFormat(
       default_fragment_scan_options=ParquetFragmentScanOptions(pre_buffer=True),
   )
   
   # one fragment is enough to reproduce
   fragments = [format.make_fragment(x, filesystem=fs) for x in ["2018/01/data.parquet"]]
   
   # use different file for schema
   schema = pyarrow.parquet.read_schema("2019/01/data.parquet", filesystem=fs)
   fsd = FileSystemDataset(fragments, schema, format, fs)
   
   t = time.time()
   # read all data so the pre-buffered chunks are large
   fsd.to_table(
       #columns=["passenger_count"],
       #filter=ds.field('passenger_count') >= 5,
       batch_readahead=100,
       fragment_readahead=1
   )
   print(time.time()-t)
   ```
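
A minimal sketch for quantifying the run-to-run variability described above, rather than relying on a single timing. It is not part of the measurements reported here; `time_runs` and `summarize` are hypothetical helper names, and the default of five repeats is an arbitrary assumption.

```python
import statistics
import time


def time_runs(fn, repeats=5):
    """Call fn() `repeats` times and return the wall-clock durations in seconds."""
    durations = []
    for _ in range(repeats):
        start = time.perf_counter()
        fn()
        durations.append(time.perf_counter() - start)
    return durations


def summarize(durations):
    """Return min, median, max and sample standard deviation of the durations."""
    return {
        "min": min(durations),
        "median": statistics.median(durations),
        "max": max(durations),
        # The sample standard deviation needs at least two data points.
        "stdev": statistics.stdev(durations) if len(durations) > 1 else 0.0,
    }
```

With the script above, this could be called as `summarize(time_runs(lambda: fsd.to_table(batch_readahead=100, fragment_readahead=1)))`, once with `pre_buffer=True` and once with `pre_buffer=False`, to compare the spread of the two settings.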
   
   With that script in `perf_testing.py` I filter the logs like so:
   ```
   python perf_testing.py > out.log && cat out.log | grep "bytes written" > out_filtered.log
   ```
   
   And here is the code to produce the plot from the filtered log:
   ```
   import pandas as pd
   from matplotlib import pyplot as plt
   
   df = pd.read_csv("out_filtered.log", sep=' ', header=None)
   df = df.assign(
       ts=lambda x: pd.to_datetime(x[1]+"T"+x[2]), 
       bytes=lambda x: x[5], thread=lambda x: x[4],
       mb = lambda x: x["bytes"]/1024/1024
   )[["ts", "thread", "mb"]]
   
   df = df.sort_values("ts")
   
   # running total of MB per thread, in timestamp order
   df = df.assign(cum_mb=df.groupby("thread")["mb"].cumsum())
   fig, ax = plt.subplots(figsize=(8,6))
   plt.xlabel("Time")
   plt.ylabel("MB")
   for thread, group in df.groupby("thread"):
       group.plot(x="ts", y="cum_mb", ax=ax, label=thread, alpha=0.5)
   plt.savefig("out.png")
   ```
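
To put numbers next to the plot, the same filtered log can be reduced to one row per thread. The sketch below uses only the standard library and assumes the same space-separated layout that the plotting code relies on (field 1 = date, field 2 = time, field 4 = thread id, field 5 = byte count); `per_thread_summary` is a hypothetical helper name, and the timestamp is assumed to be parseable by `datetime.fromisoformat`.

```python
from collections import defaultdict
from datetime import datetime


def per_thread_summary(lines):
    """Summarize 'bytes written' log lines per thread.

    Assumes the layout used by the plotting script:
    field 1 = date, field 2 = time, field 4 = thread id, field 5 = byte count.
    """
    first, last, total = {}, {}, defaultdict(int)
    for line in lines:
        parts = line.split(" ")
        if len(parts) < 6:
            continue  # not a line in the expected layout
        ts = datetime.fromisoformat(parts[1] + "T" + parts[2])
        thread = parts[4]
        total[thread] += int(parts[5])
        first[thread] = min(first.get(thread, ts), ts)
        last[thread] = max(last.get(thread, ts), ts)

    summary = {}
    for thread, nbytes in total.items():
        seconds = (last[thread] - first[thread]).total_seconds()
        mb = nbytes / 1024 / 1024
        summary[thread] = {
            "mb": mb,
            "seconds": seconds,
            # Throughput is undefined when a thread logged a single timestamp.
            "mb_per_s": mb / seconds if seconds > 0 else None,
        }
    return summary
```

For example, `with open("out_filtered.log") as f: summary = per_thread_summary(f)` gives the total MB, elapsed seconds and average MB/s for each thread, which makes the slow threads easy to compare against the fast ones.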
   
   
   
   
   
   
   
   ### Component(s)
   
   Parquet, Python

