Akshay-A-Kulkarni opened a new issue, #36765:
URL: https://github.com/apache/arrow/issues/36765
### Describe the enhancement requested
### Summary
I'm seeing a discrepancy in read times between the `parquet` and `dataset`
modules when reading partitioned parquet tables, which is more noticeable on
fairly large ones.
In a nutshell, `pyarrow.dataset.to_table()` seems to be considerably slower
than `pyarrow.parquet.read_table()` in most cases I have tested so far. For
the synthetic data I tested on (see Appendix), the difference in read times
was approximately 12x.
### Details
```python
import sys
sys.version
```
`>>> '3.8.17 (default, Jul 5 2023, 21:04:15) \n[GCC 11.2.0]'`
```python
import pyarrow as pa
pa.__version__
```
`>>> '12.0.1'`
The test was done on a machine similar in specs to an `m5.4xlarge` EC2
instance.
```python
%%time
path = 's3://BUCKET/PREFIX/PARTITIONED_DATA_BASE_DIR'
import pyarrow.parquet as pq

table_pq = pq.read_table(path, partitioning='hive')
print(table_pq.schema)
print(table_pq.shape)
```
> a: string
> b: int64
> c: double
> part_col: dictionary<values=string, indices=int32, ordered=0>
> (100000000, 4)
>
> CPU times: user 21.2 s, sys: 5.4 s, total: 26.6 s
> Wall time: 10.6 s
```python
%%time
path = 's3://BUCKET/PREFIX/PARTITIONED_DATA_BASE_DIR'
import pyarrow.dataset as ds

table_ds = ds.dataset(
    path,
    partitioning='hive',
    format='parquet'
).to_table()
print(table_ds.schema)
print(table_ds.shape)
```
> a: string
> b: int64
> c: double
> part_col: dictionary<values=string, indices=int32, ordered=0>
> (100000000, 4)
>
> CPU times: user 49.7 s, sys: 6.67 s, total: 56.4 s
> Wall time: **1min 57s**
From what I've seen, the root cause seems to be `pre_buffer`, which is not
enabled for `pyarrow.dataset` by default.
Once I enable pre-buffering via `ParquetFileFormat` and
`ParquetFragmentScanOptions`, the issue is resolved.
```python
%%time
table_ds_buffer = ds.dataset(
    PATH_TO_S3_PARQUET,
    partitioning='hive',
    format=ds.ParquetFileFormat(
        default_fragment_scan_options=ds.ParquetFragmentScanOptions(
            pre_buffer=True
        )
    )
).to_table()
print(table_ds_buffer.schema)
print(table_ds_buffer.shape)
```
> a: string
> b: int64
> c: double
> part_col: string
> (100000000, 4)
>
> CPU times: user 21 s, sys: 5.27 s, total: 26.2 s
> Wall time: 9.4 s
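If I'm reading the `Scanner` options correctly, the same setting can also be supplied per scan via `fragment_scan_options` on `to_table()`, rather than baking it into the file format. A minimal sketch, assuming the same partitioned S3 path as above:

```python
# Alternative (untested on my side at scale): enable pre_buffer per scan
# instead of on the ParquetFileFormat used to build the dataset.
table_ds_buffer = ds.dataset(
    path,
    partitioning='hive',
    format='parquet'
).to_table(
    fragment_scan_options=ds.ParquetFragmentScanOptions(pre_buffer=True)
)
```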
The issue seems to be exacerbated for parquet datasets with partitions (or
perhaps with more file parts in general?). Running the read test on a single
parquet file, `dataset` was still slower, but not by as much as in the
partitioned case.
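For reference, a rough sketch of how the single-file comparison could be run (the `single_file.parquet` key below is a placeholder, not the actual object I tested):

```python
import time

import pyarrow.dataset as ds
import pyarrow.parquet as pq

single_path = 's3://BUCKET/PREFIX/single_file.parquet'  # placeholder path

# pyarrow.parquet path (pre_buffer is on by default here, as far as I can tell)
t0 = time.perf_counter()
table_pq = pq.read_table(single_path)
print('pq.read_table:', time.perf_counter() - t0, 's')

# pyarrow.dataset path (pre_buffer off by default)
t0 = time.perf_counter()
table_ds = ds.dataset(single_path, format='parquet').to_table()
print('ds.dataset(...).to_table():', time.perf_counter() - t0, 's')
```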
Is there a reason for this default? As far as I understand, for S3-based
parquet datasets this could be turned on by default. Otherwise, maybe it could
be exposed as an argument that the user can enable directly in the `dataset()`
call, as sketched below?
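Something along these lines, purely as a sketch of the idea (a `pre_buffer` keyword on `dataset()` does not exist today; the helper below is a hypothetical user-side workaround that just builds the `ParquetFileFormat` for you):

```python
import pyarrow.dataset as ds

def dataset_with_pre_buffer(path, partitioning=None, pre_buffer=True, **kwargs):
    """Hypothetical helper: ds.dataset() with pre_buffer forwarded to the
    parquet fragment scan options, mimicking pyarrow.parquet.read_table."""
    fmt = ds.ParquetFileFormat(
        default_fragment_scan_options=ds.ParquetFragmentScanOptions(
            pre_buffer=pre_buffer
        )
    )
    return ds.dataset(path, partitioning=partitioning, format=fmt, **kwargs)

# usage (placeholder path):
# table = dataset_with_pre_buffer(
#     's3://BUCKET/PREFIX/PARTITIONED_DATA_BASE_DIR', partitioning='hive'
# ).to_table()
```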
## Appendix
Code to generate test data
```python
%%time
from string import ascii_letters

import numpy as np
import pyarrow as pa
import pyarrow.dataset as ds

num_rows = 100_000_000
num_files = 1  # 32/64
num_partitions = 4

path = 's3://BUCKET/PREFIX/PARTITIONED_DATA_BASE_DIR'  # placeholder base dir, as above

table = pa.Table.from_pydict({
    'a': [str(i) for i in range(num_rows)],
    'b': range(num_rows),
    'c': [float(i) for i in range(num_rows)],
    'part_col': np.random.choice(
        list(ascii_letters[:num_partitions]),
        size=num_rows,
        # p=[0.8, 0.05, 0.1, 0.05]  # for creating row-count skew between partitions
    ),
})

ds.write_dataset(
    table,
    path,
    format='parquet',
    partitioning_flavor='hive',
    partitioning=['part_col'],
    max_rows_per_file=num_rows // (num_files * num_partitions),
    existing_data_behavior='overwrite_or_ignore'
)
```
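And the timing comparison itself, in case it helps reproduce outside a notebook: a rough sketch using `time.perf_counter` instead of `%%time`, with the same placeholder path as above.

```python
import time

import pyarrow.dataset as ds
import pyarrow.parquet as pq

path = 's3://BUCKET/PREFIX/PARTITIONED_DATA_BASE_DIR'  # placeholder, as above

def timed(label, fn):
    t0 = time.perf_counter()
    table = fn()
    print(f'{label}: {time.perf_counter() - t0:.1f} s, shape={table.shape}')
    return table

timed('pq.read_table', lambda: pq.read_table(path, partitioning='hive'))

timed('ds.dataset().to_table()', lambda: ds.dataset(
    path, partitioning='hive', format='parquet').to_table())

timed('ds.dataset(pre_buffer=True).to_table()', lambda: ds.dataset(
    path,
    partitioning='hive',
    format=ds.ParquetFileFormat(
        default_fragment_scan_options=ds.ParquetFragmentScanOptions(
            pre_buffer=True
        )
    ),
).to_table())
```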
### Component(s)
Parquet, Python