bkietz commented on a change in pull request #7869:
URL: https://github.com/apache/arrow/pull/7869#discussion_r465259555



##########
File path: cpp/src/arrow/dataset/file_base.cc
##########
@@ -154,52 +157,101 @@ FragmentIterator FileSystemDataset::GetFragmentsImpl(
   return MakeVectorIterator(std::move(fragments));
 }
 
-Result<std::shared_ptr<FileSystemDataset>> FileSystemDataset::Write(
-    const WritePlan& plan, std::shared_ptr<ScanOptions> scan_options,
-    std::shared_ptr<ScanContext> scan_context) {
-  auto filesystem = plan.filesystem;
-  if (filesystem == nullptr) {
-    filesystem = std::make_shared<fs::LocalFileSystem>();
-  }
+struct WriteTask {
+  Status Execute();
 
-  auto task_group = scan_context->TaskGroup();
-  auto partition_base_dir = 
fs::internal::EnsureTrailingSlash(plan.partition_base_dir);
-  auto extension = "." + plan.format->type_name();
-
-  std::vector<std::shared_ptr<FileFragment>> fragments;
-  for (size_t i = 0; i < plan.paths.size(); ++i) {
-    const auto& op = plan.fragment_or_partition_expressions[i];
-    if (op.kind() == WritePlan::FragmentOrPartitionExpression::FRAGMENT) {
-      auto path = partition_base_dir + plan.paths[i] + extension;
-
-      const auto& input_fragment = op.fragment();
-      FileSource dest(path, filesystem);
-
-      ARROW_ASSIGN_OR_RAISE(auto write_task,
-                            plan.format->WriteFragment({path, filesystem}, 
input_fragment,
-                                                       scan_options, 
scan_context));
-      task_group->Append([write_task] { return write_task->Execute(); });
-
-      ARROW_ASSIGN_OR_RAISE(
-          auto fragment, plan.format->MakeFragment(
-                             {path, filesystem}, 
input_fragment->partition_expression()));
-      fragments.push_back(std::move(fragment));
+  std::string basename;
+
+  /// The partitioning with which paths will be generated
+  std::shared_ptr<Partitioning> partitioning;
+
+  /// The format in which fragments will be written
+  std::shared_ptr<FileFormat> format;
+
+  /// The FileSystem and base directory into which fragments will be written
+  std::shared_ptr<fs::FileSystem> filesystem;
+  std::string base_dir;
+
+  /// Batches to be written
+  std::shared_ptr<RecordBatchReader> batches;
+
+  /// An Expression already satisfied by every batch to be written
+  std::shared_ptr<Expression> partition_expression;

Review comment:
       This stores partition information which was already attached at the 
fragment level, and which therefore may reference virtual columns absent from 
`batches`. Without this, we'd need to materialize those columns, waste time 
grouping their (identical) entries, and finally drop them again in the common 
case where we don't write them to disk.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to