This is an automated email from the ASF dual-hosted git repository.

jonkeane pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
     new 4b3f4677b9  ARROW-15517: [R] Use WriteNode in write_dataset()
4b3f4677b9 is described below

commit 4b3f4677b995cb7263e4a4e65daf00189f638617
Author: Neal Richardson <neal.p.richard...@gmail.com>
AuthorDate: Tue Apr 19 16:40:57 2022 -0500

    ARROW-15517: [R] Use WriteNode in write_dataset()

    This should allow streaming writes in more cases, e.g. with a join.

    Closes #12316 from nealrichardson/write-node

    Authored-by: Neal Richardson <neal.p.richard...@gmail.com>
    Signed-off-by: Jonathan Keane <jke...@gmail.com>
---
 r/R/arrowExports.R                    |  8 ++--
 r/R/dataset-format.R                  |  4 +-
 r/R/dataset-write.R                   | 87 +++++++++++++++++++++++++++--------
 r/R/dplyr.R                           | 11 ++++-
 r/R/metadata.R                        | 22 ++++++++-
 r/R/parquet.R                         | 38 ++++++++-------
 r/R/query-engine.R                    | 29 +++++-------
 r/src/arrowExports.cpp                | 62 +++++++++++++------------
 r/src/compute-exec.cpp                | 49 ++++++++++++++++++--
 r/src/dataset.cpp                     | 24 ----------
 r/tests/testthat/test-dataset-write.R | 70 ++++++++++++++++++++--------
 r/tests/testthat/test-metadata.R      | 36 ++++++++++++---
 12 files changed, 291 insertions(+), 149 deletions(-)

diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R
index 7bf77f1e66..6b969336c9 100644
--- a/r/R/arrowExports.R
+++ b/r/R/arrowExports.R
@@ -420,6 +420,10 @@ ExecNode_Scan <- function(plan, dataset, filter, materialized_field_names) {
   .Call(`_arrow_ExecNode_Scan`, plan, dataset, filter, materialized_field_names)
 }
 
+ExecPlan_Write <- function(plan, final_node, metadata, file_write_options, filesystem, base_dir, partitioning, basename_template, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group) {
+  invisible(.Call(`_arrow_ExecPlan_Write`, plan, final_node, metadata, file_write_options, filesystem, base_dir, partitioning, basename_template, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group))
+}
+
 ExecNode_Filter <- function(input, filter) {
   .Call(`_arrow_ExecNode_Filter`, input, filter)
 }
@@ -748,10 +752,6 @@ dataset___Scanner__schema <- function(sc) {
   .Call(`_arrow_dataset___Scanner__schema`, sc)
 }
 
-dataset___Dataset__Write <- function(file_write_options, filesystem, base_dir, partitioning, basename_template, scanner, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group) {
-  invisible(.Call(`_arrow_dataset___Dataset__Write`, file_write_options, filesystem, base_dir, partitioning, basename_template, scanner, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group))
-}
-
 dataset___Scanner__TakeRows <- function(scanner, indices) {
   .Call(`_arrow_dataset___Scanner__TakeRows`, scanner, indices)
 }
diff --git a/r/R/dataset-format.R b/r/R/dataset-format.R
index f00efd0350..acc1a41b02 100644
--- a/r/R/dataset-format.R
+++ b/r/R/dataset-format.R
@@ -390,7 +390,7 @@ ParquetFragmentScanOptions$create <- function(use_buffered_stream = FALSE,
 FileWriteOptions <- R6Class("FileWriteOptions",
   inherit = ArrowObject,
   public = list(
-    update = function(table, ...) {
+    update = function(column_names, ...)
{ check_additional_args <- function(format, passed_args) { if (format == "parquet") { supported_args <- names(formals(write_parquet)) @@ -437,7 +437,7 @@ FileWriteOptions <- R6Class("FileWriteOptions", if (self$type == "parquet") { dataset___ParquetFileWriteOptions__update( self, - ParquetWriterProperties$create(table, ...), + ParquetWriterProperties$create(column_names, ...), ParquetArrowWriterProperties$create(...) ) } else if (self$type == "ipc") { diff --git a/r/R/dataset-write.R b/r/R/dataset-write.R index d7c73908e7..09b3ebdbe6 100644 --- a/r/R/dataset-write.R +++ b/r/R/dataset-write.R @@ -136,41 +136,88 @@ write_dataset <- function(dataset, if (inherits(dataset, "arrow_dplyr_query")) { # partitioning vars need to be in the `select` schema dataset <- ensure_group_vars(dataset) - } else if (inherits(dataset, "grouped_df")) { - force(partitioning) - # Drop the grouping metadata before writing; we've already consumed it - # now to construct `partitioning` and don't want it in the metadata$r - dataset <- dplyr::ungroup(dataset) + } else { + if (inherits(dataset, "grouped_df")) { + force(partitioning) + # Drop the grouping metadata before writing; we've already consumed it + # now to construct `partitioning` and don't want it in the metadata$r + dataset <- dplyr::ungroup(dataset) + } + dataset <- tryCatch( + as_adq(dataset), + error = function(e) { + supported <- c( + "Dataset", "RecordBatch", "Table", "arrow_dplyr_query", "data.frame" + ) + stop( + "'dataset' must be a ", + oxford_paste(supported, "or", quote = FALSE), + ", not ", + deparse(class(dataset)), + call. = FALSE + ) + } + ) + } + + plan <- ExecPlan$create() + final_node <- plan$Build(dataset) + if (!is.null(final_node$sort %||% final_node$head %||% final_node$tail)) { + # Because sorting and topK are only handled in the SinkNode (or in R!), + # they wouldn't get picked up in the WriteNode. So let's Run this ExecPlan + # to capture those, and then create a new plan for writing + # TODO(ARROW-15681): do sorting in WriteNode in C++ + dataset <- as_adq(plan$Run(final_node)) + plan <- ExecPlan$create() + final_node <- plan$Build(dataset) } - scanner <- Scanner$create(dataset) if (!inherits(partitioning, "Partitioning")) { - partition_schema <- scanner$schema[partitioning] + partition_schema <- final_node$schema[partitioning] if (isTRUE(hive_style)) { - partitioning <- HivePartitioning$create(partition_schema, null_fallback = list(...)$null_fallback) + partitioning <- HivePartitioning$create( + partition_schema, + null_fallback = list(...)$null_fallback + ) } else { partitioning <- DirectoryPartitioning$create(partition_schema) } } - if (!missing(max_rows_per_file) && missing(max_rows_per_group) && max_rows_per_group > max_rows_per_file) { - max_rows_per_group <- max_rows_per_file - } - path_and_fs <- get_path_and_filesystem(path) - options <- FileWriteOptions$create(format, table = scanner, ...) + output_schema <- final_node$schema + options <- FileWriteOptions$create( + format, + column_names = names(output_schema), + ... 
+ ) + # TODO(ARROW-16200): expose FileSystemDatasetWriteOptions in R + # and encapsulate this logic better existing_data_behavior_opts <- c("delete_matching", "overwrite", "error") existing_data_behavior <- match(match.arg(existing_data_behavior), existing_data_behavior_opts) - 1L - validate_positive_int_value(max_partitions, "max_partitions must be a positive, non-missing integer") - validate_positive_int_value(max_open_files, "max_open_files must be a positive, non-missing integer") - validate_positive_int_value(min_rows_per_group, "min_rows_per_group must be a positive, non-missing integer") - validate_positive_int_value(max_rows_per_group, "max_rows_per_group must be a positive, non-missing integer") + if (!missing(max_rows_per_file) && missing(max_rows_per_group) && max_rows_per_group > max_rows_per_file) { + max_rows_per_group <- max_rows_per_file + } - dataset___Dataset__Write( + validate_positive_int_value(max_partitions) + validate_positive_int_value(max_open_files) + validate_positive_int_value(min_rows_per_group) + validate_positive_int_value(max_rows_per_group) + + new_r_meta <- get_r_metadata_from_old_schema( + output_schema, + source_data(dataset)$schema, + drop_attributes = has_aggregation(dataset) + ) + if (!is.null(new_r_meta)) { + output_schema$r_metadata <- new_r_meta + } + plan$Write( + final_node, prepare_key_value_metadata(output_schema$metadata), options, path_and_fs$fs, path_and_fs$path, - partitioning, basename_template, scanner, + partitioning, basename_template, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group @@ -179,6 +226,6 @@ write_dataset <- function(dataset, validate_positive_int_value <- function(value, msg) { if (!is_integerish(value, n = 1) || is.na(value) || value < 0) { - abort(msg) + abort(paste(substitute(value), "must be a positive, non-missing integer")) } } diff --git a/r/R/dplyr.R b/r/R/dplyr.R index e6d7889078..c9650fb065 100644 --- a/r/R/dplyr.R +++ b/r/R/dplyr.R @@ -24,7 +24,12 @@ arrow_dplyr_query <- function(.data) { # RecordBatch, or Dataset) and the state of the user's dplyr query--things # like selected columns, filters, and group vars. # An arrow_dplyr_query can contain another arrow_dplyr_query in .data - gv <- dplyr::group_vars(.data) %||% character() + gv <- tryCatch( + # If dplyr is not available, or if the input doesn't have a group_vars + # method, assume no group vars + dplyr::group_vars(.data) %||% character(), + error = function(e) character() + ) if (inherits(.data, "data.frame")) { .data <- Table$create(.data) @@ -247,7 +252,9 @@ abandon_ship <- function(call, .data, msg) { query_on_dataset <- function(x) inherits(source_data(x), c("Dataset", "RecordBatchReader")) source_data <- function(x) { - if (is_collapsed(x)) { + if (!inherits(x, "arrow_dplyr_query")) { + x + } else if (is_collapsed(x)) { source_data(x$.data) } else { x$.data diff --git a/r/R/metadata.R b/r/R/metadata.R index d88297dd92..f0411eb54a 100644 --- a/r/R/metadata.R +++ b/r/R/metadata.R @@ -133,7 +133,6 @@ remove_attributes <- function(x) { } arrow_attributes <- function(x, only_top_level = FALSE) { - att <- attributes(x) removed_attributes <- remove_attributes(x) @@ -208,3 +207,24 @@ arrow_attributes <- function(x, only_top_level = FALSE) { NULL } } + +get_r_metadata_from_old_schema <- function(new_schema, + old_schema, + drop_attributes = FALSE) { + # TODO: do we care about other (non-R) metadata preservation? + # How would we know if it were meaningful? 
+ r_meta <- old_schema$r_metadata + if (!is.null(r_meta)) { + # Filter r_metadata$columns on columns with name _and_ type match + common_names <- intersect(names(r_meta$columns), names(new_schema)) + keep <- common_names[ + map_lgl(common_names, ~ old_schema[[.]] == new_schema[[.]]) + ] + r_meta$columns <- r_meta$columns[keep] + if (drop_attributes) { + # dplyr drops top-level attributes if you do summarize + r_meta$attributes <- NULL + } + } + r_meta +} diff --git a/r/R/parquet.R b/r/R/parquet.R index 3a07c224ed..c6c00ed3a4 100644 --- a/r/R/parquet.R +++ b/r/R/parquet.R @@ -186,7 +186,7 @@ write_parquet <- function(x, x$schema, sink, properties = properties %||% ParquetWriterProperties$create( - x, + names(x), version = version, compression = compression, compression_level = compression_level, @@ -307,33 +307,33 @@ ParquetWriterPropertiesBuilder <- R6Class("ParquetWriterPropertiesBuilder", set_version = function(version) { parquet___WriterProperties___Builder__version(self, make_valid_version(version)) }, - set_compression = function(table, compression) { + set_compression = function(column_names, compression) { compression <- compression_from_name(compression) assert_that(is.integer(compression)) private$.set( - table, compression, + column_names, compression, parquet___ArrowWriterProperties___Builder__set_compressions ) }, - set_compression_level = function(table, compression_level) { + set_compression_level = function(column_names, compression_level) { # cast to integer but keep names compression_level <- set_names(as.integer(compression_level), names(compression_level)) private$.set( - table, compression_level, + column_names, compression_level, parquet___ArrowWriterProperties___Builder__set_compression_levels ) }, - set_dictionary = function(table, use_dictionary) { + set_dictionary = function(column_names, use_dictionary) { assert_that(is.logical(use_dictionary)) private$.set( - table, use_dictionary, + column_names, use_dictionary, parquet___ArrowWriterProperties___Builder__set_use_dictionary ) }, - set_write_statistics = function(table, write_statistics) { + set_write_statistics = function(column_names, write_statistics) { assert_that(is.logical(write_statistics)) private$.set( - table, write_statistics, + column_names, write_statistics, parquet___ArrowWriterProperties___Builder__set_write_statistics ) }, @@ -342,9 +342,8 @@ ParquetWriterPropertiesBuilder <- R6Class("ParquetWriterPropertiesBuilder", } ), private = list( - .set = function(table, value, FUN) { + .set = function(column_names, value, FUN) { msg <- paste0("unsupported ", substitute(value), "= specification") - column_names <- names(table) given_names <- names(value) if (is.null(given_names)) { if (length(value) %in% c(1L, length(column_names))) { @@ -364,7 +363,7 @@ ParquetWriterPropertiesBuilder <- R6Class("ParquetWriterPropertiesBuilder", ) ) -ParquetWriterProperties$create <- function(table, +ParquetWriterProperties$create <- function(column_names, version = NULL, compression = default_parquet_compression(), compression_level = NULL, @@ -377,16 +376,16 @@ ParquetWriterProperties$create <- function(table, builder$set_version(version) } if (!is.null(compression)) { - builder$set_compression(table, compression = compression) + builder$set_compression(column_names, compression = compression) } if (!is.null(compression_level)) { - builder$set_compression_level(table, compression_level = compression_level) + builder$set_compression_level(column_names, compression_level = compression_level) } if (!is.null(use_dictionary)) 
{ - builder$set_dictionary(table, use_dictionary) + builder$set_dictionary(column_names, use_dictionary) } if (!is.null(write_statistics)) { - builder$set_write_statistics(table, write_statistics) + builder$set_write_statistics(column_names, write_statistics) } if (!is.null(data_page_size)) { builder$set_data_page_size(data_page_size) @@ -600,10 +599,9 @@ ParquetArrowReaderProperties$create <- function(use_threads = option_use_threads parquet___arrow___ArrowReaderProperties__Make(isTRUE(use_threads)) } -calculate_chunk_size <- function(rows, columns, - target_cells_per_group = getOption("arrow.parquet_cells_per_group", 2.5e8), - max_chunks = getOption("arrow.parquet_max_chunks", 200) - ) { +calculate_chunk_size <- function(rows, columns, + target_cells_per_group = getOption("arrow.parquet_cells_per_group", 2.5e8), + max_chunks = getOption("arrow.parquet_max_chunks", 200)) { # Ensure is a float to prevent integer overflow issues num_cells <- as.numeric(rows) * as.numeric(columns) diff --git a/r/R/query-engine.R b/r/R/query-engine.R index 6c1b14036f..c794bc9de6 100644 --- a/r/R/query-engine.R +++ b/r/R/query-engine.R @@ -33,26 +33,15 @@ do_exec_plan <- function(.data) { if (ncol(tab)) { # Apply any column metadata from the original schema, where appropriate - original_schema <- source_data(.data)$schema - # TODO: do we care about other (non-R) metadata preservation? - # How would we know if it were meaningful? - r_meta <- original_schema$r_metadata - if (!is.null(r_meta)) { - # Filter r_metadata$columns on columns with name _and_ type match - new_schema <- tab$schema - common_names <- intersect(names(r_meta$columns), names(tab)) - keep <- common_names[ - map_lgl(common_names, ~ original_schema[[.]] == new_schema[[.]]) - ] - r_meta$columns <- r_meta$columns[keep] - if (has_aggregation(.data)) { - # dplyr drops top-level attributes if you do summarize - r_meta$attributes <- NULL - } - tab$r_metadata <- r_meta + new_r_metadata <- get_r_metadata_from_old_schema( + tab$schema, + source_data(.data)$schema, + drop_attributes = has_aggregation(.data) + ) + if (!is.null(new_r_metadata)) { + tab$r_metadata <- new_r_metadata } } - tab } @@ -244,6 +233,10 @@ ExecPlan <- R6Class("ExecPlan", } out }, + Write = function(node, ...) { + # TODO(ARROW-16200): take FileSystemDatasetWriteOptions not ... + ExecPlan_Write(self, node, ...) 
+ }, Stop = function() ExecPlan_StopProducing(self) ) ) diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index 81dcc0dddc..fb9f3b94d1 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -898,11 +898,11 @@ END_CPP11 } // compute-exec.cpp #if defined(ARROW_R_WITH_DATASET) -std::shared_ptr<compute::ExecNode> ExecNode_Scan(const std::shared_ptr<compute::ExecPlan>& plan, const std::shared_ptr<arrow::dataset::Dataset>& dataset, const std::shared_ptr<compute::Expression>& filter, std::vector<std::string> materialized_field_names); +std::shared_ptr<compute::ExecNode> ExecNode_Scan(const std::shared_ptr<compute::ExecPlan>& plan, const std::shared_ptr<ds::Dataset>& dataset, const std::shared_ptr<compute::Expression>& filter, std::vector<std::string> materialized_field_names); extern "C" SEXP _arrow_ExecNode_Scan(SEXP plan_sexp, SEXP dataset_sexp, SEXP filter_sexp, SEXP materialized_field_names_sexp){ BEGIN_CPP11 arrow::r::Input<const std::shared_ptr<compute::ExecPlan>&>::type plan(plan_sexp); - arrow::r::Input<const std::shared_ptr<arrow::dataset::Dataset>&>::type dataset(dataset_sexp); + arrow::r::Input<const std::shared_ptr<ds::Dataset>&>::type dataset(dataset_sexp); arrow::r::Input<const std::shared_ptr<compute::Expression>&>::type filter(filter_sexp); arrow::r::Input<std::vector<std::string>>::type materialized_field_names(materialized_field_names_sexp); return cpp11::as_sexp(ExecNode_Scan(plan, dataset, filter, materialized_field_names)); @@ -914,6 +914,35 @@ extern "C" SEXP _arrow_ExecNode_Scan(SEXP plan_sexp, SEXP dataset_sexp, SEXP fil } #endif +// compute-exec.cpp +#if defined(ARROW_R_WITH_DATASET) +void ExecPlan_Write(const std::shared_ptr<compute::ExecPlan>& plan, const std::shared_ptr<compute::ExecNode>& final_node, cpp11::strings metadata, const std::shared_ptr<ds::FileWriteOptions>& file_write_options, const std::shared_ptr<fs::FileSystem>& filesystem, std::string base_dir, const std::shared_ptr<ds::Partitioning>& partitioning, std::string basename_template, arrow::dataset::ExistingDataBehavior existing_data_behavior, int max_partitions, uint32_t max_open_files, uint64_t max_ro [...] 
+extern "C" SEXP _arrow_ExecPlan_Write(SEXP plan_sexp, SEXP final_node_sexp, SEXP metadata_sexp, SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP existing_data_behavior_sexp, SEXP max_partitions_sexp, SEXP max_open_files_sexp, SEXP max_rows_per_file_sexp, SEXP min_rows_per_group_sexp, SEXP max_rows_per_group_sexp){ +BEGIN_CPP11 + arrow::r::Input<const std::shared_ptr<compute::ExecPlan>&>::type plan(plan_sexp); + arrow::r::Input<const std::shared_ptr<compute::ExecNode>&>::type final_node(final_node_sexp); + arrow::r::Input<cpp11::strings>::type metadata(metadata_sexp); + arrow::r::Input<const std::shared_ptr<ds::FileWriteOptions>&>::type file_write_options(file_write_options_sexp); + arrow::r::Input<const std::shared_ptr<fs::FileSystem>&>::type filesystem(filesystem_sexp); + arrow::r::Input<std::string>::type base_dir(base_dir_sexp); + arrow::r::Input<const std::shared_ptr<ds::Partitioning>&>::type partitioning(partitioning_sexp); + arrow::r::Input<std::string>::type basename_template(basename_template_sexp); + arrow::r::Input<arrow::dataset::ExistingDataBehavior>::type existing_data_behavior(existing_data_behavior_sexp); + arrow::r::Input<int>::type max_partitions(max_partitions_sexp); + arrow::r::Input<uint32_t>::type max_open_files(max_open_files_sexp); + arrow::r::Input<uint64_t>::type max_rows_per_file(max_rows_per_file_sexp); + arrow::r::Input<uint64_t>::type min_rows_per_group(min_rows_per_group_sexp); + arrow::r::Input<uint64_t>::type max_rows_per_group(max_rows_per_group_sexp); + ExecPlan_Write(plan, final_node, metadata, file_write_options, filesystem, base_dir, partitioning, basename_template, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group); + return R_NilValue; +END_CPP11 +} +#else +extern "C" SEXP _arrow_ExecPlan_Write(SEXP plan_sexp, SEXP final_node_sexp, SEXP metadata_sexp, SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP existing_data_behavior_sexp, SEXP max_partitions_sexp, SEXP max_open_files_sexp, SEXP max_rows_per_file_sexp, SEXP min_rows_per_group_sexp, SEXP max_rows_per_group_sexp){ + Rf_error("Cannot call ExecPlan_Write(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. 
"); +} +#endif + // compute-exec.cpp std::shared_ptr<compute::ExecNode> ExecNode_Filter(const std::shared_ptr<compute::ExecNode>& input, const std::shared_ptr<compute::Expression>& filter); extern "C" SEXP _arrow_ExecNode_Filter(SEXP input_sexp, SEXP filter_sexp){ @@ -2041,33 +2070,6 @@ extern "C" SEXP _arrow_dataset___Scanner__schema(SEXP sc_sexp){ } #endif -// dataset.cpp -#if defined(ARROW_R_WITH_DATASET) -void dataset___Dataset__Write(const std::shared_ptr<ds::FileWriteOptions>& file_write_options, const std::shared_ptr<fs::FileSystem>& filesystem, std::string base_dir, const std::shared_ptr<ds::Partitioning>& partitioning, std::string basename_template, const std::shared_ptr<ds::Scanner>& scanner, arrow::dataset::ExistingDataBehavior existing_data_behavior, int max_partitions, uint32_t max_open_files, uint64_t max_rows_per_file, uint64_t min_rows_per_group, uint64_t max_rows_per_group); -extern "C" SEXP _arrow_dataset___Dataset__Write(SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP scanner_sexp, SEXP existing_data_behavior_sexp, SEXP max_partitions_sexp, SEXP max_open_files_sexp, SEXP max_rows_per_file_sexp, SEXP min_rows_per_group_sexp, SEXP max_rows_per_group_sexp){ -BEGIN_CPP11 - arrow::r::Input<const std::shared_ptr<ds::FileWriteOptions>&>::type file_write_options(file_write_options_sexp); - arrow::r::Input<const std::shared_ptr<fs::FileSystem>&>::type filesystem(filesystem_sexp); - arrow::r::Input<std::string>::type base_dir(base_dir_sexp); - arrow::r::Input<const std::shared_ptr<ds::Partitioning>&>::type partitioning(partitioning_sexp); - arrow::r::Input<std::string>::type basename_template(basename_template_sexp); - arrow::r::Input<const std::shared_ptr<ds::Scanner>&>::type scanner(scanner_sexp); - arrow::r::Input<arrow::dataset::ExistingDataBehavior>::type existing_data_behavior(existing_data_behavior_sexp); - arrow::r::Input<int>::type max_partitions(max_partitions_sexp); - arrow::r::Input<uint32_t>::type max_open_files(max_open_files_sexp); - arrow::r::Input<uint64_t>::type max_rows_per_file(max_rows_per_file_sexp); - arrow::r::Input<uint64_t>::type min_rows_per_group(min_rows_per_group_sexp); - arrow::r::Input<uint64_t>::type max_rows_per_group(max_rows_per_group_sexp); - dataset___Dataset__Write(file_write_options, filesystem, base_dir, partitioning, basename_template, scanner, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group); - return R_NilValue; -END_CPP11 -} -#else -extern "C" SEXP _arrow_dataset___Dataset__Write(SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP scanner_sexp, SEXP existing_data_behavior_sexp, SEXP max_partitions_sexp, SEXP max_open_files_sexp, SEXP max_rows_per_file_sexp, SEXP min_rows_per_group_sexp, SEXP max_rows_per_group_sexp){ - Rf_error("Cannot call dataset___Dataset__Write(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. 
"); -} -#endif - // dataset.cpp #if defined(ARROW_R_WITH_DATASET) std::shared_ptr<arrow::Table> dataset___Scanner__TakeRows(const std::shared_ptr<ds::Scanner>& scanner, const std::shared_ptr<arrow::Array>& indices); @@ -5197,6 +5199,7 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_ExecPlan_StopProducing", (DL_FUNC) &_arrow_ExecPlan_StopProducing, 1}, { "_arrow_ExecNode_output_schema", (DL_FUNC) &_arrow_ExecNode_output_schema, 1}, { "_arrow_ExecNode_Scan", (DL_FUNC) &_arrow_ExecNode_Scan, 4}, + { "_arrow_ExecPlan_Write", (DL_FUNC) &_arrow_ExecPlan_Write, 14}, { "_arrow_ExecNode_Filter", (DL_FUNC) &_arrow_ExecNode_Filter, 2}, { "_arrow_ExecNode_Project", (DL_FUNC) &_arrow_ExecNode_Project, 3}, { "_arrow_ExecNode_Aggregate", (DL_FUNC) &_arrow_ExecNode_Aggregate, 5}, @@ -5279,7 +5282,6 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_dataset___Scanner__ToRecordBatchReader", (DL_FUNC) &_arrow_dataset___Scanner__ToRecordBatchReader, 1}, { "_arrow_dataset___Scanner__head", (DL_FUNC) &_arrow_dataset___Scanner__head, 2}, { "_arrow_dataset___Scanner__schema", (DL_FUNC) &_arrow_dataset___Scanner__schema, 1}, - { "_arrow_dataset___Dataset__Write", (DL_FUNC) &_arrow_dataset___Dataset__Write, 12}, { "_arrow_dataset___Scanner__TakeRows", (DL_FUNC) &_arrow_dataset___Scanner__TakeRows, 2}, { "_arrow_dataset___Scanner__CountRows", (DL_FUNC) &_arrow_dataset___Scanner__CountRows, 1}, { "_arrow_Int8__initialize", (DL_FUNC) &_arrow_Int8__initialize, 0}, diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp index e7d8df55bb..4c3cc92257 100644 --- a/r/src/compute-exec.cpp +++ b/r/src/compute-exec.cpp @@ -121,19 +121,20 @@ std::shared_ptr<arrow::Schema> ExecNode_output_schema( #if defined(ARROW_R_WITH_DATASET) +#include <arrow/dataset/file_base.h> #include <arrow/dataset/plan.h> #include <arrow/dataset/scanner.h> // [[dataset::export]] std::shared_ptr<compute::ExecNode> ExecNode_Scan( const std::shared_ptr<compute::ExecPlan>& plan, - const std::shared_ptr<arrow::dataset::Dataset>& dataset, + const std::shared_ptr<ds::Dataset>& dataset, const std::shared_ptr<compute::Expression>& filter, std::vector<std::string> materialized_field_names) { arrow::dataset::internal::Initialize(); // TODO: pass in FragmentScanOptions - auto options = std::make_shared<arrow::dataset::ScanOptions>(); + auto options = std::make_shared<ds::ScanOptions>(); options->use_threads = arrow::r::GetBoolOption("arrow.use_threads", true); @@ -154,7 +155,49 @@ std::shared_ptr<compute::ExecNode> ExecNode_Scan( .Bind(*dataset->schema())); return MakeExecNodeOrStop("scan", plan.get(), {}, - arrow::dataset::ScanNodeOptions{dataset, options}); + ds::ScanNodeOptions{dataset, options}); +} + +// [[dataset::export]] +void ExecPlan_Write( + const std::shared_ptr<compute::ExecPlan>& plan, + const std::shared_ptr<compute::ExecNode>& final_node, cpp11::strings metadata, + const std::shared_ptr<ds::FileWriteOptions>& file_write_options, + const std::shared_ptr<fs::FileSystem>& filesystem, std::string base_dir, + const std::shared_ptr<ds::Partitioning>& partitioning, std::string basename_template, + arrow::dataset::ExistingDataBehavior existing_data_behavior, int max_partitions, + uint32_t max_open_files, uint64_t max_rows_per_file, uint64_t min_rows_per_group, + uint64_t max_rows_per_group) { + arrow::dataset::internal::Initialize(); + + // TODO(ARROW-16200): expose FileSystemDatasetWriteOptions in R + // and encapsulate this logic better + ds::FileSystemDatasetWriteOptions opts; + opts.file_write_options = file_write_options; + 
opts.existing_data_behavior = existing_data_behavior; + opts.filesystem = filesystem; + opts.base_dir = base_dir; + opts.partitioning = partitioning; + opts.basename_template = basename_template; + opts.max_partitions = max_partitions; + opts.max_open_files = max_open_files; + opts.max_rows_per_file = max_rows_per_file; + opts.min_rows_per_group = min_rows_per_group; + opts.max_rows_per_group = max_rows_per_group; + + // TODO: factor this out to a strings_to_KVM() helper + auto values = cpp11::as_cpp<std::vector<std::string>>(metadata); + auto names = cpp11::as_cpp<std::vector<std::string>>(metadata.attr("names")); + + auto kv = + std::make_shared<arrow::KeyValueMetadata>(std::move(names), std::move(values)); + + MakeExecNodeOrStop("write", final_node->plan(), {final_node.get()}, + ds::WriteNodeOptions{std::move(opts), std::move(kv)}); + + StopIfNotOk(plan->Validate()); + StopIfNotOk(plan->StartProducing()); + StopIfNotOk(plan->finished().status()); } #endif diff --git a/r/src/dataset.cpp b/r/src/dataset.cpp index 4881830560..4ff30d9d94 100644 --- a/r/src/dataset.cpp +++ b/r/src/dataset.cpp @@ -511,30 +511,6 @@ std::shared_ptr<arrow::Schema> dataset___Scanner__schema( return sc->options()->projected_schema; } -// [[dataset::export]] -void dataset___Dataset__Write( - const std::shared_ptr<ds::FileWriteOptions>& file_write_options, - const std::shared_ptr<fs::FileSystem>& filesystem, std::string base_dir, - const std::shared_ptr<ds::Partitioning>& partitioning, std::string basename_template, - const std::shared_ptr<ds::Scanner>& scanner, - arrow::dataset::ExistingDataBehavior existing_data_behavior, int max_partitions, - uint32_t max_open_files, uint64_t max_rows_per_file, uint64_t min_rows_per_group, - uint64_t max_rows_per_group) { - ds::FileSystemDatasetWriteOptions opts; - opts.file_write_options = file_write_options; - opts.existing_data_behavior = existing_data_behavior; - opts.filesystem = filesystem; - opts.base_dir = base_dir; - opts.partitioning = partitioning; - opts.basename_template = basename_template; - opts.max_partitions = max_partitions; - opts.max_open_files = max_open_files; - opts.max_rows_per_file = max_rows_per_file; - opts.min_rows_per_group = min_rows_per_group; - opts.max_rows_per_group = max_rows_per_group; - StopIfNotOk(ds::FileSystemDataset::Write(opts, scanner)); -} - // [[dataset::export]] std::shared_ptr<arrow::Table> dataset___Scanner__TakeRows( const std::shared_ptr<ds::Scanner>& scanner, diff --git a/r/tests/testthat/test-dataset-write.R b/r/tests/testthat/test-dataset-write.R index aafb4bf292..5b657148a5 100644 --- a/r/tests/testthat/test-dataset-write.R +++ b/r/tests/testthat/test-dataset-write.R @@ -244,6 +244,24 @@ test_that("Dataset writing: dplyr methods", { new_ds %>% select(c(names(df1), "twice")) %>% collect(), df1 %>% filter(int == 4) %>% mutate(twice = int * 2) ) + + # head + dst_dir4 <- tempfile() + ds %>% + mutate(twice = int * 2) %>% + arrange(int) %>% + head(3) %>% + write_dataset(dst_dir4, format = "feather") + new_ds <- open_dataset(dst_dir4, format = "feather") + + expect_equal( + new_ds %>% + select(c(names(df1), "twice")) %>% + collect(), + df1 %>% + mutate(twice = int * 2) %>% + head(3) + ) }) test_that("Dataset writing: non-hive", { @@ -321,6 +339,7 @@ test_that("Dataset writing: from RecordBatch", { dst_dir <- tempfile() stacked <- record_batch(rbind(df1, df2)) stacked %>% + mutate(twice = int * 2) %>% group_by(int) %>% write_dataset(dst_dir, format = "feather") expect_true(dir.exists(dst_dir)) @@ -438,7 +457,7 @@ test_that("Writing a 
dataset: CSV format options", { test_that("Dataset writing: unsupported features/input validation", { skip_if_not_available("parquet") - expect_error(write_dataset(4), 'dataset must be a "Dataset"') + expect_error(write_dataset(4), "'dataset' must be a Dataset, ") ds <- open_dataset(hive_dir) expect_error( @@ -520,7 +539,6 @@ test_that("max_rows_per_group is adjusted if at odds with max_rows_per_file", { expect_silent( write_dataset(df, dst_dir, max_rows_per_file = 5) ) - }) @@ -571,17 +589,27 @@ test_that("Dataset write max open files", { partitioning <- "c2" num_of_unique_c2_groups <- 5 - record_batch_1 <- record_batch(c1 = c(1, 2, 3, 4, 0, 10), - c2 = c("a", "b", "c", "d", "e", "a")) - record_batch_2 <- record_batch(c1 = c(5, 6, 7, 8, 0, 1), - c2 = c("a", "b", "c", "d", "e", "c")) - record_batch_3 <- record_batch(c1 = c(9, 10, 11, 12, 0, 1), - c2 = c("a", "b", "c", "d", "e", "d")) - record_batch_4 <- record_batch(c1 = c(13, 14, 15, 16, 0, 1), - c2 = c("a", "b", "c", "d", "e", "b")) + record_batch_1 <- record_batch( + c1 = c(1, 2, 3, 4, 0, 10), + c2 = c("a", "b", "c", "d", "e", "a") + ) + record_batch_2 <- record_batch( + c1 = c(5, 6, 7, 8, 0, 1), + c2 = c("a", "b", "c", "d", "e", "c") + ) + record_batch_3 <- record_batch( + c1 = c(9, 10, 11, 12, 0, 1), + c2 = c("a", "b", "c", "d", "e", "d") + ) + record_batch_4 <- record_batch( + c1 = c(13, 14, 15, 16, 0, 1), + c2 = c("a", "b", "c", "d", "e", "b") + ) - table <- Table$create(d1 = record_batch_1, d2 = record_batch_2, - d3 = record_batch_3, d4 = record_batch_4) + table <- Table$create( + d1 = record_batch_1, d2 = record_batch_2, + d3 = record_batch_3, d4 = record_batch_4 + ) write_dataset(table, path = dst_dir, format = file_format, partitioning = partitioning) @@ -643,12 +671,18 @@ test_that("Dataset write max rows per files", { test_that("Dataset min_rows_per_group", { skip_if_not_available("parquet") - rb1 <- record_batch(c1 = c(1, 2, 3, 4), - c2 = c("a", "b", "e", "a")) - rb2 <- record_batch(c1 = c(5, 6, 7, 8, 9), - c2 = c("a", "b", "c", "d", "h")) - rb3 <- record_batch(c1 = c(10, 11), - c2 = c("a", "b")) + rb1 <- record_batch( + c1 = c(1, 2, 3, 4), + c2 = c("a", "b", "e", "a") + ) + rb2 <- record_batch( + c1 = c(5, 6, 7, 8, 9), + c2 = c("a", "b", "c", "d", "h") + ) + rb3 <- record_batch( + c1 = c(10, 11), + c2 = c("a", "b") + ) dataset <- Table$create(d1 = rb1, d2 = rb2, d3 = rb3) diff --git a/r/tests/testthat/test-metadata.R b/r/tests/testthat/test-metadata.R index 3217b58d6c..4db20d04df 100644 --- a/r/tests/testthat/test-metadata.R +++ b/r/tests/testthat/test-metadata.R @@ -226,11 +226,13 @@ test_that("Row-level metadata (does not by default) roundtrip", { # But we can re-enable this / read data that has already been written with # row-level metadata withr::with_options( - list("arrow.preserve_row_level_metadata" = TRUE), { + list("arrow.preserve_row_level_metadata" = TRUE), + { tab <- Table$create(df) expect_identical(attr(as.data.frame(tab)$x[[1]], "foo"), "bar") expect_identical(attr(as.data.frame(tab)$x[[2]], "baz"), "qux") - }) + } + ) }) @@ -256,7 +258,8 @@ test_that("Row-level metadata (does not) roundtrip in datasets", { dst_dir <- make_temp_dir() withr::with_options( - list("arrow.preserve_row_level_metadata" = TRUE), { + list("arrow.preserve_row_level_metadata" = TRUE), + { expect_warning( write_dataset(df, dst_dir, partitioning = "part"), "Row-level metadata is not compatible with datasets and will be discarded" @@ -286,7 +289,25 @@ test_that("Row-level metadata (does not) roundtrip in datasets", { df_from_ds <- ds %>% 
select(int) %>% collect(), NA ) - }) + } + ) +}) + +test_that("Dataset writing does handle other metadata", { + skip_if_not_available("dataset") + skip_if_not_available("parquet") + + dst_dir <- make_temp_dir() + write_dataset(example_with_metadata, dst_dir, partitioning = "b") + + ds <- open_dataset(dst_dir) + expect_equal( + ds %>% + # partitioning on b puts it last, so move it back + select(a, b, c, d) %>% + collect(), + example_with_metadata + ) }) test_that("When we encounter SF cols, we warn", { @@ -305,11 +326,13 @@ test_that("When we encounter SF cols, we warn", { # But we can re-enable this / read data that has already been written with # row-level metadata without a warning withr::with_options( - list("arrow.preserve_row_level_metadata" = TRUE), { + list("arrow.preserve_row_level_metadata" = TRUE), + { expect_warning(tab <- Table$create(df), NA) expect_identical(attr(as.data.frame(tab)$x[[1]], "foo"), "bar") expect_identical(attr(as.data.frame(tab)$x[[2]], "baz"), "qux") - }) + } + ) }) test_that("dplyr with metadata", { @@ -369,7 +392,6 @@ test_that("grouped_df metadata is recorded (efficiently)", { }) test_that("grouped_df non-arrow metadata is preserved", { - simple_tbl <- tibble(a = 1:2, b = 3:4) attr(simple_tbl, "other_metadata") <- "look I'm still here!" grouped <- group_by(simple_tbl, a)
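
For context, a minimal usage sketch of the streaming-write path this commit enables: a dplyr join across two datasets written out with write_dataset(), which now goes through an ExecPlan WriteNode rather than materializing through a Scanner first. The paths and column names below (data/orders, data/customers, customer_id, region) are hypothetical, for illustration only.

library(arrow)
library(dplyr)

# Hypothetical example datasets; any open_dataset() source works here.
orders <- open_dataset("data/orders", format = "parquet")
customers <- open_dataset("data/customers", format = "parquet")

orders %>%
  left_join(customers, by = "customer_id") %>%  # join runs inside the ExecPlan
  filter(!is.na(region)) %>%
  group_by(region) %>%                          # group vars become the partitioning
  write_dataset("data/orders_by_region", format = "parquet")

Note that, per the TODO (ARROW-15681) in the dataset-write.R hunk above, queries ending in arrange(), head(), or tail() are still evaluated first and the result is fed to a second plan for writing, since sorting and topK are handled only in the sink node.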