[https://issues.apache.org/jira/browse/ARROW-15069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17458583#comment-17458583]

Weston Pace commented on ARROW-15069:
-------------------------------------

I'm going to try to take a look at this soon. I have a hunch that the problem
is that your 780 CSV files are turning into 191,171 Parquet files. Luckily, we
have recently done some work that should help with this: batching data up in
memory to ensure minimum row group sizes when doing a dataset write.
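As a rough illustration of that hunch: writing with {{group_by(stn_id, year, measure)}} produces one hive-style directory per distinct (stn_id, year, measure) combination, each holding at least one Parquet file. So the file count scales with the number of key combinations rather than with the number of input CSVs. The base-R sketch below uses hypothetical toy data (two station IDs borrowed from the reprex, with made-up years, row counts and temperatures) and does not need arrow installed. It only counts combinations and does not call {{write_dataset()}}.

```r
# Hypothetical toy data (not the real AHCCD files): 2 stations x 3 years x
# 2 measures, with 5 rows for every combination.
toy <- expand.grid(
  stn_id  = c("1100031", "1100120"),
  year    = 1990:1992,
  measure = c("daily_max", "daily_min"),
  rep     = 1:5,
  stringsAsFactors = FALSE
)
toy$temp <- seq_len(nrow(toy)) / 10  # placeholder temperatures

# Partitioning on stn_id, year and measure creates one hive-style directory
# per distinct combination of those columns.
partition_keys <- unique(toy[c("stn_id", "year", "measure")])
n_partitions <- nrow(partition_keys)
print(n_partitions)  # 12 directories, i.e. at least 12 Parquet files

# The hive-style directory names these combinations map to
paths <- with(partition_keys,
              sprintf("stn_id=%s/year=%d/measure=%s", stn_id, year, measure))
print(head(paths, 2))

# Average rows per partition directory; adding partition keys can only
# keep this the same or make it smaller.
print(nrow(toy) / n_partitions)  # 5
```

Roughly speaking, the fewer rows each combination has, the more tiny files {{open_dataset()}} has to discover on the way in.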

> [R] open_dataset very slow on heavily partitioned parquet dataset
> -----------------------------------------------------------------
>
>                 Key: ARROW-15069
>                 URL: https://issues.apache.org/jira/browse/ARROW-15069
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: R
>    Affects Versions: 6.0.1
>         Environment: macOS Mojave, R 4.1.1 
>            Reporter: Andy Teucher
>            Assignee: Weston Pace
>            Priority: Minor
>
> Opening a (particular) partitioned, hive-style Parquet dataset is very slow
> (45 s to 1 minute). I have a reproducible example below that takes 780 CSV
> files and writes them to Parquet using the {{open_dataset("csv files") |>
> group_by(vars) |> write_dataset("parquet")}} pattern suggested
> [here|https://arrow.apache.org/docs/r/articles/dataset.html#writing-datasets].
> Opening and querying the resulting Parquet dataset is much slower than
> querying the original CSV files, which is not what I expected.
> {code}
> library(arrow)
> library(dplyr)
> library(tictoc)
> zipfile <- "ahccd.zip"
> csv_dir <- "data/csv"
> parquet_dir <- "data/parquet"
> dir.create(csv_dir, recursive = TRUE)
> dir.create(parquet_dir, recursive = TRUE)
> # A zip of 780 CSVs of daily temperature data at Canadian climate stations
> # (one file per station)
> download.file("https://www.dropbox.com/s/f0a18jp0lvbp1hp/ahccd.zip?dl=1",
>               destfile = zipfile)
> unzip(zipfile, exdir = csv_dir)
> csv_schema <- schema(
>   field("stn_id", string()),
>   field("stn_name", string()),
>   field("measure", string()),
>   field("date", date32()),
>   field("year", int64()),
>   field("month", int64()),
>   field("temp", double()),
>   field("province", string()),
>   field("stn_joined", string()),
>   field("element", string()),
>   field("unit", string()),
>   field("stn_last_updated", string()),
>   field("flag", string())
> )
> csv_format <- FileFormat$create(format = "csv", quoting = FALSE)
> # Write to parquet, partitioning on stn_id, year, measure
> tic("write csv to parquet")
> arrow::open_dataset("data/csv", schema = csv_schema,
>                     format = csv_format) |>
>   group_by(stn_id, year, measure) |>
>   write_dataset(parquet_dir, format = "parquet")
> toc()
> #> write csv to parquet: 2067.093 sec elapsed
> stations <- c("1100031", "1100120", "1100119", "1036B06")
> ## Query directory of original csv files
> tic("query csv")
> foo <- arrow::open_dataset(csv_dir, schema = csv_schema,
>                            format = csv_format) |>
>   filter(year >= 1990,
>          year <= 2020,
>          stn_id %in% stations,
>          measure == "daily_max") |>
>   collect()
> toc()
> #> query csv: 12.571 sec elapsed
> ## Query the hive-style parquet directory
> tic("query parquet")
> bar <- arrow::open_dataset("data/parquet") |>
>   filter(year >= 1990,
>          year <= 2020,
>          stn_id %in% stations,
>          measure == "daily_max") |>
>   collect()
> toc()
> #> query parquet: 41.79 sec elapsed
> ## It turns out that it is just the opening of the dataset 
> ## that takes so long
> tic("open parquet dataset")
> ds <- arrow::open_dataset(parquet_dir)
> toc()
> #> open parquet dataset: 45.581 sec elapsed
> ds
> #> FileSystemDataset with 191171 Parquet files
> #> stn_name: string
> #> date: date32[day]
> #> month: int64
> #> temp: double
> #> province: string
> #> stn_joined: string
> #> element: string
> #> unit: string
> #> stn_last_updated: string
> #> flag: string
> #> stn_id: string
> #> year: int32
> #> measure: string
> tic("query already opened dataset")
> ds |>
>   filter(year >= 1990,
>          year <= 2020,
>          stn_id %in% stations,
>          measure == "daily_max") |>
>   collect()
> #> # A tibble: 44,469 × 13
> #>    stn_name date       month  temp province stn_joined     element        unit
> #>    <chr>    <date>     <int> <dbl> <chr>    <chr>          <chr>          <chr>
> #>  1 ALBERNI  1992-01-01     1   6.5 BC       station joined Homogenized d… Deg C…
> #>  2 ALBERNI  1992-01-02     1   5.5 BC       station joined Homogenized d… Deg C…
> #>  3 ALBERNI  1992-01-03     1   3.5 BC       station joined Homogenized d… Deg C…
> #>  4 ALBERNI  1992-01-04     1   6   BC       station joined Homogenized d… Deg C…
> #>  5 ALBERNI  1992-01-05     1   0.5 BC       station joined Homogenized d… Deg C…
> #>  6 ALBERNI  1992-01-06     1   0   BC       station joined Homogenized d… Deg C…
> #>  7 ALBERNI  1992-01-07     1   0   BC       station joined Homogenized d… Deg C…
> #>  8 ALBERNI  1992-01-08     1   1.5 BC       station joined Homogenized d… Deg C…
> #>  9 ALBERNI  1992-01-09     1   4   BC       station joined Homogenized d… Deg C…
> #> 10 ALBERNI  1992-01-10     1   5.5 BC       station joined Homogenized d… Deg C…
> #> # … with 44,459 more rows, and 5 more variables: stn_last_updated <chr>,
> #> #   flag <chr>, stn_id <chr>, year <int>, measure <chr>
> toc()
> #> query already opened dataset: 0.356 sec elapsed
> {code}
> The reprex above is self-contained but takes a while to run; writing the
> Parquet dataset alone can take up to 30 minutes. It also downloads a 380 MB
> zip file of CSVs from my Dropbox.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
