annakrystalli commented on issue #34640: URL: https://github.com/apache/arrow/issues/34640#issuecomment-2577363842
Hello @thisisnic I'm reposting here because I don't believe this issue has been completely resolved, specifically for hive-partitioned hubs. The approach described above, which was deemed to be working for hive-partitioned hubs, still does not include the partitioning column in the resulting dataset when a schema is applied:

``` r
library(arrow)
library(dplyr)

tf <- tempfile()
dir.create(tf)
arrow::write_dataset(mtcars, tf, partitioning = "am", format = "csv")

schema_with_am <- schema(
  mpg = float64(),
  cyl = int64(),
  disp = float64(),
  hp = int64(),
  drat = float64(),
  wt = float64(),
  qsec = float64(),
  vs = int64(),
  am = int64(),
  gear = int64(),
  carb = int64()
)

schema_without_am <- schema(
  mpg = float64(),
  cyl = int64(),
  disp = float64(),
  hp = int64(),
  drat = float64(),
  wt = float64(),
  qsec = float64(),
  vs = int64(),
  gear = int64(),
  carb = int64()
)

# CSV is NOT fine with the schema without the partitioning value
csv <- open_dataset(
  tf,
  format = "csv",
  schema = schema_without_am,
  skip = 1,
  partitioning = "am"
)

# The am column is still missing
csv |> collect()
#> # A tibble: 32 × 10
#>      mpg   cyl  disp    hp  drat    wt  qsec    vs  gear  carb
#>    <dbl> <int> <dbl> <int> <dbl> <dbl> <dbl> <int> <int> <int>
#>  1  21.4     6  258    110  3.08  3.22  19.4     1     3     1
#>  2  18.7     8  360    175  3.15  3.44  17.0     0     3     2
#>  3  18.1     6  225    105  2.76  3.46  20.2     1     3     1
#>  4  14.3     8  360    245  3.21  3.57  15.8     0     3     4
#>  5  24.4     4  147.    62  3.69  3.19  20       1     4     2
#>  6  22.8     4  141.    95  3.92  3.15  22.9     1     4     2
#>  7  19.2     6  168.   123  3.92  3.44  18.3     1     4     4
#>  8  17.8     6  168.   123  3.92  3.44  18.9     1     4     4
#>  9  16.4     8  276.   180  3.07  4.07  17.4     0     3     3
#> 10  17.3     8  276.   180  3.07  3.73  17.6     0     3     3
#> # ℹ 22 more rows

setdiff(names(mtcars), csv |> collect() |> names())
#> [1] "am"
```
<sup>Created on 2025-01-08 with [reprex v2.1.0](https://reprex.tidyverse.org)</sup>

The only way I have found to include it is to use the `col_types` argument instead of `schema`, as well as provide an explicit data type for the partition column, which, because we are using hive partitioning, also requires the use of `hive_partition()` (not required when using non-hive-partitioned data).
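For reference, here is a minimal sketch of the non-hive case mentioned above; this assumes writing with `hive_style = FALSE` and passing a plain `schema` to `partitioning` to describe the path segments, so no `hive_partition()` object is involved:

``` r
# Sketch (assumption): with non-hive (directory) partitioning, a schema passed
# directly to `partitioning` is enough to type the partition column
tf2 <- tempfile()
dir.create(tf2)
arrow::write_dataset(mtcars, tf2, partitioning = "am", format = "csv",
                     hive_style = FALSE)
arrow::open_dataset(tf2,
  format = "csv",
  col_types = schema_without_am,
  partitioning = schema(am = int64())
)
```

The full hive-partitioned workaround: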
``` r
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

tf <- tempfile()
dir.create(tf)
arrow::write_dataset(mtcars, tf, partitioning = "am", format = "csv")

schema_with_am <- schema(
  mpg = float64(),
  cyl = int64(),
  disp = float64(),
  hp = int64(),
  drat = float64(),
  wt = float64(),
  qsec = float64(),
  vs = int64(),
  am = int64(),
  gear = int64(),
  carb = int64()
)

schema_without_am <- schema(
  mpg = float64(),
  cyl = int64(),
  disp = float64(),
  hp = int64(),
  drat = float64(),
  wt = float64(),
  qsec = float64(),
  vs = int64(),
  gear = int64(),
  carb = int64()
)

# Create partitioning object
# As we are using hive partitioning, we need to use `hive_partition()` to
# specify it
partitioning <- arrow::hive_partition(
  am = int64()
)

csv <- arrow::open_dataset(tf,
  format = "csv",
  col_types = schema_without_am,
  quoted_na = TRUE,
  partitioning = partitioning
)

csv
#> FileSystemDataset with 2 csv files
#> 11 columns
#> mpg: double
#> cyl: int64
#> disp: double
#> hp: int64
#> drat: double
#> wt: double
#> qsec: double
#> vs: int64
#> gear: int64
#> carb: int64
#> am: int64

csv |> dplyr::collect()
#> # A tibble: 32 × 11
#>      mpg   cyl  disp    hp  drat    wt  qsec    vs  gear  carb    am
#>    <dbl> <int> <dbl> <int> <dbl> <dbl> <dbl> <int> <int> <int> <int>
#>  1  21.4     6  258    110  3.08  3.22  19.4     1     3     1     0
#>  2  18.7     8  360    175  3.15  3.44  17.0     0     3     2     0
#>  3  18.1     6  225    105  2.76  3.46  20.2     1     3     1     0
#>  4  14.3     8  360    245  3.21  3.57  15.8     0     3     4     0
#>  5  24.4     4  147.    62  3.69  3.19  20       1     4     2     0
#>  6  22.8     4  141.    95  3.92  3.15  22.9     1     4     2     0
#>  7  19.2     6  168.   123  3.92  3.44  18.3     1     4     4     0
#>  8  17.8     6  168.   123  3.92  3.44  18.9     1     4     4     0
#>  9  16.4     8  276.   180  3.07  4.07  17.4     0     3     3     0
#> 10  17.3     8  276.   180  3.07  3.73  17.6     0     3     3     0
#> # ℹ 22 more rows

setdiff(names(mtcars), csv |> collect() |> names())
#> character(0)
```
<sup>Created on 2025-01-08 with [reprex v2.1.0](https://reprex.tidyverse.org)</sup>
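For comparison, the same data written as parquet appears to accept the full schema (including the partition column) directly; a quick sketch reusing the otherwise-unused `schema_with_am` from above, analogous to the oracle parquet example further down:

``` r
# Sketch: for parquet, `schema` alone is enough; no `partitioning` argument or
# `hive_partition()` object is needed
tf_pq <- tempfile()
dir.create(tf_pq)
arrow::write_dataset(mtcars, tf_pq, partitioning = "am", format = "parquet")
arrow::open_dataset(tf_pq, schema = schema_with_am) |> dplyr::collect()
```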
To demonstrate that the explicit partitioning data type is required, and to provide a motivating example of what we are trying to achieve (correctly applying data types to partition columns), here is an example with a dataset we want to be able to read flexibly once partitioned, regardless of the partitioning structure and regardless of whether it is partitioned to csv or parquet format:

``` r
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

tf_csv <- tempfile()
dir.create(tf_csv)
tf_parquet <- tempfile()
dir.create(tf_parquet)

oracle <- readr::read_csv("https://raw.githubusercontent.com/hubverse-org/example-complex-forecast-hub/refs/heads/main/target-data/oracle-output.csv")
#> Rows: 200340 Columns: 6
#> ── Column specification ────────────────────────────────────────────────────────
#> Delimiter: ","
#> chr  (4): location, target, output_type, output_type_id
#> dbl  (1): oracle_value
#> date (1): target_end_date
#>
#> ℹ Use `spec()` to retrieve the full column specification for this data.
#> ℹ Specify the column types or set `show_col_types = FALSE` to quiet this message.

write_dataset(oracle, tf_csv, partitioning = "target_end_date", format = "csv")
write_dataset(oracle, tf_parquet, partitioning = "target_end_date", format = "parquet")

schema_without_ted <- schema(
  target = string(),
  location = string(),
  output_type = string(),
  output_type_id = string(),
  oracle_value = arrow::float64())

schema_with_ted <- schema(
  target = string(),
  location = string(),
  output_type = string(),
  output_type_id = string(),
  oracle_value = arrow::float64(),
  target_end_date = date32())

# As we are using hive partitioning, we need to use `hive_partition()` to
# specify it
partitioning <- arrow::hive_partition(
  target_end_date = date32()
)

# Using `schema` does not work, as described above.
arrow::open_dataset(tf_csv,
  format = "csv",
  schema = schema_without_ted,
  quoted_na = TRUE,
  partitioning = partitioning,
  skip = 1L
) |> collect()
#> # A tibble: 200,340 × 5
#>    target location        output_type output_type_id oracle_value
#>    <chr>  <chr>           <chr>       <chr>                 <dbl>
#>  1 US     wk inc flu hosp quantile    <NA>                   2380
#>  2 01     wk inc flu hosp quantile    <NA>                    141
#>  3 02     wk inc flu hosp quantile    <NA>                      3
#>  4 04     wk inc flu hosp quantile    <NA>                     22
#>  5 05     wk inc flu hosp quantile    <NA>                     50
#>  6 06     wk inc flu hosp quantile    <NA>                    124
#>  7 08     wk inc flu hosp quantile    <NA>                     15
#>  8 09     wk inc flu hosp quantile    <NA>                      9
#>  9 10     wk inc flu hosp quantile    <NA>                      1
#> 10 11     wk inc flu hosp quantile    <NA>                      8
#> # ℹ 200,330 more rows

# The only way to successfully apply the schema to the partitioning column is
# to use `col_types` as well as an explicit partitioning data type
# specification. As we are using hive partitioning, this also needs
# `hive_partition()` to specify it
arrow::open_dataset(tf_csv,
  format = "csv",
  col_types = schema_without_ted,
  quoted_na = TRUE,
  partitioning = partitioning
) |> collect()
#> # A tibble: 200,340 × 6
#>    location target       output_type output_type_id oracle_value target_end_date
#>    <chr>    <chr>        <chr>       <chr>                 <dbl> <date>
#>  1 US       wk inc flu … quantile    <NA>                   2380 2022-10-22
#>  2 01       wk inc flu … quantile    <NA>                    141 2022-10-22
#>  3 02       wk inc flu … quantile    <NA>                      3 2022-10-22
#>  4 04       wk inc flu … quantile    <NA>                     22 2022-10-22
#>  5 05       wk inc flu … quantile    <NA>                     50 2022-10-22
#>  6 06       wk inc flu … quantile    <NA>                    124 2022-10-22
#>  7 08       wk inc flu … quantile    <NA>                     15 2022-10-22
#>  8 09       wk inc flu … quantile    <NA>                      9 2022-10-22
#>  9 10       wk inc flu … quantile    <NA>                      1 2022-10-22
#> 10 11       wk inc flu … quantile    <NA>                      8 2022-10-22
#> # ℹ 200,330 more rows

# If we don't specify a data type, the partitioning column is incorrectly
# returned as character
arrow::open_dataset(tf_csv,
  format = "csv",
  col_types = schema_without_ted,
  quoted_na = TRUE,
  partitioning = "target_end_date"
) |> collect()
#> # A tibble: 200,340 × 6
#>    location target       output_type output_type_id oracle_value target_end_date
#>    <chr>    <chr>        <chr>       <chr>                 <dbl> <chr>
#>  1 US       wk inc flu … quantile    <NA>                   2380 2022-10-22
#>  2 01       wk inc flu … quantile    <NA>                    141 2022-10-22
#>  3 02       wk inc flu … quantile    <NA>                      3 2022-10-22
#>  4 04       wk inc flu … quantile    <NA>                     22 2022-10-22
#>  5 05       wk inc flu … quantile    <NA>                     50 2022-10-22
#>  6 06       wk inc flu … quantile    <NA>                    124 2022-10-22
#>  7 08       wk inc flu … quantile    <NA>                     15 2022-10-22
#>  8 09       wk inc flu … quantile    <NA>                      9 2022-10-22
#>  9 10       wk inc flu … quantile    <NA>                      1 2022-10-22
#> 10 11       wk inc flu … quantile    <NA>                      8 2022-10-22
#> # ℹ 200,330 more rows

# Parquet datasets have a much smoother API, as they don't require an explicit
# partitioning specification so long as the column exists in the schema.
# It also removes the requirement for a different partitioning specification
# when working with hive vs non-hive partitioning
arrow::open_dataset(tf_parquet,
  format = "parquet",
  schema = schema_with_ted
) |> collect()
#> # A tibble: 200,340 × 6
#>    target       location output_type output_type_id oracle_value target_end_date
#>    <chr>        <chr>    <chr>       <chr>                 <dbl> <date>
#>  1 wk inc flu … US       quantile    <NA>                   2380 2022-10-22
#>  2 wk inc flu … 01       quantile    <NA>                    141 2022-10-22
#>  3 wk inc flu … 02       quantile    <NA>                      3 2022-10-22
#>  4 wk inc flu … 04       quantile    <NA>                     22 2022-10-22
#>  5 wk inc flu … 05       quantile    <NA>                     50 2022-10-22
#>  6 wk inc flu … 06       quantile    <NA>                    124 2022-10-22
#>  7 wk inc flu … 08       quantile    <NA>                     15 2022-10-22
#>  8 wk inc flu … 09       quantile    <NA>                      9 2022-10-22
#>  9 wk inc flu … 10       quantile    <NA>                      1 2022-10-22
#> 10 wk inc flu … 11       quantile    <NA>                      8 2022-10-22
#> # ℹ 200,330 more rows
```
<sup>Created on 2025-01-08 with [reprex v2.1.0](https://reprex.tidyverse.org)</sup>

As demonstrated, schema specification on partitioned parquet is much smoother and more general: it works regardless of the partitioning columns and style (hive/non-hive), as long as the schema provided matches the dataset columns. The ideal would be a similar API for CSVs where:

- we can use `schema` instead of `col_types`
- `skip = 1` is not required (requiring it for CSV but not for parquet seems awkward)
- an explicit `partitioning` specification is not required when the partitioning columns are already in the supplied schema

(See the hypothetical sketch below.)
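Purely for illustration, a hypothetical version of what that ideal CSV call might look like (this does not currently work; it simply mirrors the parquet call above):

``` r
# Hypothetical (does NOT currently work for csv): the full schema, including
# the partition column, would be enough on its own, exactly as for parquet
arrow::open_dataset(tf_csv,
  format = "csv",
  schema = schema_with_ted
) |> collect()
```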