[
https://issues.apache.org/jira/browse/ARROW-11781?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17342515#comment-17342515
]
David Li commented on ARROW-11781:
----------------------------------
Just to follow up: Arrow 4.0.0 has been released with the optimization from
ARROW-8658. Are you able to check whether it improves your use case?
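For example, something along these lines should show whether the new code path
helps (just a rough sketch; data_dir and the partition values are taken from the
reproduction below):
{code:python}
import time

import pandas as pd
import pyarrow as pa

# Confirm the version with the ARROW-8658 optimization is in use.
print(pa.__version__)  # expect 4.0.0 or later

# data_dir is assumed to point at the partitioned dataset from the example below.
start = time.time()
pd.read_parquet(data_dir, engine='pyarrow',
                filters=[[('a', '=', '0'), ('b', '=', '0')]])
print(time.time() - start)
{code}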
> [Python] Reading small amount of files from a partitioned dataset is
> unexpectedly slow
> --------------------------------------------------------------------------------------
>
> Key: ARROW-11781
> URL: https://issues.apache.org/jira/browse/ARROW-11781
> Project: Apache Arrow
> Issue Type: Bug
> Components: Python
> Reporter: Jeroen
> Priority: Minor
> Fix For: 5.0.0
>
> Attachments: spy.svg, spy2.svg, spy3.svg
>
>
> I posted this on StackOverflow and was told I should probably create an issue
> here.
> I managed to create a relatively minimal example:
> {code:python}
> import random
> import time
>
> import pandas as pd
> import pyarrow as pa
> import pyarrow.dataset
>
> # spark (a SparkSession) and data_dir (a pathlib.Path) are assumed to be set up already.
> df = spark.createDataFrame(
>     [
>         (str(a), b, c, random.randint(0, 1000))
>         for a in range(100)
>         for b in range(10)
>         for c in range(10000)
>     ],
>     ['a', 'b', 'c', 'd']
> )
>
> print("Writing the spark dataframe to the file system in partitioned folders.")
> df.repartition('a').write.partitionBy('a', 'b').parquet(
>     str(data_dir), compression='snappy', mode='overwrite')
>
>
> def time_it(func, repetition=10):
>     start = time.time()
>     for _ in range(repetition):
>         func()
>     return (time.time() - start) / repetition
>
>
> print("Loading the entire dataset")
> print(time_it(lambda: pd.read_parquet(data_dir, engine='pyarrow')))
>
> print("Loading a single file using filters")
> print(time_it(lambda: pd.read_parquet(data_dir, engine='pyarrow',
>                                       filters=[[('a', '=', '0'), ('b', '=', '0')]])))
>
> print("Loading a single file using filters and a specified partitioning")
> partitioning = pa.dataset.HivePartitioning(
>     pa.schema([
>         pa.field('a', pa.string()),
>         pa.field('b', pa.string())
>     ])
> )
> print(time_it(lambda: pd.read_parquet(data_dir, engine='pyarrow',
>                                       filters=[[('a', '=', '0'), ('b', '=', '0')]],
>                                       partitioning=partitioning)))
>
> print("Loading a single file by specifying the path")
> print(time_it(lambda: pd.read_parquet(data_dir / 'a=0' / 'b=0', engine='pyarrow')))
> {code}
> This gives me the following output:
> {code:none}
> Writing the spark dataframe to the file system in partitioned folders.
> Loading the entire dataset
> 0.23926825523376466
> Loading a single file using filters
> 0.04788286685943603
> Loading a single file using filters and a specified partitioning
> 0.0323061466217041
> Loading a single file by specifying the path
> 0.0017130613327026368
> {code}
>
> Loading this small number of files is about 20 times faster when addressing the
> paths directly than when going through the pyarrow filters.
>
> The question as I posted it on StackOverflow:
> I am having some problems with the speed of loading `.parquet` files.
> However, I don't know what I am doing wrong.
> *Problem*
> I am trying to read a single `.parquet` file from my local filesystem, which is
> part of the partitioned output of a Spark job, so the `.parquet` files sit in
> hierarchical directories named `a=x` and `b=y`.
> To achieve this, I am using `pandas.read_parquet` (which uses
> `pyarrow.parquet.read_table`) with the `filters` kwarg. The run time when using
> `filters` is much longer than I would expect.
> {code:python}
> # The following runs for about 55 seconds
> pd.read_parquet(<path_to_entire_dataset>, filters=[[('a', '=', 'x'), ('b', '=', 'y')]])
>
> # The following runs for about 0.04 seconds
> pd.read_parquet(<path_to_entire_dataset>/a=x/b=y/)
>
> # The following runs for about 70 seconds
> pd.read_parquet(<path_to_entire_dataset>)
> {code}
> Reading a single parquet file by specifying filters is only slightly faster
> than loading the entire dataset, whereas I would expect a run time roughly
> linear in the number of files read.
> *What mistake am I making here?*
> I realize that simply putting the filter values in the path would work, but
> this quickly becomes complex, since what I want to filter on can change.
> Besides, I think `read_table` should be able to load this data efficiently.
> PS: The entire dataset contains many millions of rows; the data I want to load
> is only a few thousand rows.
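> For reference, this is roughly how I would expect to express the same query
> through the pyarrow.dataset API directly (only a sketch; data_dir is the
> dataset root from the example above, and the partition schema matches how the
> data was written):
> {code:python}
> import pyarrow as pa
> import pyarrow.dataset as ds
>
> partitioning = ds.partitioning(
>     pa.schema([('a', pa.string()), ('b', pa.int32())]),
>     flavor='hive',
> )
> dataset = ds.dataset(data_dir, format='parquet', partitioning=partitioning)
>
> # Partition pruning should restrict the read to the files under a=0/b=0.
> table = dataset.to_table(filter=(ds.field('a') == '0') & (ds.field('b') == 0))
> {code}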
> *Edit 1:*
> As suggested by 0x26res, I manually defined the partitioning. This led to a
> significant speed-up, but still not as much as I would have expected; in this
> situation the run time was about 5 seconds.
> {code:python}
> import pyarrow as pa
> from pyarrow.dataset import HivePartitioning
>
> partitioning = HivePartitioning(
>     pa.schema([
>         pa.field('a', pa.string()),
>         pa.field('b', pa.int32()),
>     ])
> )
>
> pd.read_parquet(
>     <path_to_entire_dataset>,
>     engine='pyarrow',
>     filters=[
>         [
>             ('a', '=', x),
>             ('b', '=', y),
>         ]
>     ],
>     partitioning=partitioning
> )
> {code}
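> To double check that the filter really prunes down to a single directory, I can
> list the fragments it selects (again only a sketch, reusing the partitioning
> object defined above):
> {code:python}
> import pyarrow.dataset as ds
>
> dataset = ds.dataset(<path_to_entire_dataset>, format='parquet', partitioning=partitioning)
>
> # With partition pruning, only the files under a=x/b=y should show up here.
> fragments = dataset.get_fragments(filter=(ds.field('a') == x) & (ds.field('b') == y))
> print([fragment.path for fragment in fragments])
> {code}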
--
This message was sent by Atlassian Jira
(v8.3.4#803005)