annakrystalli commented on issue #34640:
URL: https://github.com/apache/arrow/issues/34640#issuecomment-2577363842
Hello @thisisnic
I'm reposting here because I don't believe this issue has been completely
resolved for hive-partitioned hubs.
The approach described above, which was deemed to be working for
hive-partitioned hubs, still does not include the partitioning column in the
resulting dataset when a schema is applied:
``` r
library(arrow)
library(dplyr)
tf <- tempfile()
dir.create(tf)
arrow::write_dataset(mtcars, tf, partitioning = "am", format = "csv")
schema_with_am <- schema(
  mpg = float64(), cyl = int64(), disp = float64(), hp = int64(),
  drat = float64(), wt = float64(), qsec = float64(), vs = int64(),
  am = int64(), gear = int64(), carb = int64()
)
schema_without_am <- schema(
  mpg = float64(), cyl = int64(), disp = float64(), hp = int64(),
  drat = float64(), wt = float64(), qsec = float64(), vs = int64(),
  gear = int64(), carb = int64()
)
# CSV is NOT fine with a schema that omits the partitioning column
csv <- open_dataset(
  tf,
  format = "csv",
  schema = schema_without_am,
  skip = 1,
  partitioning = "am"
)
# The am column is still missing
csv |> collect()
#> # A tibble: 32 × 10
#> mpg cyl disp hp drat wt qsec vs gear carb
#> <dbl> <int> <dbl> <int> <dbl> <dbl> <dbl> <int> <int> <int>
#> 1 21.4 6 258 110 3.08 3.22 19.4 1 3 1
#> 2 18.7 8 360 175 3.15 3.44 17.0 0 3 2
#> 3 18.1 6 225 105 2.76 3.46 20.2 1 3 1
#> 4 14.3 8 360 245 3.21 3.57 15.8 0 3 4
#> 5 24.4 4 147. 62 3.69 3.19 20 1 4 2
#> 6 22.8 4 141. 95 3.92 3.15 22.9 1 4 2
#> 7 19.2 6 168. 123 3.92 3.44 18.3 1 4 4
#> 8 17.8 6 168. 123 3.92 3.44 18.9 1 4 4
#> 9 16.4 8 276. 180 3.07 4.07 17.4 0 3 3
#> 10 17.3 8 276. 180 3.07 3.73 17.6 0 3 3
#> # ℹ 22 more rows
setdiff(names(mtcars), csv |> collect() |> names())
#> [1] "am"
```
<sup>Created on 2025-01-08 with [reprex v2.1.0](https://reprex.tidyverse.org)</sup>
The only way I have found to include it is to use the `col_types` argument
instead of `schema`, as well as provide an explicit data type for the
partition column. Because we are using hive partitioning, this also requires
the use of `hive_partition()` (which is not required when using
non-hive-partitioned data; see the sketch after this reprex).
``` r
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)
tf <- tempfile()
dir.create(tf)
arrow::write_dataset(mtcars, tf, partitioning = "am", format = "csv")
schema_with_am <- schema(
  mpg = float64(), cyl = int64(), disp = float64(), hp = int64(),
  drat = float64(), wt = float64(), qsec = float64(), vs = int64(),
  am = int64(), gear = int64(), carb = int64()
)
schema_without_am <- schema(
  mpg = float64(), cyl = int64(), disp = float64(), hp = int64(),
  drat = float64(), wt = float64(), qsec = float64(), vs = int64(),
  gear = int64(), carb = int64()
)
# Create the partitioning object. As we are using hive partitioning, we need
# `hive_partition()` to specify it.
partitioning <- arrow::hive_partition(
  am = int64()
)
csv <- arrow::open_dataset(tf,
  format = "csv",
  col_types = schema_without_am,
  quoted_na = TRUE,
  partitioning = partitioning
)
csv
#> FileSystemDataset with 2 csv files
#> 11 columns
#> mpg: double
#> cyl: int64
#> disp: double
#> hp: int64
#> drat: double
#> wt: double
#> qsec: double
#> vs: int64
#> gear: int64
#> carb: int64
#> am: int64
csv |> dplyr::collect()
#> # A tibble: 32 × 11
#> mpg cyl disp hp drat wt qsec vs gear carb am
#> <dbl> <int> <dbl> <int> <dbl> <dbl> <dbl> <int> <int> <int> <int>
#> 1 21.4 6 258 110 3.08 3.22 19.4 1 3 1 0
#> 2 18.7 8 360 175 3.15 3.44 17.0 0 3 2 0
#> 3 18.1 6 225 105 2.76 3.46 20.2 1 3 1 0
#> 4 14.3 8 360 245 3.21 3.57 15.8 0 3 4 0
#> 5 24.4 4 147. 62 3.69 3.19 20 1 4 2 0
#> 6 22.8 4 141. 95 3.92 3.15 22.9 1 4 2 0
#> 7 19.2 6 168. 123 3.92 3.44 18.3 1 4 4 0
#> 8 17.8 6 168. 123 3.92 3.44 18.9 1 4 4 0
#> 9 16.4 8 276. 180 3.07 4.07 17.4 0 3 3 0
#> 10 17.3 8 276. 180 3.07 3.73 17.6 0 3 3 0
#> # ℹ 22 more rows
setdiff(names(mtcars), csv |> collect() |> names())
#> character(0)
```
<sup>Created on 2025-01-08 with [reprex v2.1.0](https://reprex.tidyverse.org)</sup>
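For contrast, here is a minimal sketch of the non-hive case mentioned above.
This is an untested assumption on my part: it relies on `write_dataset()`'s
`hive_style` argument to produce plain directory partitioning, in which case
passing a bare `schema()` to `partitioning` should be enough to type the
partition column, without `hive_partition()`:

``` r
# Untested sketch, assuming write_dataset()'s hive_style argument:
# write plain directory-partitioned CSVs (paths like <dir>/0/part-0.csv
# rather than <dir>/am=0/part-0.csv)
tf2 <- tempfile()
dir.create(tf2)
arrow::write_dataset(mtcars, tf2, partitioning = "am", format = "csv",
                     hive_style = FALSE)
# For directory partitioning, a bare schema() typing the path segments
# should suffice; hive_partition() is only needed for hive-style paths
arrow::open_dataset(tf2,
  format = "csv",
  col_types = schema_without_am,
  partitioning = schema(am = int64())
) |>
  dplyr::collect()
```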
To demonstrate that the explicit partitioning data type is required, and to
provide a motivating example of what we are trying to achieve (correctly
applying data types to partition columns), here is an example with a dataset
that we want to be able to read flexibly once partitioned, regardless of the
partitioning structure and regardless of whether it is written to CSV or
parquet format:
``` r
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)
tf_csv <- tempfile()
dir.create(tf_csv)
tf_parquet <- tempfile()
dir.create(tf_parquet)
oracle <-
readr::read_csv("https://raw.githubusercontent.com/hubverse-org/example-complex-forecast-hub/refs/heads/main/target-data/oracle-output.csv")
#> Rows: 200340 Columns: 6
#> ── Column specification ────────────────────────────────────────────────────
#> Delimiter: ","
#> chr  (4): location, target, output_type, output_type_id
#> dbl  (1): oracle_value
#> date (1): target_end_date
#>
#> ℹ Use `spec()` to retrieve the full column specification for this data.
#> ℹ Specify the column types or set `show_col_types = FALSE` to quiet this message.
write_dataset(oracle, tf_csv, partitioning = "target_end_date", format = "csv")
write_dataset(oracle, tf_parquet, partitioning = "target_end_date", format = "parquet")
schema_without_ted <- schema(
  target = string(), location = string(), output_type = string(),
  output_type_id = string(), oracle_value = arrow::float64()
)
schema_with_ted <- schema(
  target = string(), location = string(), output_type = string(),
  output_type_id = string(), oracle_value = arrow::float64(),
  target_end_date = date32()
)
# As we are using hive partitioning, we need `hive_partition()` to specify it
partitioning <- arrow::hive_partition(
  target_end_date = date32()
)
# Using `schema` does not work, as described above
arrow::open_dataset(tf_csv,
  format = "csv",
  schema = schema_without_ted,
  quoted_na = TRUE,
  partitioning = partitioning,
  skip = 1L
) |>
  collect()
#> # A tibble: 200,340 × 5
#> target location output_type output_type_id oracle_value
#> <chr> <chr> <chr> <chr> <dbl>
#> 1 US wk inc flu hosp quantile <NA> 2380
#> 2 01 wk inc flu hosp quantile <NA> 141
#> 3 02 wk inc flu hosp quantile <NA> 3
#> 4 04 wk inc flu hosp quantile <NA> 22
#> 5 05 wk inc flu hosp quantile <NA> 50
#> 6 06 wk inc flu hosp quantile <NA> 124
#> 7 08 wk inc flu hosp quantile <NA> 15
#> 8 09 wk inc flu hosp quantile <NA> 9
#> 9 10 wk inc flu hosp quantile <NA> 1
#> 10 11 wk inc flu hosp quantile <NA> 8
#> # ℹ 200,330 more rows
# The only way to successfully apply the schema to the partitioning column is
# to use `col_types` as well as an explicit partitioning data type
# specification. As we are using hive partitioning, this also needs
# `hive_partition()`.
arrow::open_dataset(tf_csv,
  format = "csv",
  col_types = schema_without_ted,
  quoted_na = TRUE,
  partitioning = partitioning
) |>
  collect()
#> # A tibble: 200,340 × 6
#>    location target        output_type output_type_id oracle_value target_end_date
#>    <chr>    <chr>         <chr>       <chr>                 <dbl> <date>
#>  1 US       wk inc flu …  quantile    <NA>                   2380 2022-10-22
#>  2 01       wk inc flu …  quantile    <NA>                    141 2022-10-22
#>  3 02       wk inc flu …  quantile    <NA>                      3 2022-10-22
#>  4 04       wk inc flu …  quantile    <NA>                     22 2022-10-22
#>  5 05       wk inc flu …  quantile    <NA>                     50 2022-10-22
#>  6 06       wk inc flu …  quantile    <NA>                    124 2022-10-22
#>  7 08       wk inc flu …  quantile    <NA>                     15 2022-10-22
#>  8 09       wk inc flu …  quantile    <NA>                      9 2022-10-22
#>  9 10       wk inc flu …  quantile    <NA>                      1 2022-10-22
#> 10 11       wk inc flu …  quantile    <NA>                      8 2022-10-22
#> # ℹ 200,330 more rows
# If we don't specify a data type, the partitioning column is incorrectly
# returned as character
arrow::open_dataset(tf_csv,
  format = "csv",
  col_types = schema_without_ted,
  quoted_na = TRUE,
  partitioning = "target_end_date"
) |>
  collect()
#> # A tibble: 200,340 × 6
#>    location target        output_type output_type_id oracle_value target_end_date
#>    <chr>    <chr>         <chr>       <chr>                 <dbl> <chr>
#>  1 US       wk inc flu …  quantile    <NA>                   2380 2022-10-22
#>  2 01       wk inc flu …  quantile    <NA>                    141 2022-10-22
#>  3 02       wk inc flu …  quantile    <NA>                      3 2022-10-22
#>  4 04       wk inc flu …  quantile    <NA>                     22 2022-10-22
#>  5 05       wk inc flu …  quantile    <NA>                     50 2022-10-22
#>  6 06       wk inc flu …  quantile    <NA>                    124 2022-10-22
#>  7 08       wk inc flu …  quantile    <NA>                     15 2022-10-22
#>  8 09       wk inc flu …  quantile    <NA>                      9 2022-10-22
#>  9 10       wk inc flu …  quantile    <NA>                      1 2022-10-22
#> 10 11       wk inc flu …  quantile    <NA>                      8 2022-10-22
#> # ℹ 200,330 more rows
# Parquet datasets have a much smoother API: they don't require explicit
# partitioning specification so long as the column exists in the schema. This
# also removes the need for different partitioning specifications when working
# with hive vs non-hive partitioned data.
arrow::open_dataset(tf_parquet,
  format = "parquet",
  schema = schema_with_ted
) |>
  collect()
#> # A tibble: 200,340 × 6
#>    target        location output_type output_type_id oracle_value target_end_date
#>    <chr>         <chr>    <chr>       <chr>                 <dbl> <date>
#>  1 wk inc flu …  US       quantile    <NA>                   2380 2022-10-22
#>  2 wk inc flu …  01       quantile    <NA>                    141 2022-10-22
#>  3 wk inc flu …  02       quantile    <NA>                      3 2022-10-22
#>  4 wk inc flu …  04       quantile    <NA>                     22 2022-10-22
#>  5 wk inc flu …  05       quantile    <NA>                     50 2022-10-22
#>  6 wk inc flu …  06       quantile    <NA>                    124 2022-10-22
#>  7 wk inc flu …  08       quantile    <NA>                     15 2022-10-22
#>  8 wk inc flu …  09       quantile    <NA>                      9 2022-10-22
#>  9 wk inc flu …  10       quantile    <NA>                      1 2022-10-22
#> 10 wk inc flu …  11       quantile    <NA>                      8 2022-10-22
#> # ℹ 200,330 more rows
```
<sup>Created on 2025-01-08 with [reprex v2.1.0](https://reprex.tidyverse.org)</sup>
As demonstrated, schema specification on partitioned parquet is much smoother
and works generally, regardless of the partitioning columns and style
(hive/non-hive), as long as the schema provided matches the dataset columns.
The ideal would be a similar API for CSVs where:
- we can use `schema` instead of `col_types`
- `skip = 1` is not required (requiring it for CSV but not for parquet seems
awkward)
- an explicit `partitioning` specification is not required when the
partitioning columns are already in the schema supplied
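For illustration, a hypothetical sketch of what that could look like (this
call does not work today; it reuses `tf_csv` and `schema_with_ted` from the
reprex above):

``` r
# Hypothetical ideal API, not current behaviour: schema_with_ted includes the
# hive partition column target_end_date, so open_dataset() would ideally infer
# both the partitioning and its date32 type from the schema, with no skip and
# no partitioning argument needed, mirroring the parquet call above.
arrow::open_dataset(tf_csv,
  format = "csv",
  schema = schema_with_ted
) |>
  collect()
```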