Andy Teucher created ARROW-15069:
------------------------------------

             Summary: [R] open_dataset very slow on heavily partitioned parquet dataset
                 Key: ARROW-15069
                 URL: https://issues.apache.org/jira/browse/ARROW-15069
             Project: Apache Arrow
          Issue Type: Bug
          Components: R
    Affects Versions: 6.0.1
         Environment: macOS Mojave, R 4.1.1 
            Reporter: Andy Teucher


Opening a (particular) heavily partitioned hive-style parquet dataset is very
slow (45s to 1 minute). I have a reproducible example below that takes 780 csv
files and writes them to parquet using the {{open_dataset("csv files") |>
group_by(vars) |> write_dataset("parquet")}} workflow suggested
[here|https://arrow.apache.org/docs/r/articles/dataset.html#writing-datasets].
Opening and querying the resulting parquet dataset is much slower than querying
the original csv files directly, which is not what I expected.
{code:java}
library(arrow)
library(dplyr)
library(tictoc)

zipfile <- "ahccd.zip"
csv_dir <- "data/csv"
parquet_dir <- "data/parquet"

dir.create(csv_dir, recursive = TRUE)
dir.create(parquet_dir, recursive = TRUE)

# A zip of 780 csvs of daily temperature data at Canadian climate stations
# (one file per station)
download.file("https://www.dropbox.com/s/f0a18jp0lvbp1hp/ahccd.zip?dl=1",
              destfile = zipfile)

unzip(zipfile, exdir = csv_dir)

csv_schema <- schema(
  field("stn_id", string()),
  field("stn_name", string()),
  field("measure", string()),
  field("date", date32()),
  field("year", int64()),
  field("month", int64()),
  field("temp", double()),
  field("province", string()),
  field("stn_joined", string()),
  field("element", string()),
  field("unit", string()),
  field("stn_last_updated", string()),
  field("flag", string())
)

csv_format <- FileFormat$create(format = "csv", quoting = FALSE)

# Write to parquet, partitioning on stn_id, year, measure
tic("write csv to parquet")
arrow::open_dataset("data/csv", schema = csv_schema,
                    format = csv_format) |>
  group_by(stn_id, year, measure) |>
  write_dataset(parquet_dir, format = "parquet")
toc()
#> write csv to parquet: 2067.093 sec elapsed
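
# Partitioning on stn_id, year, and measure writes one small file per
# unique station/year/measure combination: 191,171 files in total
# (see the dataset print below)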

stations <- c("1100031", "1100120", "1100119", "1036B06")

## Query directory of original csv files
tic("query csv")
foo <- arrow::open_dataset(csv_dir, schema = csv_schema,
                           format = csv_format) |>
  filter(year >= 1990,
         year <= 2020,
         stn_id %in% stations,
         measure == "daily_max") |>
  collect()
toc()
#> query csv: 12.571 sec elapsed

## Query the hive-style parquet directory
tic("query parquet")
bar <- arrow::open_dataset("data/parquet") |>
  filter(year >= 1990,
         year <= 2020,
         stn_id %in% stations,
         measure == "daily_max") |>
  collect()
toc()
#> query parquet: 41.79 sec elapsed

## It turns out that it is just the opening of the dataset 
## that takes so long
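## (presumably because open_dataset() must recursively list all
## ~191k files before it can construct the FileSystemDataset)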
tic("open parquet dataset")
ds <- arrow::open_dataset("~/Desktop/arrow-report/data/parquet")
toc()
#> open parquet dataset: 45.581 sec elapsed

ds
#> FileSystemDataset with 191171 Parquet files
#> stn_name: string
#> date: date32[day]
#> month: int64
#> temp: double
#> province: string
#> stn_joined: string
#> element: string
#> unit: string
#> stn_last_updated: string
#> flag: string
#> stn_id: string
#> year: int32
#> measure: string

tic("query already openend dataset")
ds |> 
  filter(year >= 1990,
         year <= 2020,
         stn_id %in% stations,
         measure == "daily_max") |>
  collect()
#> # A tibble: 44,469 × 13
#>    stn_name date       month  temp province stn_joined     element        unit  
#>    <chr>    <date>     <int> <dbl> <chr>    <chr>          <chr>          <chr> 
#>  1 ALBERNI  1992-01-01     1   6.5 BC       station joined Homogenized d… Deg C…
#>  2 ALBERNI  1992-01-02     1   5.5 BC       station joined Homogenized d… Deg C…
#>  3 ALBERNI  1992-01-03     1   3.5 BC       station joined Homogenized d… Deg C…
#>  4 ALBERNI  1992-01-04     1   6   BC       station joined Homogenized d… Deg C…
#>  5 ALBERNI  1992-01-05     1   0.5 BC       station joined Homogenized d… Deg C…
#>  6 ALBERNI  1992-01-06     1   0   BC       station joined Homogenized d… Deg C…
#>  7 ALBERNI  1992-01-07     1   0   BC       station joined Homogenized d… Deg C…
#>  8 ALBERNI  1992-01-08     1   1.5 BC       station joined Homogenized d… Deg C…
#>  9 ALBERNI  1992-01-09     1   4   BC       station joined Homogenized d… Deg C…
#> 10 ALBERNI  1992-01-10     1   5.5 BC       station joined Homogenized d… Deg C…
#> # … with 44,459 more rows, and 5 more variables: stn_last_updated <chr>,
#> #   flag <chr>, stn_id <chr>, year <int>, measure <chr>

toc()
#> query already opened dataset: 0.356 sec elapsed
{code}
The above reprex is self-contained, but it takes a while to run; in particular,
writing the parquet dataset can take up to 30 minutes. It also downloads a
380MB zip file of csvs from my Dropbox.
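
As a possible diagnostic (a sketch, not a verified workaround): supplying the
hive partition field types to {{open_dataset()}} up front via
{{hive_partition()}}, so they don't have to be inferred from the directory
names. The field types below are taken from the dataset print above; I haven't
confirmed whether this actually changes the open time.
{code:java}
library(arrow)
library(tictoc)

# Declare the hive partition fields and types explicitly (types taken
# from the dataset print above) instead of letting arrow infer them
# from the directory names.
part <- hive_partition(stn_id = string(), year = int32(), measure = string())

tic("open parquet dataset with explicit partitioning")
ds2 <- open_dataset("data/parquet", partitioning = part)
toc()
{code}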


