annakrystalli commented on issue #34640:
URL: https://github.com/apache/arrow/issues/34640#issuecomment-2577363842

   Hello @thisisnic 
   
   I'm reposting here because I don't believe this issue has been completely
   resolved for hive-partitioned hubs.
   
   Specifically, the approach described above, which was deemed to work for
   hive-partitioned hubs, still does not include the partitioning column in the
   resulting dataset when a schema is applied:
   
   ``` r
   library(arrow)
   library(dplyr)
   
   
   tf <- tempfile()
   dir.create(tf)
   
   arrow::write_dataset(mtcars, tf, partitioning = "am", format = "csv")
   
   schema_with_am <- schema(
     mpg = float64(), cyl = int64(), disp = float64(), hp = int64(),
     drat = float64(), wt = float64(), qsec = float64(), vs = int64(),
     am = int64(), gear = int64(), carb = int64()
   )
   
   schema_without_am <- schema(
     mpg = float64(), cyl = int64(), disp = float64(), hp = int64(),
     drat = float64(), wt = float64(), qsec = float64(), vs = int64(),
     gear = int64(), carb = int64()
   )
   
   
   # CSV is NOT fine with a schema that omits the partitioning column
   csv <- open_dataset(
     tf,
     format = "csv",
     schema = schema_without_am,
     skip = 1, # skip the header row; needed when supplying a full schema
     partitioning = "am"
   )
   
   # The am column is still missing
   csv |> collect()
   #> # A tibble: 32 × 10
   #>      mpg   cyl  disp    hp  drat    wt  qsec    vs  gear  carb
   #>    <dbl> <int> <dbl> <int> <dbl> <dbl> <dbl> <int> <int> <int>
   #>  1  21.4     6  258    110  3.08  3.22  19.4     1     3     1
   #>  2  18.7     8  360    175  3.15  3.44  17.0     0     3     2
   #>  3  18.1     6  225    105  2.76  3.46  20.2     1     3     1
   #>  4  14.3     8  360    245  3.21  3.57  15.8     0     3     4
   #>  5  24.4     4  147.    62  3.69  3.19  20       1     4     2
   #>  6  22.8     4  141.    95  3.92  3.15  22.9     1     4     2
   #>  7  19.2     6  168.   123  3.92  3.44  18.3     1     4     4
   #>  8  17.8     6  168.   123  3.92  3.44  18.9     1     4     4
   #>  9  16.4     8  276.   180  3.07  4.07  17.4     0     3     3
   #> 10  17.3     8  276.   180  3.07  3.73  17.6     0     3     3
   #> # ℹ 22 more rows
   
   setdiff(names(mtcars), csv |> collect() |> names())
   #> [1] "am"
   ```
   
   <sup>Created on 2025-01-08 with [reprex v2.1.0](https://reprex.tidyverse.org)</sup>
   
   The only way I have found to include it is to use the `col_types` argument
   instead of `schema`, as well as provide an explicit data type for the
   partitioning column. Because we are using hive partitioning, this also
   requires `hive_partition()` (not required for non-hive partitioned data; see
   the sketch after the reprex below).
   
   ``` r
   library(arrow, warn.conflicts = FALSE)
   library(dplyr, warn.conflicts = FALSE)
   
   tf <- tempfile()
   dir.create(tf)
   
   arrow::write_dataset(mtcars, tf, partitioning = "am", format = "csv")
   
   schema_with_am <- schema(
     mpg = float64(), cyl = int64(), disp = float64(), hp = int64(),
     drat = float64(), wt = float64(), qsec = float64(), vs = int64(),
     am = int64(), gear = int64(), carb = int64()
   )
   
   schema_without_am <- schema(
     mpg = float64(), cyl = int64(), disp = float64(), hp = int64(),
     drat = float64(), wt = float64(), qsec = float64(), vs = int64(),
     gear = int64(), carb = int64()
   )
   # Create the partitioning object
   # As we are using hive partitioning, we need `hive_partition()` to
   # specify it
   partitioning <- arrow::hive_partition(
     am = int64()
   )
   
   csv <- arrow::open_dataset(tf,
     format = "csv",
     col_types = schema_without_am,
     quoted_na = TRUE,
     partitioning = partitioning
   ) 
   
   csv
   #> FileSystemDataset with 2 csv files
   #> 11 columns
   #> mpg: double
   #> cyl: int64
   #> disp: double
   #> hp: int64
   #> drat: double
   #> wt: double
   #> qsec: double
   #> vs: int64
   #> gear: int64
   #> carb: int64
   #> am: int64
   
   csv |> dplyr::collect()
   #> # A tibble: 32 × 11
   #>      mpg   cyl  disp    hp  drat    wt  qsec    vs  gear  carb    am
   #>    <dbl> <int> <dbl> <int> <dbl> <dbl> <dbl> <int> <int> <int> <int>
   #>  1  21.4     6  258    110  3.08  3.22  19.4     1     3     1     0
   #>  2  18.7     8  360    175  3.15  3.44  17.0     0     3     2     0
   #>  3  18.1     6  225    105  2.76  3.46  20.2     1     3     1     0
   #>  4  14.3     8  360    245  3.21  3.57  15.8     0     3     4     0
   #>  5  24.4     4  147.    62  3.69  3.19  20       1     4     2     0
   #>  6  22.8     4  141.    95  3.92  3.15  22.9     1     4     2     0
   #>  7  19.2     6  168.   123  3.92  3.44  18.3     1     4     4     0
   #>  8  17.8     6  168.   123  3.92  3.44  18.9     1     4     4     0
   #>  9  16.4     8  276.   180  3.07  4.07  17.4     0     3     3     0
   #> 10  17.3     8  276.   180  3.07  3.73  17.6     0     3     3     0
   #> # ℹ 22 more rows
   
   setdiff(names(mtcars), csv |> collect() |> names())
   #> character(0)
   ```
   
   <sup>Created on 2025-01-08 with [reprex v2.1.0](https://reprex.tidyverse.org)</sup>
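   
   As an aside, this is what I mean about non-hive partitioned data not
   requiring `hive_partition()`: a minimal sketch (output not shown), assuming
   the dataset is written with `hive_style = FALSE` and reusing
   `schema_without_am` from above; the typed partition field can then be passed
   to `partitioning` as a plain `schema()`. The `tf_dir` path is just for
   illustration.
   
   ``` r
   # Sketch: with directory (non-hive) partitioning, a plain schema() can
   # supply the partition field's type, so `hive_partition()` is not needed
   tf_dir <- tempfile()
   dir.create(tf_dir)
   arrow::write_dataset(mtcars, tf_dir, partitioning = "am", format = "csv",
                        hive_style = FALSE)
   
   arrow::open_dataset(
     tf_dir,
     format = "csv",
     col_types = schema_without_am,
     partitioning = schema(am = int64()) # typed partition field, no hive_partition()
   ) |>
     dplyr::collect()
   ```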
   
   To demonstrate that the explicit partitioning data type is required, and to
   provide a motivating example of what we are trying to achieve (correctly
   applying data types to partition columns), here is an example with a dataset
   we want to be able to read flexibly once partitioned, regardless of the
   partitioning structure and regardless of whether it is partitioned to CSV or
   parquet format.
   
   
   ``` r
   library(arrow, warn.conflicts = FALSE)
   library(dplyr, warn.conflicts = FALSE)
   
   tf_csv <- tempfile()
   dir.create(tf_csv)
   tf_parquet <- tempfile()
   dir.create(tf_parquet)
   
   oracle <- readr::read_csv("https://raw.githubusercontent.com/hubverse-org/example-complex-forecast-hub/refs/heads/main/target-data/oracle-output.csv")
   #> Rows: 200340 Columns: 6
   #> ── Column specification ────────────────────────────────────────────────────────
   #> Delimiter: ","
   #> chr  (4): location, target, output_type, output_type_id
   #> dbl  (1): oracle_value
   #> date (1): target_end_date
   #> 
   #> ℹ Use `spec()` to retrieve the full column specification for this data.
   #> ℹ Specify the column types or set `show_col_types = FALSE` to quiet this message.
   
   write_dataset(oracle, tf_csv, partitioning = "target_end_date", format = "csv")
   write_dataset(oracle, tf_parquet, partitioning = "target_end_date", format = "parquet")
   
   schema_without_ted <- schema(
     target = string(), location = string(), output_type = string(),
     output_type_id = string(), oracle_value = arrow::float64()
   )
   schema_with_ted <- schema(
     target = string(), location = string(), output_type = string(),
     output_type_id = string(), oracle_value = arrow::float64(),
     target_end_date = date32()
   )
   
   
   # As we are using hive partitioning, we need `hive_partition()` to
   # specify it
   partitioning <- arrow::hive_partition(
     target_end_date = date32()
   )
   
   # Using `schema` does not work, as described above.
   arrow::open_dataset(tf_csv,
     format = "csv",
     schema = schema_without_ted,
     quoted_na = TRUE,
     partitioning = partitioning,
     skip = 1L
   ) |>
       collect()
   #> # A tibble: 200,340 × 5
   #>    target location        output_type output_type_id oracle_value
   #>    <chr>  <chr>           <chr>       <chr>                 <dbl>
   #>  1 US     wk inc flu hosp quantile    <NA>                   2380
   #>  2 01     wk inc flu hosp quantile    <NA>                    141
   #>  3 02     wk inc flu hosp quantile    <NA>                      3
   #>  4 04     wk inc flu hosp quantile    <NA>                     22
   #>  5 05     wk inc flu hosp quantile    <NA>                     50
   #>  6 06     wk inc flu hosp quantile    <NA>                    124
   #>  7 08     wk inc flu hosp quantile    <NA>                     15
   #>  8 09     wk inc flu hosp quantile    <NA>                      9
   #>  9 10     wk inc flu hosp quantile    <NA>                      1
   #> 10 11     wk inc flu hosp quantile    <NA>                      8
   #> # ℹ 200,330 more rows
   
   # The only way to successfully apply the schema to the partitioning column
   # is to use `col_types` plus an explicit partitioning data type
   # specification. As we are using hive partitioning, this also needs
   # `hive_partition()` to specify it
   arrow::open_dataset(tf_csv,
     format = "csv",
     col_types = schema_without_ted,
     quoted_na = TRUE,
     partitioning = partitioning
   ) |>
       collect()
   #> # A tibble: 200,340 × 6
   #>    location target       output_type output_type_id oracle_value target_end_date
   #>    <chr>    <chr>        <chr>       <chr>                 <dbl> <date>
   #>  1 US       wk inc flu … quantile    <NA>                   2380 2022-10-22
   #>  2 01       wk inc flu … quantile    <NA>                    141 2022-10-22
   #>  3 02       wk inc flu … quantile    <NA>                      3 2022-10-22
   #>  4 04       wk inc flu … quantile    <NA>                     22 2022-10-22
   #>  5 05       wk inc flu … quantile    <NA>                     50 2022-10-22
   #>  6 06       wk inc flu … quantile    <NA>                    124 2022-10-22
   #>  7 08       wk inc flu … quantile    <NA>                     15 2022-10-22
   #>  8 09       wk inc flu … quantile    <NA>                      9 2022-10-22
   #>  9 10       wk inc flu … quantile    <NA>                      1 2022-10-22
   #> 10 11       wk inc flu … quantile    <NA>                      8 2022-10-22
   #> # ℹ 200,330 more rows
   
   # If we don't specify a data type, the partitioning column is incorrectly
   # returned as character
   arrow::open_dataset(tf_csv,
     format = "csv",
     col_types = schema_without_ted,
     quoted_na = TRUE,
     partitioning = "target_end_date"
   ) |>
       collect()
   #> # A tibble: 200,340 × 6
   #>    location target       output_type output_type_id oracle_value target_end_date
   #>    <chr>    <chr>        <chr>       <chr>                 <dbl> <chr>
   #>  1 US       wk inc flu … quantile    <NA>                   2380 2022-10-22
   #>  2 01       wk inc flu … quantile    <NA>                    141 2022-10-22
   #>  3 02       wk inc flu … quantile    <NA>                      3 2022-10-22
   #>  4 04       wk inc flu … quantile    <NA>                     22 2022-10-22
   #>  5 05       wk inc flu … quantile    <NA>                     50 2022-10-22
   #>  6 06       wk inc flu … quantile    <NA>                    124 2022-10-22
   #>  7 08       wk inc flu … quantile    <NA>                     15 2022-10-22
   #>  8 09       wk inc flu … quantile    <NA>                      9 2022-10-22
   #>  9 10       wk inc flu … quantile    <NA>                      1 2022-10-22
   #> 10 11       wk inc flu … quantile    <NA>                      8 2022-10-22
   #> # ℹ 200,330 more rows
   
   
   # Parquet datasets have a much smoother API: they don't require an explicit
   # partitioning specification so long as the column exists in the schema.
   # This also removes the need for a different partitioning specification when
   # working with hive vs non-hive partitioning
   arrow::open_dataset(tf_parquet,
     format = "parquet",
     schema = schema_with_ted
   ) |>
       collect()
   #> # A tibble: 200,340 × 6
   #>    target       location output_type output_type_id oracle_value target_end_date
   #>    <chr>        <chr>    <chr>       <chr>                 <dbl> <date>
   #>  1 wk inc flu … US       quantile    <NA>                   2380 2022-10-22
   #>  2 wk inc flu … 01       quantile    <NA>                    141 2022-10-22
   #>  3 wk inc flu … 02       quantile    <NA>                      3 2022-10-22
   #>  4 wk inc flu … 04       quantile    <NA>                     22 2022-10-22
   #>  5 wk inc flu … 05       quantile    <NA>                     50 2022-10-22
   #>  6 wk inc flu … 06       quantile    <NA>                    124 2022-10-22
   #>  7 wk inc flu … 08       quantile    <NA>                     15 2022-10-22
   #>  8 wk inc flu … 09       quantile    <NA>                      9 2022-10-22
   #>  9 wk inc flu … 10       quantile    <NA>                      1 2022-10-22
   #> 10 wk inc flu … 11       quantile    <NA>                      8 2022-10-22
   #> # ℹ 200,330 more rows
   ```
   
   <sup>Created on 2025-01-08 with [reprex v2.1.0](https://reprex.tidyverse.org)</sup>
   
   As demonstrated, schema specification on partitioned parquet is much
   smoother and more general, regardless of the partitioning columns and style
   (hive/non-hive), as long as the schema provided matches the dataset columns.
   
   The ideal would be a similar API for CSVs where:
   - we can use `schema` instead of `col_types`
   - `skip = 1` is not required (requiring it for CSV but not for parquet seems
   awkward)
   - an explicit `partitioning` specification is not required when the
   partitioning columns are already included in the supplied schema
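   
   Concretely, the hoped-for CSV call would look something like the following
   (hypothetical behaviour mirroring the parquet example above; it does not
   currently work like this):
   
   ``` r
   # Hypothetical: the wished-for CSV API, mirroring the parquet call.
   # The partition column's type would be picked up from the schema itself,
   # with no `skip`, no `col_types`, and no `hive_partition()` required.
   arrow::open_dataset(
     tf_csv,
     format = "csv",
     schema = schema_with_ted
   ) |>
     dplyr::collect()
   ```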
   

