nealrichardson commented on a change in pull request #12316:
URL: https://github.com/apache/arrow/pull/12316#discussion_r806275298



##########
File path: r/src/compute-exec.cpp
##########
@@ -157,7 +158,33 @@ std::shared_ptr<compute::ExecNode> ExecNode_Scan(
                             arrow::dataset::ScanNodeOptions{dataset, options});
 }
 
-#endif
+// [[dataset::export]]
+void ExecPlan_Write(const std::shared_ptr<compute::ExecPlan>& plan,
+                    const std::shared_ptr<compute::ExecNode>& final_node,
+                    const std::shared_ptr<ds::FileWriteOptions>& file_write_options,
+                    const std::shared_ptr<fs::FileSystem>& filesystem,
+                    std::string base_dir,
+                    const std::shared_ptr<ds::Partitioning>& partitioning,
+                    std::string basename_template,
+                    arrow::dataset::ExistingDataBehavior existing_data_behavior,
+                    int max_partitions) {
+  ds::FileSystemDatasetWriteOptions opts;
+  opts.file_write_options = file_write_options;
+  opts.existing_data_behavior = existing_data_behavior;
+  opts.filesystem = filesystem;
+  opts.base_dir = base_dir;
+  opts.partitioning = partitioning;
+  opts.basename_template = basename_template;
+  opts.max_partitions = max_partitions;
+
+  MakeExecNodeOrStop(
+      "write", final_node->plan(), {final_node.get()},
+      ds::WriteNodeOptions{std::move(opts), std::move(final_node->output_schema())});
+
+  StopIfNotOk(plan->Validate());
+  StopIfNotOk(plan->StartProducing());
+  StopIfNotOk(plan->finished().status());
+}
 
 // [[dataset::export]]
 std::shared_ptr<compute::ExecNode> ExecNode_Filter(

Review comment:
       The ifdef and the decorations above each function should match, so I 
moved the endif accordingly. Technically these ExecNode methods compile with 
just the compute namespace, not dataset. I could move them so they compile even 
if the arrow C++ library wasn't built with ARROW_DATASET=ON, but that seemed 
not worth the effort. 
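
       To make the convention concrete, here is a rough sketch of the pattern described above. The guard macro name (ARROW_R_WITH_DATASET) and the placeholder signatures are assumptions for illustration only, not code copied from the PR; the point is just that the `// [[dataset::export]]` decorations and the feature ifdef should enclose the same set of functions, which is why the `#endif` moves below the new code.

```cpp
// Sketch of the decoration/ifdef pairing; ARROW_R_WITH_DATASET is an assumed
// macro name used only to illustrate the convention.
#if defined(ARROW_R_WITH_DATASET)

// [[dataset::export]]
void ExecPlan_Write();   // placeholder signature, for illustration

// [[dataset::export]]
void ExecNode_Filter();  // placeholder signature, for illustration

#endif  // ARROW_R_WITH_DATASET: the guard and the decorations above now
        // cover the same functions, matching where the endif was moved.
```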

##########
File path: r/R/dataset-write.R
##########
@@ -116,25 +116,40 @@ write_dataset <- function(dataset,
   if (inherits(dataset, "arrow_dplyr_query")) {
     # partitioning vars need to be in the `select` schema
     dataset <- ensure_group_vars(dataset)
-  } else if (inherits(dataset, "grouped_df")) {
-    force(partitioning)
-    # Drop the grouping metadata before writing; we've already consumed it
-    # now to construct `partitioning` and don't want it in the metadata$r
-    dataset <- dplyr::ungroup(dataset)
+  } else {
+    if (inherits(dataset, "grouped_df")) {
+      force(partitioning)
+      # Drop the grouping metadata before writing; we've already consumed it
+      # now to construct `partitioning` and don't want it in the metadata$r
+      dataset <- dplyr::ungroup(dataset)
+    }
+    dataset <- tryCatch(
+      as_adq(dataset),
+      error = function(e) {
+        stop("'dataset' must be a Dataset, RecordBatch, Table, 
arrow_dplyr_query, or data.frame, not ", deparse(class(dataset)), call. = FALSE)
+      }
+    )
   }
 
-  scanner <- Scanner$create(dataset)
+  plan <- ExecPlan$create()
+  final_node <- plan$Build(dataset)
+  # TODO: warn/error if there is sorting/top_k? or just compute? (this needs test)

Review comment:
       Cool. TopK is a separate issue--it's another feature only handled in a 
sink node. I'll handle it here by evaluating the query and then doing a new 
ExecPlan to write the resulting Table. 
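
       In case it helps to picture that fallback, here is a hedged C++-level sketch of the idea: materialize the query into a Table first, then write it in a separate pass. The actual handling would live in R's write_dataset, and this sketch goes through FileSystemDataset::Write rather than hand-building a second ExecPlan; the helper name and plumbing are made up for illustration.

```cpp
#include <arrow/dataset/api.h>
#include <arrow/result.h>
#include <arrow/status.h>
#include <arrow/table.h>

// Hypothetical helper: write an already-computed Table (e.g. the result of a
// TopK query evaluated through a sink node) with the regular dataset writer.
arrow::Status WriteMaterializedTable(
    const std::shared_ptr<arrow::Table>& table,
    const arrow::dataset::FileSystemDatasetWriteOptions& write_options) {
  // Wrap the Table so it can be scanned again; the sort/TopK already happened.
  auto dataset = std::make_shared<arrow::dataset::InMemoryDataset>(table);
  ARROW_ASSIGN_OR_RAISE(auto scanner_builder, dataset->NewScan());
  ARROW_ASSIGN_OR_RAISE(auto scanner, scanner_builder->Finish());
  return arrow::dataset::FileSystemDataset::Write(write_options, scanner);
}
```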

##########
File path: r/src/compute-exec.cpp
##########
@@ -157,7 +158,33 @@ std::shared_ptr<compute::ExecNode> ExecNode_Scan(
                             arrow::dataset::ScanNodeOptions{dataset, options});
 }
 
-#endif
+// [[dataset::export]]
+void ExecPlan_Write(const std::shared_ptr<compute::ExecPlan>& plan,
+                    const std::shared_ptr<compute::ExecNode>& final_node,
+                    const std::shared_ptr<ds::FileWriteOptions>& file_write_options,
+                    const std::shared_ptr<fs::FileSystem>& filesystem,
+                    std::string base_dir,
+                    const std::shared_ptr<ds::Partitioning>& partitioning,
+                    std::string basename_template,
+                    arrow::dataset::ExistingDataBehavior existing_data_behavior,
+                    int max_partitions) {
+  ds::FileSystemDatasetWriteOptions opts;
+  opts.file_write_options = file_write_options;
+  opts.existing_data_behavior = existing_data_behavior;
+  opts.filesystem = filesystem;
+  opts.base_dir = base_dir;
+  opts.partitioning = partitioning;
+  opts.basename_template = basename_template;
+  opts.max_partitions = max_partitions;
+
+  MakeExecNodeOrStop(
+      "write", final_node->plan(), {final_node.get()},
+      ds::WriteNodeOptions{std::move(opts), std::move(final_node->output_schema())});
+
+  StopIfNotOk(plan->Validate());
+  StopIfNotOk(plan->StartProducing());
+  StopIfNotOk(plan->finished().status());
+}
 
 // [[dataset::export]]
 std::shared_ptr<compute::ExecNode> ExecNode_Filter(

Review comment:
       Yes, but we (currently) turn a Table into an InMemoryDataset. I guess we could go through RecordBatchReader, which wouldn't require dataset (I can't remember if I looked into that and found that it didn't work for some reason; you'd think it should).
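
       For context, a minimal illustrative sketch of the current path mentioned here (not code from the PR; the helper name is hypothetical): the Table is wrapped in an InMemoryDataset, which is what pulls in the dataset module even for purely in-memory input.

```cpp
#include <arrow/dataset/api.h>
#include <arrow/table.h>

// Hypothetical helper mirroring the current behavior: wrap a Table in an
// InMemoryDataset so it can be scanned like any other dataset.
std::shared_ptr<arrow::dataset::Dataset> TableAsDataset(
    const std::shared_ptr<arrow::Table>& table) {
  return std::make_shared<arrow::dataset::InMemoryDataset>(table);
}
```

       A RecordBatchReader route (e.g. arrow::TableBatchReader over the same Table) would avoid that dependency, but as noted above it is unclear whether that path actually works for the write node.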




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

