annakrystalli opened a new issue, #34589:
URL: https://github.com/apache/arrow/issues/34589

   ### Describe the bug, including details regarding any error messages, 
version, and platform.
   
   Hi,
   
   I originally asked this as a [question on 
stackoverflow](https://stackoverflow.com/questions/75652190/difficulty-with-unifying-schemas-when-trying-to-open-arrow-dataset-with-two-diff)
 but the more I thought about it, and given I got no answers, I feel it might a 
bug so thought I'd report it here too.
   
   I'm including the full context of what I'm trying to do but ultimately the 
problem seems to be that `open_dataset()` is not recognising/reading csv files 
when an explicit schema is provided.
   
   ### Full context 
   
   I'm trying to open a FileSystemDataset using `arrow::open_dataset()` from a 
directory that contains two different file formats (csv & parquet). The single 
parquet file also has an additional field (`age_group`). The approach needs to 
be generalisable as the field names as well as file formats might change 
between projects.
   
   My initial plan for dealing with more than one file format was to create a 
`FileSystemDataset` for each format and then open a single `UnionDataset` from 
all `FileSystemDataset`s.
   
   However, this approach errors because one of the fields (`horizon`) is 
parsed as `int64()` in the csv `FileSystemDataset` and `int32()` in the parquet 
`FileSystemDataset` which doesn't allow the schema to be unified.
   
   To get around this in a flexible and general way, I created a unified schema 
by keeping the schema from the first FileSystemDataset (csv) and adding any 
additional fields from other FileSystemDatasets. I then used that to create 
appropriate schema subsets for each FileSystemDataset. I tried to replace each 
dataset's schema through assignment but that threw the same initial error.
   
   Finally I tried to reopen the FileSystemDatasets using the appropriate 
schema for each format but now in the csv FileSystemDataset, 0 csv files are 
read. I'm really confused as the schema in the original csv FileSystemDataset 
is exactly the same as the one created from the unified schema as well as the 
FileSystemDataset opened by explicitly specifying the schema. 
   
   Not sure what I'm doing wrong. Very open to more elegant approaches to 
tackling the overall problem also. 
   
   ### Reprex
   ``` r
   tmpdir <- tempdir()
   usethis::create_from_github("annakrystalli/debugging-arrow", destdir = 
tmpdir,
                               fork = FALSE, open = FALSE)
   #> ℹ Defaulting to 'https' Git protocol
   #> ✔ Creating 
'/var/folders/yb/936h04ss57x2rdmly_tv561m0000gp/T/Rtmpt7xDEu/debugging-arrow/'
   #> ✔ Cloning repo from 
'https://github.com/annakrystalli/debugging-arrow.git' into 
'/var/folders/yb/936h04ss57x2rdmly_tv561m0000gp/T/Rtmpt7xDEu/debugging-arrow'
   #> ✔ Setting active project to 
'/private/var/folders/yb/936h04ss57x2rdmly_tv561m0000gp/T/Rtmpt7xDEu/debugging-arrow'
   #> ℹ Default branch is 'main'
   #> ✔ Setting active project to '<no active project>'
   
   library(dplyr)
   #> 
   #> Attaching package: 'dplyr'
   #> 
   #> The following objects are masked from 'package:stats':
   #> 
   #>     filter, lag
   #> 
   #> The following objects are masked from 'package:base':
   #> 
   #>     intersect, setdiff, setequal, union
   origin_path <- file.path(tmpdir, "debugging-arrow/simple/model-output/")
   
   fs::dir_tree(origin_path)
   #> 
/var/folders/yb/936h04ss57x2rdmly_tv561m0000gp/T//Rtmpt7xDEu/debugging-arrow/simple/model-output/
   #> ├── simple_hub-baseline
   #> │   ├── 2022-10-01-simple_hub-baseline.csv
   #> │   ├── 2022-10-08-simple_hub-baseline.csv
   #> │   └── 2022-10-15-simple_hub-baseline.parquet
   #> └── team1-goodmodel
   #>     └── 2022-10-08-team1-goodmodel.csv
   
   # Open one dataset foe each format excluding invalid files
   formats <- c("csv", "parquet")
   
   conns <- purrr::map(purrr::set_names(formats),
                       ~arrow::open_dataset(
                           origin_path, format = .x,
                           partitioning = "team",
                           factory_options = list(exclude_invalid_files = 
TRUE)))
   
   arrow::open_dataset(conns)
   #> Error: Invalid: Unable to merge: Field horizon has incompatible types: 
int64 vs int32
   arrow::open_dataset(conns, unify_schemas = TRUE)
   #> Error: Invalid: Unable to merge: Field horizon has incompatible types: 
int64 vs int32
   arrow::open_dataset(conns, unify_schemas = FALSE)
   #> Error: Type error: fields had matching names but differing types. From: 
horizon: int32 To: horizon: int64
   # Problem arising form mismatched int fields between the two datasets
   conns
   #> $csv
   #> FileSystemDataset with 3 csv files
   #> origin_date: date32[day]
   #> target: string
   #> horizon: int64
   #> location: string
   #> type: string
   #> type_id: double
   #> value: int64
   #> team: string
   #> 
   #> $parquet
   #> FileSystemDataset with 1 Parquet file
   #> origin_date: date32[day]
   #> target: string
   #> horizon: int32
   #> location: string
   #> age_group: string
   #> type: string
   #> type_id: double
   #> value: int32
   #> team: string
   
   
   # Functions ----
   # Function to get schema for fields in y not present in x
   unify_conn_schema <- function(x, y) {
       setdiff(y$schema$names, x$schema$names) %>%
           purrr::map(~y$schema$GetFieldByName(.x)) %>%
           arrow::schema() %>%
           arrow::unify_schemas(x$schema)
   }
   
   # Get schema for fields in a dataset connection from a unified schema
   get_unified_schema <- function(x, unified_schema) {
       new_schema <- x$schema$names %>%
           purrr::map(~unified_schema$GetFieldByName(.x)) %>%
           arrow::schema()
   }
   
   # Get unified schema across all datasets
   unified_schema <- purrr::reduce(conns, unify_conn_schema)
   unified_schema
   #> Schema
   #> age_group: string
   #> origin_date: date32[day]
   #> target: string
   #> horizon: int64
   #> location: string
   #> type: string
   #> type_id: double
   #> value: int64
   #> team: string
   
   # Get schema for each connection from unified schema
   conn_schema <- purrr::map(conns, ~get_unified_schema(.x,
                                                        unified_schema))
   
   conn_schema
   #> $csv
   #> Schema
   #> origin_date: date32[day]
   #> target: string
   #> horizon: int64
   #> location: string
   #> type: string
   #> type_id: double
   #> value: int64
   #> team: string
   #> 
   #> $parquet
   #> Schema
   #> origin_date: date32[day]
   #> target: string
   #> horizon: int64
   #> location: string
   #> age_group: string
   #> type: string
   #> type_id: double
   #> value: int64
   #> team: string
   
   
   # Replacing the active binding via assignment doesn't seem to work
   purrr::map2(conns, conn_schema,
               function(x, y){
                   x$schema <- y})
   #> Error in `purrr::map2()`:
   #> ℹ In index: 2.
   #> ℹ With name: parquet.
   #> Caused by error:
   #> ! Type error: fields had matching names but differing types. From: 
horizon: int32 To: horizon: int64
   
   #> Backtrace:
   #>      ▆
   #>   1. ├─purrr::map2(...)
   #>   2. │ └─purrr:::map2_("list", .x, .y, .f, ..., .progress = .progress)
   #>   3. │   ├─purrr:::with_indexed_errors(...)
   #>   4. │   │ └─base::withCallingHandlers(...)
   #>   5. │   ├─purrr:::call_with_cleanup(...)
   #>   6. │   └─global .f(.x[[i]], .y[[i]], ...)
   #>   7. ├─arrow (local) `<fn>`(base::quote(`<Schema>`))
   #>   8. │ └─self$WithSchema(schema)
   #>   9. │   └─arrow:::dataset___Dataset__ReplaceSchema(self, schema)
   #>  10. └─base::.handleSimpleError(...)
   #>  11.   └─purrr (local) h(simpleError(msg, call))
   #>  12.     └─cli::cli_abort(...)
   #>  13.       └─rlang::abort(...)
   
   # reconnect using appropriate schema for each connection
   conns_unified <- purrr::map2(names(conns), conn_schema,
                        ~arrow::open_dataset(
                            origin_path, format = .x,
                            partitioning = "team",
                            factory_options = list(exclude_invalid_files = 
TRUE),
                            schema = .y))
   
   conns_unified
   #> [[1]]
   #> FileSystemDataset with 0 csv files
   #> origin_date: date32[day]
   #> target: string
   #> horizon: int64
   #> location: string
   #> type: string
   #> type_id: double
   #> value: int64
   #> team: string
   #> 
   #> [[2]]
   #> FileSystemDataset with 1 Parquet file
   #> origin_date: date32[day]
   #> target: string
   #> horizon: int64
   #> location: string
   #> age_group: string
   #> type: string
   #> type_id: double
   #> value: int64
   #> team: string
   
   # All schema are equal!
   all.equal(conns[[1]]$schema, conns_unified[[1]]$schema, conn_schema[[1]])
   #> [1] TRUE
   
   # opening dataset from unified connections does not return data form csv file
   arrow::open_dataset(conns_unified)  %>%
       filter(origin_date == "2022-10-08") %>%
       collect()
   #> # A tibble: 0 × 9
   #> # … with 9 variables: origin_date <date>, target <chr>, horizon <int>,
   #> #   location <chr>, type <chr>, type_id <dbl>, value <int>, team <chr>,
   #> #   age_group <chr>
   
   # Only data from the single parquet file returned
   arrow::open_dataset(conns_unified)  %>%
       filter(origin_date == "2022-10-15") %>%
       collect()
   #> # A tibble: 276 × 9
   #>    origin_date target          horizon locat…¹ type  type_id value team  
age_g…²
   #>    <date>      <chr>             <int> <chr>   <chr>   <dbl> <int> <chr> 
<chr>  
   #>  1 2022-10-15  wk inc flu hosp       1 US      quan…   0.01    135 simp… 
65+    
   #>  2 2022-10-15  wk inc flu hosp       1 US      quan…   0.025   137 simp… 
65+    
   #>  3 2022-10-15  wk inc flu hosp       1 US      quan…   0.05    139 simp… 
65+    
   #>  4 2022-10-15  wk inc flu hosp       1 US      quan…   0.1     140 simp… 
65+    
   #>  5 2022-10-15  wk inc flu hosp       1 US      quan…   0.15    141 simp… 
65+    
   #>  6 2022-10-15  wk inc flu hosp       1 US      quan…   0.2     141 simp… 
65+    
   #>  7 2022-10-15  wk inc flu hosp       1 US      quan…   0.25    142 simp… 
65+    
   #>  8 2022-10-15  wk inc flu hosp       1 US      quan…   0.3     143 simp… 
65+    
   #>  9 2022-10-15  wk inc flu hosp       1 US      quan…   0.35    144 simp… 
65+    
   #> 10 2022-10-15  wk inc flu hosp       1 US      quan…   0.4     145 simp… 
65+    
   #> # … with 266 more rows, and abbreviated variable names ¹​location, 
²​age_group
   ```
   
   <sup>Created on 2023-03-16 with [reprex 
v2.0.2](https://reprex.tidyverse.org)</sup>
   
   <details style="margin-bottom:10px;">
   <summary>
   Session info
   </summary>
   
   ``` r
   sessioninfo::session_info()
   #> ─ Session info 
───────────────────────────────────────────────────────────────
   #>  setting  value
   #>  version  R version 4.2.1 (2022-06-23)
   #>  os       macOS Ventura 13.2.1
   #>  system   aarch64, darwin20
   #>  ui       X11
   #>  language (EN)
   #>  collate  en_US.UTF-8
   #>  ctype    en_US.UTF-8
   #>  tz       Europe/Athens
   #>  date     2023-03-16
   #>  pandoc   2.19.2 @ 
/Applications/RStudio.app/Contents/Resources/app/quarto/bin/tools/ (via 
rmarkdown)
   #> 
   #> ─ Packages 
───────────────────────────────────────────────────────────────────
   #>  package     * version  date (UTC) lib source
   #>  arrow         11.0.0.3 2023-03-08 [1] CRAN (R 4.2.0)
   #>  askpass       1.1      2019-01-13 [1] CRAN (R 4.2.0)
   #>  assertthat    0.2.1    2019-03-21 [1] CRAN (R 4.2.0)
   #>  bit           4.0.5    2022-11-15 [1] CRAN (R 4.2.0)
   #>  bit64         4.0.5    2020-08-30 [1] CRAN (R 4.2.0)
   #>  cli           3.6.0    2023-01-09 [1] CRAN (R 4.2.0)
   #>  crayon        1.5.2    2022-09-29 [1] CRAN (R 4.2.0)
   #>  credentials   1.3.2    2021-11-29 [1] CRAN (R 4.2.1)
   #>  curl          5.0.0    2023-01-12 [1] CRAN (R 4.2.0)
   #>  digest        0.6.31   2022-12-11 [1] CRAN (R 4.2.0)
   #>  dplyr       * 1.1.0    2023-01-29 [1] CRAN (R 4.2.0)
   #>  evaluate      0.20     2023-01-17 [1] CRAN (R 4.2.0)
   #>  fansi         1.0.4    2023-01-22 [1] CRAN (R 4.2.0)
   #>  fastmap       1.1.1    2023-02-24 [1] CRAN (R 4.2.0)
   #>  fs            1.6.1    2023-02-06 [1] CRAN (R 4.2.0)
   #>  generics      0.1.3    2022-07-05 [1] CRAN (R 4.2.1)
   #>  gert          1.9.2    2022-12-05 [1] CRAN (R 4.2.0)
   #>  gh            1.3.1    2022-09-08 [1] CRAN (R 4.2.1)
   #>  gitcreds      0.1.2    2022-09-08 [1] CRAN (R 4.2.1)
   #>  glue          1.6.2    2022-02-24 [1] CRAN (R 4.2.0)
   #>  htmltools     0.5.4    2022-12-07 [1] CRAN (R 4.2.0)
   #>  httr          1.4.5    2023-02-24 [1] CRAN (R 4.2.0)
   #>  jsonlite      1.8.4    2022-12-06 [1] CRAN (R 4.2.0)
   #>  knitr         1.42     2023-01-25 [1] CRAN (R 4.2.0)
   #>  lifecycle     1.0.3    2022-10-07 [1] CRAN (R 4.2.0)
   #>  magrittr      2.0.3    2022-03-30 [1] CRAN (R 4.2.0)
   #>  openssl       2.0.5    2022-12-06 [1] CRAN (R 4.2.0)
   #>  pillar        1.8.1    2022-08-19 [1] CRAN (R 4.2.0)
   #>  pkgconfig     2.0.3    2019-09-22 [1] CRAN (R 4.2.0)
   #>  purrr         1.0.1    2023-01-10 [1] CRAN (R 4.2.0)
   #>  R.cache       0.16.0   2022-07-21 [1] CRAN (R 4.2.0)
   #>  R.methodsS3   1.8.2    2022-06-13 [1] CRAN (R 4.2.0)
   #>  R.oo          1.25.0   2022-06-12 [1] CRAN (R 4.2.0)
   #>  R.utils       2.12.0   2022-06-28 [1] CRAN (R 4.2.0)
   #>  R6            2.5.1    2021-08-19 [1] CRAN (R 4.2.0)
   #>  reprex        2.0.2    2022-08-17 [3] CRAN (R 4.2.0)
   #>  rlang         1.0.6    2022-09-24 [1] CRAN (R 4.2.0)
   #>  rmarkdown     2.20     2023-01-19 [1] CRAN (R 4.2.0)
   #>  rprojroot     2.0.3    2022-04-02 [1] CRAN (R 4.2.1)
   #>  rstudioapi    0.14     2022-08-22 [1] CRAN (R 4.2.1)
   #>  sessioninfo   1.2.2    2021-12-06 [3] CRAN (R 4.2.0)
   #>  styler        1.7.0    2022-03-13 [1] CRAN (R 4.2.0)
   #>  sys           3.4.1    2022-10-18 [1] CRAN (R 4.2.0)
   #>  tibble        3.2.0    2023-03-08 [1] CRAN (R 4.2.0)
   #>  tidyselect    1.2.0    2022-10-10 [1] CRAN (R 4.2.0)
   #>  usethis       2.1.6    2022-05-25 [1] CRAN (R 4.2.0)
   #>  utf8          1.2.3    2023-01-31 [1] CRAN (R 4.2.0)
   #>  vctrs         0.5.2    2023-01-23 [1] CRAN (R 4.2.0)
   #>  withr         2.5.0    2022-03-03 [1] CRAN (R 4.2.0)
   #>  xfun          0.37     2023-01-31 [1] CRAN (R 4.2.0)
   #>  yaml          2.3.7    2023-01-23 [1] CRAN (R 4.2.0)
   #> 
   #>  [1] /Users/Anna/Library/R/arm64/4.2/library
   #>  [2] 
/Library/Frameworks/R.framework/Versions/4.2-arm64/Resources/site-library
   #>  [3] /Library/Frameworks/R.framework/Versions/4.2-arm64/Resources/library
   #> 
   #> 
──────────────────────────────────────────────────────────────────────────────
   ```
   
   </details>
   
   ### Component(s)
   
   R


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to