CaselIT opened a new issue, #39079:
URL: https://github.com/apache/arrow/issues/39079
### Describe the enhancement requested
Hi,

I'm testing how to optimally save a table partitioned over one or more columns, with the objective of reading the individual partitions back later. While trying this I noticed that the current logic to save, and especially to load, a partitioned table is very slow.

While experimenting with ways of speeding up this use case I noticed that the row groups of a parquet file can have an arbitrary size, and a single file can contain row groups of many different sizes. I think there could be a partitioning scheme that makes use of this feature of parquet files.

From some initial tests I've done using only the public Python API, the serialization time is generally comparable, while the read time is up to 10x faster as the number of partitions increases. It's likely that a native implementation of this scheme would do even better.

The example code I tried is the following. I'm using pandas to partition the table, but that should not matter much for the timings (if anything it should penalize the row-group partitioning).
```py
from contextlib import contextmanager
import pandas as pd
import numpy as np
import pyarrow
import pyarrow.parquet
import shutil
import time

rng = np.random.default_rng(seed=42)
shape = (1_000_000, 13)  # this results in about 100 MB of data
num_partitions = (100, 250, 500)


@contextmanager
def timectx(name):
    s = time.perf_counter()
    yield
    e = time.perf_counter()
    print(" ", name, (e - s) * 1000, "ms")


def make_data(num_partition):
    data = pd.DataFrame(
        rng.uniform(size=shape), columns=[f"c{i}" for i in range(shape[1])]
    )
    # make a key column used for partitioning with partitions of random size.
    # The keys are randomly sorted in the dataframe
    splits = set()
    while len(splits) < num_partition - 1:
        splits.add(rng.integers(1, len(data)))
    splits.add(len(data))
    start = 0
    arr = []
    for i, end in enumerate(sorted(splits)):
        arr.append(np.full(end - start, i))
        start = end
    values = np.concatenate(arr)
    rng.shuffle(values)
    data["key"] = values.astype(np.int64)
    return data.copy()


def run(num_partition):
    print(f"Saving data with {num_partition} partitions")
    df = make_data(num_partition)
    keys = df["key"].unique()
    assert len(keys) == num_partition

    shutil.rmtree("write_to_dataset", ignore_errors=True)
    table = pyarrow.Table.from_pandas(df)
    with timectx("save using write_to_dataset"):
        pyarrow.parquet.write_to_dataset(
            table,
            "write_to_dataset",
            partition_cols=["key"],
            write_statistics=False,
            use_dictionary=False,
        )
    rng.shuffle(keys)  # randomize access
    with timectx("load partitions using read_table"):
        for key in keys:
            pyarrow.parquet.read_table(
                "write_to_dataset", filters=[("key", "==", key)]
            )

    shutil.rmtree("row_group", ignore_errors=True)
    with timectx("save using row groups"):
        # include the processing in pandas in the time even if using the
        # table directly is likely faster
        parts = dict(list(df.groupby("key")))
        pa_table = pyarrow.Table.from_pandas(next(iter(parts.values())))
        key_to_index = {}
        with pyarrow.parquet.ParquetWriter(
            "row_group",
            pa_table.schema,
            write_statistics=False,
            use_dictionary=False,
        ) as pw:
            for index, (key, group) in enumerate(parts.items()):
                size = len(group)
                pa_table = pyarrow.Table.from_pandas(group)
                pw.write_table(pa_table, row_group_size=size)
                key_to_index[key] = index
    with timectx("load partitions from row group"):
        for key in keys:
            index = key_to_index[key]
            # simulate opening the file each time; moving the for loop inside
            # the with block is faster but does not simulate the typical usage
            with pyarrow.parquet.ParquetFile("row_group") as pf:
                pf.read_row_group(index)


for ns in num_partitions:
    run(ns)
```
The results on my PC running Windows are the following:
```
Saving data with 100 partitions
save using write_to_dataset 387.8823999548331 ms
load partitions using read_table 1924.649199936539 ms
save using row groups 540.5166000127792 ms
load partitions from row group 393.6318999622017 ms
Saving data with 250 partitions
save using write_to_dataset 922.8724999120459 ms
load partitions using read_table 11072.766500059515 ms
save using row groups 996.5518999379128 ms
load partitions from row group 1503.8880000356585 ms
Saving data with 500 partitions
save using write_to_dataset 1778.2125999219716 ms
load partitions using read_table 38530.92749998905 ms
save using row groups 1454.6020000707358 ms
load partitions from row group 4754.040600033477 ms
```
Using Docker the results are similar:
```
Saving data with 100 partitions
save using write_to_dataset 2058.0104130003747 ms
load partitions using read_table 1066.399915999682 ms
save using row groups 933.8465500004531 ms
load partitions from row group 230.3446129999429 ms
Saving data with 250 partitions
save using write_to_dataset 4654.56907800035 ms
load partitions using read_table 4715.190167000401 ms
save using row groups 1677.210732000276 ms
load partitions from row group 797.0339869998497 ms
Saving data with 500 partitions
save using write_to_dataset 9070.415569000033 ms
load partitions using read_table 17305.43225999918 ms
save using row groups 2508.86906300002 ms
load partitions from row group 2386.4264080002613 ms
```
In both cases I'm using Python 3.10 with the following libraries:
```
pyarrow==14.0.1
pandas==2.1.3
numpy==1.26.2
```
A couple of considerations:

- Since a parquet file stores a copy of the metadata for each row group, saving more than 500-1000 groups in a single file makes reading the metadata a very large part of the time needed to open and load the data. This could be addressed with a hive-like directory structure, splitting the row groups across multiple parquet files when a large number of groups is present (see the first sketch after this list).
- The key -> row group index mapping (and optionally the file that holds it) needs to be stored somewhere. An array/table/similar data structure stored inside the metadata of the parquet file could be used for it (see the second sketch after this list).
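
A minimal sketch of the first point, under the assumption of a fixed cap on row groups per file (the threshold and the file naming scheme below are made up): a partition index is simply split into a file index and a row group index within that file.

```py
# Hypothetical helper: shard the partitions across several parquet files in one
# directory so that no single file holds more than MAX_GROUPS_PER_FILE row groups.
MAX_GROUPS_PER_FILE = 500  # made-up threshold, to be tuned


def locate_partition(partition_index: int) -> tuple[str, int]:
    # file that holds the partition, and the row group index inside that file
    file_index, row_group_index = divmod(partition_index, MAX_GROUPS_PER_FILE)
    return f"part-{file_index}.parquet", row_group_index
```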
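
And a minimal sketch of the second point, assuming the mapping is serialized as JSON under a made-up `row_group_index` key in the Arrow schema metadata, which pyarrow writes into the parquet footer as key/value metadata and exposes again through `ParquetFile.schema_arrow`. This only illustrates where the mapping could live, not how a native implementation would necessarily store it.

```py
import json

import pandas as pd
import pyarrow
import pyarrow.parquet

# tiny stand-in for the partitioned data used in the example above
df = pd.DataFrame({"key": [0, 0, 1, 2, 2, 2], "value": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]})
parts = dict(list(df.groupby("key")))

# the write order of the groups defines the row group index of each key
key_to_index = {str(key): index for index, key in enumerate(parts)}

base = pyarrow.Table.from_pandas(next(iter(parts.values())), preserve_index=False)
# attach the key -> row group index mapping to the schema, so it ends up in the
# parquet footer ("row_group_index" is a made-up metadata key)
schema = base.schema.with_metadata({"row_group_index": json.dumps(key_to_index)})

with pyarrow.parquet.ParquetWriter("row_group_meta.parquet", schema) as pw:
    for group in parts.values():
        tbl = pyarrow.Table.from_pandas(group, preserve_index=False)
        pw.write_table(
            tbl.replace_schema_metadata(schema.metadata), row_group_size=len(group)
        )

# later, load a single partition knowing only its key
with pyarrow.parquet.ParquetFile("row_group_meta.parquet") as pf:
    mapping = json.loads(pf.schema_arrow.metadata[b"row_group_index"])
    partition_2 = pf.read_row_group(mapping["2"])
```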
Thanks for reading.
### Component(s)
Parquet