[
https://issues.apache.org/jira/browse/ARROW-15069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17458583#comment-17458583
]
Weston Pace commented on ARROW-15069:
-------------------------------------
I'm going to try and take a look at this soon. I have a hunch the problem is
that your 780 CSV files are turning into 191,171 parquet files. Luckily, we
have done some work recently that should help with this by batching files up in
memory to ensure minimum row group sizes when doing a dataset write.
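
For a sense of scale, here is a rough sketch rather than a diagnosis. A hive-style write creates one directory, and at least one file, for each distinct combination of the partition keys, so the file count grows with the product of their cardinalities. The toy data frame and the helper count_partitions() below are made up for illustration; only the three column names come from the report.

```r
# Hypothetical toy data: 3 stations x 4 years x 2 measures, 5 rows each.
# Only the column names (stn_id, year, measure) come from the report.
toy <- expand.grid(
  stn_id  = c("S1", "S2", "S3"),
  year    = 1990:1993,
  measure = c("daily_max", "daily_min"),
  day     = 1:5,
  stringsAsFactors = FALSE
)

# Count distinct combinations of a set of partition keys; a hive-style
# write creates one directory (and at least one file) per combination.
count_partitions <- function(df, keys) {
  nrow(unique(df[, keys, drop = FALSE]))
}

n_fine   <- count_partitions(toy, c("stn_id", "year", "measure"))
n_coarse <- count_partitions(toy, "measure")
rows_per_partition <- nrow(toy) / n_fine

cat("partitions (stn_id, year, measure):", n_fine, "\n")
cat("partitions (measure only):", n_coarse, "\n")
cat("rows per fine-grained partition:", rows_per_partition, "\n")
```

In the reported dataset, 191,171 files from 780 per-station CSVs works out to roughly 245 files per station, each presumably holding no more than a year of daily values for one measure.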
> [R] open_dataset very slow on heavily partitioned parquet dataset
> -----------------------------------------------------------------
>
> Key: ARROW-15069
> URL: https://issues.apache.org/jira/browse/ARROW-15069
> Project: Apache Arrow
> Issue Type: Bug
> Components: R
> Affects Versions: 6.0.1
> Environment: macOS Mojave, R 4.1.1
> Reporter: Andy Teucher
> Assignee: Weston Pace
> Priority: Minor
>
> Opening a (particular) partitioned hive-style parquet dataset is very slow
> (45s to 1 minute). I have a reproducible example below that takes 780 csv
> files and writes them to parquet using the {{open_dataset("csv files") |>
> group_by(vars) |> write_dataset("parquet")}} suggested
> [here|https://arrow.apache.org/docs/r/articles/dataset.html#writing-datasets].
> Opening and querying the subsequent parquet dataset is much slower than
> doing it on the original csv files, which is not what I expected.
> {code:java}
> library(arrow)
> library(dplyr)
> library(tictoc)
> zipfile <- "ahccd.zip"
> csv_dir <- "data/csv"
> parquet_dir <- "data/parquet"
> dir.create(csv_dir, recursive = TRUE)
> dir.create(parquet_dir, recursive = TRUE)
> # A zip of 780 csvs of daily temperature data at Canadian climate stations
> # (one file per station)
> download.file("https://www.dropbox.com/s/f0a18jp0lvbp1hp/ahccd.zip?dl=1",
> destfile = zipfile)
> unzip(zipfile, exdir = csv_dir)
> csv_schema <- schema(
>   field("stn_id", string()),
>   field("stn_name", string()),
>   field("measure", string()),
>   field("date", date32()),
>   field("year", int64()),
>   field("month", int64()),
>   field("temp", double()),
>   field("province", string()),
>   field("stn_joined", string()),
>   field("element", string()),
>   field("unit", string()),
>   field("stn_last_updated", string()),
>   field("flag", string())
> )
> csv_format <- FileFormat$create(format = "csv", quoting = FALSE)
> # Write to parquet, partitioning on stn_id, year, measure
> tic("write csv to parquet")
> arrow::open_dataset("data/csv", schema = csv_schema,
>                     format = csv_format) |>
>   group_by(stn_id, year, measure) |>
>   write_dataset(parquet_dir, format = "parquet")
> toc()
> #> write csv to parquet: 2067.093 sec elapsed
> stations <- c("1100031", "1100120", "1100119", "1036B06")
> ## Query directory of original csv files
> tic("query csv")
> foo <- arrow::open_dataset(csv_dir, schema = csv_schema,
>                            format = csv_format) |>
>   filter(year >= 1990,
>          year <= 2020,
>          stn_id %in% stations,
>          measure == "daily_max") |>
>   collect()
> toc()
> #> query csv: 12.571 sec elapsed
> ## Query the hive-style parquet directory
> tic("query parquet")
> bar <- arrow::open_dataset("data/parquet") |>
>   filter(year >= 1990,
>          year <= 2020,
>          stn_id %in% stations,
>          measure == "daily_max") |>
>   collect()
> toc()
> #> query parquet: 41.79 sec elapsed
> ## It turns out that it is just the opening of the dataset
> ## that takes so long
> tic("open parquet dataset")
> ds <- arrow::open_dataset(parquet_dir)
> toc()
> #> open parquet dataset: 45.581 sec elapsed
> ds
> #> FileSystemDataset with 191171 Parquet files
> #> stn_name: string
> #> date: date32[day]
> #> month: int64
> #> temp: double
> #> province: string
> #> stn_joined: string
> #> element: string
> #> unit: string
> #> stn_last_updated: string
> #> flag: string
> #> stn_id: string
> #> year: int32
> #> measure: string
> tic("query already opened dataset")
> ds |>
>   filter(year >= 1990,
>          year <= 2020,
>          stn_id %in% stations,
>          measure == "daily_max") |>
>   collect()
> #> # A tibble: 44,469 × 13
> #>    stn_name date       month  temp province stn_joined     element        unit
> #>    <chr>    <date>     <int> <dbl> <chr>    <chr>          <chr>          <chr>
> #>  1 ALBERNI  1992-01-01     1   6.5 BC       station joined Homogenized d… Deg C…
> #>  2 ALBERNI  1992-01-02     1   5.5 BC       station joined Homogenized d… Deg C…
> #>  3 ALBERNI  1992-01-03     1   3.5 BC       station joined Homogenized d… Deg C…
> #>  4 ALBERNI  1992-01-04     1   6   BC       station joined Homogenized d… Deg C…
> #>  5 ALBERNI  1992-01-05     1   0.5 BC       station joined Homogenized d… Deg C…
> #>  6 ALBERNI  1992-01-06     1   0   BC       station joined Homogenized d… Deg C…
> #>  7 ALBERNI  1992-01-07     1   0   BC       station joined Homogenized d… Deg C…
> #>  8 ALBERNI  1992-01-08     1   1.5 BC       station joined Homogenized d… Deg C…
> #>  9 ALBERNI  1992-01-09     1   4   BC       station joined Homogenized d… Deg C…
> #> 10 ALBERNI  1992-01-10     1   5.5 BC       station joined Homogenized d… Deg C…
> #> # … with 44,459 more rows, and 5 more variables: stn_last_updated <chr>,
> #> #   flag <chr>, stn_id <chr>, year <int>, measure <chr>
> toc()
> #> query already opened dataset: 0.356 sec elapsed
> {code}
> The reprex above is self-contained but takes a while to run: writing the
> parquet dataset alone can take 30 minutes or more (about 34 minutes in the
> run above). It also downloads a 380 MB zip file of csvs from my Dropbox.