nealrichardson commented on a change in pull request #12316:
URL: https://github.com/apache/arrow/pull/12316#discussion_r806275298
########## File path: r/src/compute-exec.cpp ##########

```diff
@@ -157,7 +158,33 @@ std::shared_ptr<compute::ExecNode> ExecNode_Scan(
       arrow::dataset::ScanNodeOptions{dataset, options});
 }
 
-#endif
+// [[dataset::export]]
+void ExecPlan_Write(const std::shared_ptr<compute::ExecPlan>& plan,
+                    const std::shared_ptr<compute::ExecNode>& final_node,
+                    const std::shared_ptr<ds::FileWriteOptions>& file_write_options,
+                    const std::shared_ptr<fs::FileSystem>& filesystem,
+                    std::string base_dir,
+                    const std::shared_ptr<ds::Partitioning>& partitioning,
+                    std::string basename_template,
+                    arrow::dataset::ExistingDataBehavior existing_data_behavior,
+                    int max_partitions) {
+  ds::FileSystemDatasetWriteOptions opts;
+  opts.file_write_options = file_write_options;
+  opts.existing_data_behavior = existing_data_behavior;
+  opts.filesystem = filesystem;
+  opts.base_dir = base_dir;
+  opts.partitioning = partitioning;
+  opts.basename_template = basename_template;
+  opts.max_partitions = max_partitions;
+
+  MakeExecNodeOrStop(
+      "write", final_node->plan(), {final_node.get()},
+      ds::WriteNodeOptions{std::move(opts), std::move(final_node->output_schema())});
+
+  StopIfNotOk(plan->Validate());
+  StopIfNotOk(plan->StartProducing());
+  StopIfNotOk(plan->finished().status());
+}
 
 // [[dataset::export]]
 std::shared_ptr<compute::ExecNode> ExecNode_Filter(
```

Review comment:
   The `#ifdef` and the decorations above each function should match, so I moved the `#endif` accordingly. Technically these ExecNode methods compile with just the compute namespace, not dataset. I could move them so they would compile even if the Arrow C++ library wasn't built with `ARROW_DATASET=ON`, but that seemed not worth the effort. (A minimal sketch of this guard/decoration pairing appears after the comments below.)

########## File path: r/R/dataset-write.R ##########

```diff
@@ -116,25 +116,40 @@ write_dataset <- function(dataset,
   if (inherits(dataset, "arrow_dplyr_query")) {
     # partitioning vars need to be in the `select` schema
     dataset <- ensure_group_vars(dataset)
-  } else if (inherits(dataset, "grouped_df")) {
-    force(partitioning)
-    # Drop the grouping metadata before writing; we've already consumed it
-    # now to construct `partitioning` and don't want it in the metadata$r
-    dataset <- dplyr::ungroup(dataset)
+  } else {
+    if (inherits(dataset, "grouped_df")) {
+      force(partitioning)
+      # Drop the grouping metadata before writing; we've already consumed it
+      # now to construct `partitioning` and don't want it in the metadata$r
+      dataset <- dplyr::ungroup(dataset)
+    }
+    dataset <- tryCatch(
+      as_adq(dataset),
+      error = function(e) {
+        stop("'dataset' must be a Dataset, RecordBatch, Table, arrow_dplyr_query, or data.frame, not ", deparse(class(dataset)), call. = FALSE)
+      }
+    )
   }
 
-  scanner <- Scanner$create(dataset)
+  plan <- ExecPlan$create()
+  final_node <- plan$Build(dataset)
+  # TODO: warn/error if there is sorting/top_k? or just compute? (this needs test)
```

Review comment:
   Cool. TopK is a separate issue; it's another feature only handled in a sink node. I'll handle it here by evaluating the query and then doing a new ExecPlan to write the resulting Table.
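To make that evaluate-then-write fallback concrete, here is a hedged C++ sketch of the shape being described: run the sorted/top-k query to a materialized Table first, then write the result through a fresh pipeline. The helper name `WriteQueryResult` is hypothetical, and using `FileSystemDataset::Write` as the second "plan" is an illustration of the idea, not the PR's actual implementation.

```cpp
#include <memory>

#include <arrow/dataset/api.h>
#include <arrow/result.h>
#include <arrow/table.h>

namespace ds = arrow::dataset;

// Hypothetical helper: `result` is the Table produced by running the
// sorted/top-k query to completion in a first ExecPlan.
arrow::Status WriteQueryResult(std::shared_ptr<arrow::Table> result,
                               const ds::FileSystemDatasetWriteOptions& write_options) {
  // Wrap the already-ordered result so the writer can scan it like a dataset.
  auto dataset = std::make_shared<ds::InMemoryDataset>(std::move(result));
  ARROW_ASSIGN_OR_RAISE(auto scanner_builder, dataset->NewScan());
  ARROW_ASSIGN_OR_RAISE(auto scanner, scanner_builder->Finish());
  // FileSystemDataset::Write drives its own exec plan internally, so this is
  // effectively the "new ExecPlan" from the comment above.
  return ds::FileSystemDataset::Write(write_options, scanner);
}
```

The point is simply that ordering is resolved before the write path ever sees the data, so the write node itself never needs to understand sorting or top-k.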
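And, returning to the `#ifdef` point from the first comment: a minimal sketch of the guard/decoration pairing being described, assuming the r/src convention that each `// [[dataset::export]]` decoration must sit inside the matching `ARROW_R_WITH_DATASET` guard (the function names here are made up):

```cpp
#if defined(ARROW_R_WITH_DATASET)

// [[dataset::export]]
void RegisteredOnlyWithDataset() {
  // Compiled and exported to R only when the dataset module is available,
  // so the #endif must come after this decoration, not before it.
}

#endif

// [[arrow::export]]
void AlwaysRegistered() {
  // Needs only core arrow/compute and therefore lives outside the guard.
}
```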
########## File path: r/src/compute-exec.cpp ##########

```diff
@@ -157,7 +158,33 @@ std::shared_ptr<compute::ExecNode> ExecNode_Scan(
       arrow::dataset::ScanNodeOptions{dataset, options});
 }
 
-#endif
+// [[dataset::export]]
+void ExecPlan_Write(const std::shared_ptr<compute::ExecPlan>& plan,
+                    const std::shared_ptr<compute::ExecNode>& final_node,
+                    const std::shared_ptr<ds::FileWriteOptions>& file_write_options,
+                    const std::shared_ptr<fs::FileSystem>& filesystem,
+                    std::string base_dir,
+                    const std::shared_ptr<ds::Partitioning>& partitioning,
+                    std::string basename_template,
+                    arrow::dataset::ExistingDataBehavior existing_data_behavior,
+                    int max_partitions) {
+  ds::FileSystemDatasetWriteOptions opts;
+  opts.file_write_options = file_write_options;
+  opts.existing_data_behavior = existing_data_behavior;
+  opts.filesystem = filesystem;
+  opts.base_dir = base_dir;
+  opts.partitioning = partitioning;
+  opts.basename_template = basename_template;
+  opts.max_partitions = max_partitions;
+
+  MakeExecNodeOrStop(
+      "write", final_node->plan(), {final_node.get()},
+      ds::WriteNodeOptions{std::move(opts), std::move(final_node->output_schema())});
+
+  StopIfNotOk(plan->Validate());
+  StopIfNotOk(plan->StartProducing());
+  StopIfNotOk(plan->finished().status());
+}
 
 // [[dataset::export]]
 std::shared_ptr<compute::ExecNode> ExecNode_Filter(
```

Review comment:
   Yes, but we (currently) turn a Table into an InMemoryDataset. I guess we could go through RecordBatchReader, which wouldn't require dataset (I can't remember if I looked into that and found that it didn't work for some reason; you'd think it should).
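For context, a hedged sketch of the two paths being weighed here (the helper names are hypothetical): `InMemoryDataset` comes from the dataset module, while `TableBatchReader` is core Arrow, which is why the reader route would not require `ARROW_DATASET=ON`.

```cpp
#include <memory>

#include <arrow/api.h>
#include <arrow/dataset/api.h>

// Current path: wrapping the Table pulls in the dataset module, so this
// code has to stay behind the dataset guard.
std::shared_ptr<arrow::dataset::Dataset> TableAsDataset(
    std::shared_ptr<arrow::Table> table) {
  return std::make_shared<arrow::dataset::InMemoryDataset>(std::move(table));
}

// Possible alternative: a RecordBatchReader over the same Table needs only
// core libarrow. NOTE: TableBatchReader borrows the Table by reference, so
// `table` must outlive the returned reader.
std::unique_ptr<arrow::RecordBatchReader> TableAsReader(const arrow::Table& table) {
  return std::make_unique<arrow::TableBatchReader>(table);
}
```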