Andy Teucher created ARROW-15069:
------------------------------------
Summary: [R] open_dataset very slow on heavily partitioned parquet
dataset
Key: ARROW-15069
URL: https://issues.apache.org/jira/browse/ARROW-15069
Project: Apache Arrow
Issue Type: Bug
Components: R
Affects Versions: 6.0.1
Environment: macOS Mojave, R 4.1.1
Reporter: Andy Teucher
Opening a (particular) partitioned hive-style parquet dataset is very slow (45s
to 1 minute). I have a reproducible example below that takes 780 csv files and
writes them to parquet using the {{open_dataset("csv files") |> group_by(vars)
|> write_dataset("parquet")}} suggested
[here|https://arrow.apache.org/docs/r/articles/dataset.html#writing-datasets].
Opening and querying the subsequent parquet dataset is much slower than doing
it on the original csv files, which is not what I expected.
{code:java}
library(arrow)
library(dplyr)
library(tictoc)
zipfile <- "ahccd.zip"
csv_dir <- "data/csv"
parquet_dir <- "data/parquet"
dir.create(csv_dir, recursive = TRUE)
dir.create(parquet_dir, recursive = TRUE)
# A zip of 780 csvs of daily temperature data at Canadian climate stations (one
file per station)
download.file("https://www.dropbox.com/s/f0a18jp0lvbp1hp/ahccd.zip?dl=1",
destfile = zipfile)
unzip(zipfile, exdir = csv_dir)
csv_schema <- schema(
field("stn_id", string()),
field("stn_name", string()),
field("measure", string()),
field("date", date32()),
field("year", int64()),
field("month", int64()),
field("temp", double()),
field("province", string()),
field("stn_joined", string()),
field("element", string()),
field("unit", string()),
field("stn_last_updated", string()),
field("flag", string())
)
csv_format <- FileFormat$create(format = "csv", quoting = FALSE)
# Write to parquet, partitioning on stn_id, year, measure
tic("write csv to parquet")
arrow::open_dataset("data/csv", schema = csv_schema,
format = csv_format) |>
group_by(stn_id, year, measure) |>
write_dataset(parquet_dir, format = "parquet")
toc()
#> write csv to parquet: 2067.093 sec elapsed
stations <- c("1100031", "1100120", "1100119", "1036B06")
## Query directory of original csv files
tic("query csv")
foo <- arrow::open_dataset(csv_dir, schema = csv_schema,
format = csv_format) |>
filter(year >= 1990,
year <= 2020,
stn_id %in% stations,
measure == "daily_max") |>
collect()
toc()
#> query csv: 12.571 sec elapsed
## Query the hive-style parquet directory
tic("query parquet")
bar <- arrow::open_dataset("data/parquet") |>
filter(year >= 1990,
year <= 2020,
stn_id %in% stations,
measure == "daily_max") |>
collect()
toc()
#> query parquet: 41.79 sec elapsed
## It turns out that it is just the opening of the dataset
## that takes so long
tic("open parquet dataset")
ds <- arrow::open_dataset("~/Desktop/arrow-report/data/parquet")
toc()
#> open parquet dataset: 45.581 sec elapsed
ds
#> FileSystemDataset with 191171 Parquet files
#> stn_name: string
#> date: date32[day]
#> month: int64
#> temp: double
#> province: string
#> stn_joined: string
#> element: string
#> unit: string
#> stn_last_updated: string
#> flag: string
#> stn_id: string
#> year: int32
#> measure: string
tic("query already openend dataset")
ds |>
filter(year >= 1990,
year <= 2020,
stn_id %in% stations,
measure == "daily_max") |>
collect()
#> # A tibble: 44,469 × 13
#> stn_name date month temp province stn_joined element
unit
#> <chr> <date> <int> <dbl> <chr> <chr> <chr>
<chr>
#> 1 ALBERNI 1992-01-01 1 6.5 BC station joined Homogenized d…
Deg C…
#> 2 ALBERNI 1992-01-02 1 5.5 BC station joined Homogenized d…
Deg C…
#> 3 ALBERNI 1992-01-03 1 3.5 BC station joined Homogenized d…
Deg C…
#> 4 ALBERNI 1992-01-04 1 6 BC station joined Homogenized d…
Deg C…
#> 5 ALBERNI 1992-01-05 1 0.5 BC station joined Homogenized d…
Deg C…
#> 6 ALBERNI 1992-01-06 1 0 BC station joined Homogenized d…
Deg C…
#> 7 ALBERNI 1992-01-07 1 0 BC station joined Homogenized d…
Deg C…
#> 8 ALBERNI 1992-01-08 1 1.5 BC station joined Homogenized d…
Deg C…
#> 9 ALBERNI 1992-01-09 1 4 BC station joined Homogenized d…
Deg C…
#> 10 ALBERNI 1992-01-10 1 5.5 BC station joined Homogenized d…
Deg C…
#> # … with 44,459 more rows, and 5 more variables: stn_last_updated <chr>,
#> # flag <chr>, stn_id <chr>, year <int>, measure <chr>
toc()
#> query already openend dataset: 0.356 sec elapsed
{code}
The above reprex is self-contained, but will take a while to run, specifically
the writing of the parquet dataset can take up to 30 min. It also downloads a
380MB zip file of csvs from my Dropbox.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)