jorisvandenbossche commented on a change in pull request #10955:
URL: https://github.com/apache/arrow/pull/10955#discussion_r717575581



##########
File path: cpp/src/arrow/dataset/dataset_writer.h
##########
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <string>
+
+#include "arrow/dataset/file_base.h"
+#include "arrow/record_batch.h"
+#include "arrow/status.h"
+#include "arrow/util/async_util.h"
+#include "arrow/util/future.h"
+
+namespace arrow {
+namespace dataset {
+
+constexpr uint64_t kDefaultDatasetWriterMaxRowsQueued = 64 * 1024 * 1024;
+
+/// \brief Utility class that manages a set of writers to different paths
+///
+/// Writers may be closed and reopened (and a new file created) based on the 
dataset
+/// write options (for example, min_rows_per_file or max_open_files)

Review comment:
       ```suggestion
   /// write options (for example, max_rows_per_file or max_open_files)
   ```

##########
File path: cpp/src/arrow/dataset/dataset_writer.cc
##########
@@ -0,0 +1,529 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/dataset/dataset_writer.h"
+
+#include <list>
+#include <mutex>
+#include <unordered_map>
+
+#include "arrow/filesystem/path_util.h"
+#include "arrow/record_batch.h"
+#include "arrow/result.h"
+#include "arrow/util/future.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/map.h"
+#include "arrow/util/string.h"
+
+namespace arrow {
+namespace dataset {
+
+namespace {
+
+constexpr util::string_view kIntegerToken = "{i}";
+
+class Throttle {
+ public:
+  explicit Throttle(uint64_t max_value) : max_value_(max_value) {}
+
+  bool Unthrottled() const { return max_value_ <= 0; }
+
+  Future<> Acquire(uint64_t values) {
+    if (Unthrottled()) {
+      return Future<>::MakeFinished();
+    }
+    std::lock_guard<std::mutex> lg(mutex_);
+    if (values + current_value_ > max_value_) {
+      in_waiting_ = values;
+      backpressure_ = Future<>::Make();
+    } else {
+      current_value_ += values;
+    }
+    return backpressure_;
+  }
+
+  void Release(uint64_t values) {
+    if (Unthrottled()) {
+      return;
+    }
+    Future<> to_complete;
+    {
+      std::lock_guard<std::mutex> lg(mutex_);
+      current_value_ -= values;
+      if (in_waiting_ > 0 && in_waiting_ + current_value_ <= max_value_) {
+        in_waiting_ = 0;
+        to_complete = backpressure_;
+      }
+    }
+    if (to_complete.is_valid()) {
+      to_complete.MarkFinished();
+    }
+  }
+
+ private:
+  Future<> backpressure_ = Future<>::MakeFinished();
+  uint64_t max_value_;
+  uint64_t in_waiting_ = 0;
+  uint64_t current_value_ = 0;
+  std::mutex mutex_;
+};
+
+class DatasetWriterFileQueue : public util::AsyncDestroyable {
+ public:
+  explicit DatasetWriterFileQueue(const Future<std::shared_ptr<FileWriter>>& 
writer_fut,
+                                  const FileSystemDatasetWriteOptions& options,
+                                  std::mutex* visitors_mutex)
+      : options_(options), visitors_mutex_(visitors_mutex) {
+    running_task_ = Future<>::Make();
+    writer_fut.AddCallback(
+        [this](const Result<std::shared_ptr<FileWriter>>& maybe_writer) {
+          if (maybe_writer.ok()) {
+            writer_ = *maybe_writer;
+            Flush();
+          } else {
+            Abort(maybe_writer.status());
+          }
+        });
+  }
+
+  Future<uint64_t> Push(std::shared_ptr<RecordBatch> batch) {
+    std::unique_lock<std::mutex> lk(mutex);
+    write_queue_.push_back(std::move(batch));
+    Future<uint64_t> write_future = Future<uint64_t>::Make();
+    write_futures_.push_back(write_future);
+    if (!running_task_.is_valid()) {
+      running_task_ = Future<>::Make();
+      FlushUnlocked(std::move(lk));
+    }
+    return write_future;
+  }
+
+  Future<> DoDestroy() override {
+    std::lock_guard<std::mutex> lg(mutex);
+    if (!running_task_.is_valid()) {
+      RETURN_NOT_OK(DoFinish());
+      return Future<>::MakeFinished();
+    }
+    return running_task_.Then([this] { return DoFinish(); });
+  }
+
+ private:
+  Future<uint64_t> WriteNext() {
+    // May want to prototype / measure someday pushing the async write down 
further
+    return DeferNotOk(
+        io::default_io_context().executor()->Submit([this]() -> 
Result<uint64_t> {
+          DCHECK(running_task_.is_valid());
+          std::unique_lock<std::mutex> lk(mutex);
+          const std::shared_ptr<RecordBatch>& to_write = write_queue_.front();
+          Future<uint64_t> on_complete = write_futures_.front();
+          uint64_t rows_to_write = to_write->num_rows();
+          lk.unlock();
+          Status status = writer_->Write(to_write);
+          lk.lock();
+          write_queue_.pop_front();
+          write_futures_.pop_front();
+          lk.unlock();
+          if (!status.ok()) {
+            on_complete.MarkFinished(status);
+          } else {
+            on_complete.MarkFinished(rows_to_write);
+          }
+          return rows_to_write;
+        }));
+  }
+
+  Status DoFinish() {
+    {
+      std::lock_guard<std::mutex> lg(*visitors_mutex_);
+      RETURN_NOT_OK(options_.writer_pre_finish(writer_.get()));
+    }
+    RETURN_NOT_OK(writer_->Finish());
+    {
+      std::lock_guard<std::mutex> lg(*visitors_mutex_);
+      return options_.writer_post_finish(writer_.get());
+    }
+  }
+
+  void Abort(Status err) {
+    std::vector<Future<uint64_t>> futures_to_abort;
+    Future<> old_running_task = running_task_;
+    {
+      std::lock_guard<std::mutex> lg(mutex);
+      write_queue_.clear();
+      futures_to_abort =
+          std::vector<Future<uint64_t>>(write_futures_.begin(), 
write_futures_.end());
+      write_futures_.clear();
+      running_task_ = Future<>();
+    }
+    for (auto& fut : futures_to_abort) {
+      fut.MarkFinished(err);
+    }
+    old_running_task.MarkFinished(std::move(err));
+  }
+
+  void Flush() {
+    std::unique_lock<std::mutex> lk(mutex);
+    FlushUnlocked(std::move(lk));
+  }
+
+  void FlushUnlocked(std::unique_lock<std::mutex> lk) {
+    if (write_queue_.empty()) {
+      Future<> old_running_task = running_task_;
+      running_task_ = Future<>();
+      lk.unlock();
+      old_running_task.MarkFinished();
+      return;
+    }
+    WriteNext().AddCallback([this](const Result<uint64_t>& res) {
+      if (res.ok()) {
+        Flush();
+      } else {
+        Abort(res.status());
+      }
+    });
+  }
+
+  const FileSystemDatasetWriteOptions& options_;
+  std::mutex* visitors_mutex_;
+  std::shared_ptr<FileWriter> writer_;
+  std::mutex mutex;
+  std::list<std::shared_ptr<RecordBatch>> write_queue_;
+  std::list<Future<uint64_t>> write_futures_;
+  Future<> running_task_;
+};
+
+struct WriteTask {
+  std::string filename;
+  uint64_t num_rows;
+};
+
+class DatasetWriterDirectoryQueue : public util::AsyncDestroyable {
+ public:
+  DatasetWriterDirectoryQueue(std::string directory, std::shared_ptr<Schema> 
schema,
+                              const FileSystemDatasetWriteOptions& 
write_options,
+                              Throttle* open_files_throttle, std::mutex* 
visitors_mutex)
+      : directory_(std::move(directory)),
+        schema_(std::move(schema)),
+        write_options_(write_options),
+        open_files_throttle_(open_files_throttle),
+        visitors_mutex_(visitors_mutex) {}
+
+  Result<std::shared_ptr<RecordBatch>> NextWritableChunk(
+      std::shared_ptr<RecordBatch> batch, std::shared_ptr<RecordBatch>* 
remainder,
+      bool* will_open_file) const {
+    DCHECK_GT(batch->num_rows(), 0);
+    uint64_t rows_available = std::numeric_limits<uint64_t>::max();
+    *will_open_file = rows_written_ == 0;
+    if (write_options_.max_rows_per_file > 0) {
+      rows_available = write_options_.max_rows_per_file - rows_written_;
+    }
+
+    std::shared_ptr<RecordBatch> to_queue;
+    if (rows_available < static_cast<uint64_t>(batch->num_rows())) {
+      to_queue = batch->Slice(0, static_cast<int64_t>(rows_available));
+      *remainder = batch->Slice(static_cast<int64_t>(rows_available));
+    } else {
+      to_queue = std::move(batch);
+    }
+    return to_queue;
+  }
+
+  Future<WriteTask> StartWrite(const std::shared_ptr<RecordBatch>& batch) {
+    rows_written_ += batch->num_rows();
+    WriteTask task{current_filename_, 
static_cast<uint64_t>(batch->num_rows())};
+    if (!latest_open_file_) {
+      ARROW_ASSIGN_OR_RAISE(latest_open_file_, 
OpenFileQueue(current_filename_));
+    }
+    return latest_open_file_->Push(batch).Then([task] { return task; });
+  }
+
+  Result<std::string> GetNextFilename() {
+    auto basename = ::arrow::internal::Replace(
+        write_options_.basename_template, kIntegerToken, 
std::to_string(file_counter_++));
+    if (!basename) {
+      return Status::Invalid("string interpolation of basename template 
failed");
+    }
+
+    return fs::internal::ConcatAbstractPath(directory_, *basename);
+  }
+
+  Status FinishCurrentFile() {
+    if (latest_open_file_) {
+      latest_open_file_ = nullptr;
+    }
+    rows_written_ = 0;
+    return GetNextFilename().Value(&current_filename_);
+  }
+
+  Result<std::shared_ptr<FileWriter>> OpenWriter(const std::string& filename) {
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<io::OutputStream> out_stream,
+                          
write_options_.filesystem->OpenOutputStream(filename));
+    return write_options_.format()->MakeWriter(std::move(out_stream), schema_,
+                                               
write_options_.file_write_options,
+                                               {write_options_.filesystem, 
filename});
+  }
+
+  Result<std::shared_ptr<DatasetWriterFileQueue>> OpenFileQueue(
+      const std::string& filename) {
+    Future<std::shared_ptr<FileWriter>> file_writer_fut =
+        init_future_.Then([this, filename] {
+          ::arrow::internal::Executor* io_executor =
+              write_options_.filesystem->io_context().executor();
+          return DeferNotOk(
+              io_executor->Submit([this, filename]() { return 
OpenWriter(filename); }));
+        });
+    auto file_queue = util::MakeSharedAsync<DatasetWriterFileQueue>(
+        file_writer_fut, write_options_, visitors_mutex_);
+    RETURN_NOT_OK(task_group_.AddTask(
+        file_queue->on_closed().Then([this] { 
open_files_throttle_->Release(1); })));
+    return file_queue;
+  }
+
+  uint64_t rows_written() const { return rows_written_; }
+
+  void PrepareDirectory() {
+    init_future_ =
+        
DeferNotOk(write_options_.filesystem->io_context().executor()->Submit([this] {
+          RETURN_NOT_OK(write_options_.filesystem->CreateDir(directory_));
+          if (write_options_.existing_data_behavior == 
kDeleteMatchingPartitions) {
+            fs::FileSelector selector;
+            selector.base_dir = directory_;
+            selector.recursive = true;
+            return write_options_.filesystem->DeleteFiles(selector);
+          }
+          return Status::OK();
+        }));
+  }
+
+  static Result<std::unique_ptr<DatasetWriterDirectoryQueue,
+                                
util::DestroyingDeleter<DatasetWriterDirectoryQueue>>>
+  Make(util::AsyncTaskGroup* task_group,
+       const FileSystemDatasetWriteOptions& write_options, Throttle* 
open_files_throttle,
+       std::shared_ptr<Schema> schema, std::string dir, std::mutex* 
visitors_mutex) {
+    auto dir_queue = util::MakeUniqueAsync<DatasetWriterDirectoryQueue>(
+        std::move(dir), std::move(schema), write_options, open_files_throttle,
+        visitors_mutex);
+    RETURN_NOT_OK(task_group->AddTask(dir_queue->on_closed()));
+    dir_queue->PrepareDirectory();
+    ARROW_ASSIGN_OR_RAISE(dir_queue->current_filename_, 
dir_queue->GetNextFilename());
+    // std::move required to make RTools 3.5 mingw compiler happy
+    return std::move(dir_queue);
+  }
+
+  Future<> DoDestroy() override {
+    latest_open_file_.reset();
+    return task_group_.WaitForTasksToFinish();
+  }
+
+ private:
+  util::AsyncTaskGroup task_group_;
+  std::string directory_;
+  std::shared_ptr<Schema> schema_;
+  const FileSystemDatasetWriteOptions& write_options_;
+  Throttle* open_files_throttle_;
+  std::mutex* visitors_mutex_;
+  Future<> init_future_;
+  std::string current_filename_;
+  std::shared_ptr<DatasetWriterFileQueue> latest_open_file_;
+  uint64_t rows_written_ = 0;
+  uint32_t file_counter_ = 0;
+};
+
+Status ValidateBasenameTemplate(util::string_view basename_template) {
+  if (basename_template.find(fs::internal::kSep) != util::string_view::npos) {
+    return Status::Invalid("basename_template contained '/'");
+  }
+  size_t token_start = basename_template.find(kIntegerToken);
+  if (token_start == util::string_view::npos) {
+    return Status::Invalid("basename_template did not contain '", 
kIntegerToken, "'");
+  }
+  size_t next_token_start = basename_template.find(kIntegerToken, token_start 
+ 1);
+  if (next_token_start != util::string_view::npos) {
+    return Status::Invalid("basename_template contained '", kIntegerToken,
+                           "' more than once");
+  }
+  return Status::OK();
+}
+
+Status EnsureDestinationValid(const FileSystemDatasetWriteOptions& options) {
+  if (options.existing_data_behavior == kError) {
+    fs::FileSelector selector;
+    selector.base_dir = options.base_dir;
+    selector.recursive = true;
+    Result<std::vector<fs::FileInfo>> maybe_files =
+        options.filesystem->GetFileInfo(selector);
+    if (!maybe_files.ok()) {
+      // If the path doesn't exist then continue
+      return Status::OK();
+    }
+    if (maybe_files->size() > 1) {
+      return Status::Invalid(
+          "Could not write to ", options.base_dir,
+          " as the directory is not empty and existing_data_behavior is 
kError");

Review comment:
       ```suggestion
             " as the directory is not empty and existing_data_behavior is to error");
   ```
   
   (or something similar, to make the error message more generic; `kError` will be a bit strange for e.g. Python/R users)

##########
File path: cpp/src/arrow/dataset/file_base.h
##########
@@ -343,6 +343,18 @@ class ARROW_DS_EXPORT FileWriter {
   fs::FileLocator destination_locator_;
 };
 
+/// \brief Controls what happens if files exist in an output directory during 
a dataset
+/// write
+enum ExistingDataBehavior : int8_t {
+  /// Deletes all files in a directory the first time that directory is 
encountered
+  kDeleteMatchingPartitions,
+  /// Ignores existing files, overwriting any that happen to have the same 
name as an
+  /// output file
+  kOverwriteOrIgnore,
+  /// Returns an error if there are any files or subdirectories in the output 
directory
+  kError,

Review comment:
       Is this about the whole root dir, or only the (sub)directory that is actually being written to?
   
   (I am thinking about the use case of a "strict" append (only adding new partitions, e.g. adding new days to a dataset partitioned by year/month/day), where you want to raise an error when writing to a subdirectory that already exists, but not necessarily if some files/directories are already present in the root directory.)

##########
File path: cpp/src/arrow/dataset/dataset_writer.cc
##########
@@ -0,0 +1,529 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/dataset/dataset_writer.h"
+
+#include <list>
+#include <mutex>
+#include <unordered_map>
+
+#include "arrow/filesystem/path_util.h"
+#include "arrow/record_batch.h"
+#include "arrow/result.h"
+#include "arrow/util/future.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/map.h"
+#include "arrow/util/string.h"
+
+namespace arrow {
+namespace dataset {
+
+namespace {
+
+constexpr util::string_view kIntegerToken = "{i}";
+
+class Throttle {
+ public:
+  explicit Throttle(uint64_t max_value) : max_value_(max_value) {}
+
+  bool Unthrottled() const { return max_value_ <= 0; }
+
+  Future<> Acquire(uint64_t values) {
+    if (Unthrottled()) {
+      return Future<>::MakeFinished();
+    }
+    std::lock_guard<std::mutex> lg(mutex_);
+    if (values + current_value_ > max_value_) {
+      in_waiting_ = values;
+      backpressure_ = Future<>::Make();
+    } else {
+      current_value_ += values;
+    }
+    return backpressure_;
+  }
+
+  void Release(uint64_t values) {
+    if (Unthrottled()) {
+      return;
+    }
+    Future<> to_complete;
+    {
+      std::lock_guard<std::mutex> lg(mutex_);
+      current_value_ -= values;
+      if (in_waiting_ > 0 && in_waiting_ + current_value_ <= max_value_) {
+        in_waiting_ = 0;
+        to_complete = backpressure_;
+      }
+    }
+    if (to_complete.is_valid()) {
+      to_complete.MarkFinished();
+    }
+  }
+
+ private:
+  Future<> backpressure_ = Future<>::MakeFinished();
+  uint64_t max_value_;
+  uint64_t in_waiting_ = 0;
+  uint64_t current_value_ = 0;
+  std::mutex mutex_;
+};
+
+class DatasetWriterFileQueue : public util::AsyncDestroyable {
+ public:
+  explicit DatasetWriterFileQueue(const Future<std::shared_ptr<FileWriter>>& 
writer_fut,
+                                  const FileSystemDatasetWriteOptions& options,
+                                  std::mutex* visitors_mutex)
+      : options_(options), visitors_mutex_(visitors_mutex) {
+    running_task_ = Future<>::Make();
+    writer_fut.AddCallback(
+        [this](const Result<std::shared_ptr<FileWriter>>& maybe_writer) {
+          if (maybe_writer.ok()) {
+            writer_ = *maybe_writer;
+            Flush();
+          } else {
+            Abort(maybe_writer.status());
+          }
+        });
+  }
+
+  Future<uint64_t> Push(std::shared_ptr<RecordBatch> batch) {
+    std::unique_lock<std::mutex> lk(mutex);
+    write_queue_.push_back(std::move(batch));
+    Future<uint64_t> write_future = Future<uint64_t>::Make();
+    write_futures_.push_back(write_future);
+    if (!running_task_.is_valid()) {
+      running_task_ = Future<>::Make();
+      FlushUnlocked(std::move(lk));
+    }
+    return write_future;
+  }
+
+  Future<> DoDestroy() override {
+    std::lock_guard<std::mutex> lg(mutex);
+    if (!running_task_.is_valid()) {
+      RETURN_NOT_OK(DoFinish());
+      return Future<>::MakeFinished();
+    }
+    return running_task_.Then([this] { return DoFinish(); });
+  }
+
+ private:
+  Future<uint64_t> WriteNext() {
+    // May want to prototype / measure someday pushing the async write down 
further
+    return DeferNotOk(
+        io::default_io_context().executor()->Submit([this]() -> 
Result<uint64_t> {
+          DCHECK(running_task_.is_valid());
+          std::unique_lock<std::mutex> lk(mutex);
+          const std::shared_ptr<RecordBatch>& to_write = write_queue_.front();
+          Future<uint64_t> on_complete = write_futures_.front();
+          uint64_t rows_to_write = to_write->num_rows();
+          lk.unlock();
+          Status status = writer_->Write(to_write);
+          lk.lock();
+          write_queue_.pop_front();
+          write_futures_.pop_front();
+          lk.unlock();
+          if (!status.ok()) {
+            on_complete.MarkFinished(status);
+          } else {
+            on_complete.MarkFinished(rows_to_write);
+          }
+          return rows_to_write;
+        }));
+  }
+
+  Status DoFinish() {
+    {
+      std::lock_guard<std::mutex> lg(*visitors_mutex_);
+      RETURN_NOT_OK(options_.writer_pre_finish(writer_.get()));
+    }
+    RETURN_NOT_OK(writer_->Finish());
+    {
+      std::lock_guard<std::mutex> lg(*visitors_mutex_);
+      return options_.writer_post_finish(writer_.get());
+    }
+  }
+
+  void Abort(Status err) {
+    std::vector<Future<uint64_t>> futures_to_abort;
+    Future<> old_running_task = running_task_;
+    {
+      std::lock_guard<std::mutex> lg(mutex);
+      write_queue_.clear();
+      futures_to_abort =
+          std::vector<Future<uint64_t>>(write_futures_.begin(), 
write_futures_.end());
+      write_futures_.clear();
+      running_task_ = Future<>();
+    }
+    for (auto& fut : futures_to_abort) {
+      fut.MarkFinished(err);
+    }
+    old_running_task.MarkFinished(std::move(err));
+  }
+
+  void Flush() {
+    std::unique_lock<std::mutex> lk(mutex);
+    FlushUnlocked(std::move(lk));
+  }
+
+  void FlushUnlocked(std::unique_lock<std::mutex> lk) {
+    if (write_queue_.empty()) {
+      Future<> old_running_task = running_task_;
+      running_task_ = Future<>();
+      lk.unlock();
+      old_running_task.MarkFinished();
+      return;
+    }
+    WriteNext().AddCallback([this](const Result<uint64_t>& res) {
+      if (res.ok()) {
+        Flush();
+      } else {
+        Abort(res.status());
+      }
+    });
+  }
+
+  const FileSystemDatasetWriteOptions& options_;
+  std::mutex* visitors_mutex_;
+  std::shared_ptr<FileWriter> writer_;
+  std::mutex mutex;
+  std::list<std::shared_ptr<RecordBatch>> write_queue_;
+  std::list<Future<uint64_t>> write_futures_;
+  Future<> running_task_;
+};
+
+struct WriteTask {
+  std::string filename;
+  uint64_t num_rows;
+};
+
+class DatasetWriterDirectoryQueue : public util::AsyncDestroyable {
+ public:
+  DatasetWriterDirectoryQueue(std::string directory, std::shared_ptr<Schema> 
schema,
+                              const FileSystemDatasetWriteOptions& 
write_options,
+                              Throttle* open_files_throttle, std::mutex* 
visitors_mutex)
+      : directory_(std::move(directory)),
+        schema_(std::move(schema)),
+        write_options_(write_options),
+        open_files_throttle_(open_files_throttle),
+        visitors_mutex_(visitors_mutex) {}
+
+  Result<std::shared_ptr<RecordBatch>> NextWritableChunk(
+      std::shared_ptr<RecordBatch> batch, std::shared_ptr<RecordBatch>* 
remainder,
+      bool* will_open_file) const {
+    DCHECK_GT(batch->num_rows(), 0);
+    uint64_t rows_available = std::numeric_limits<uint64_t>::max();
+    *will_open_file = rows_written_ == 0;
+    if (write_options_.max_rows_per_file > 0) {
+      rows_available = write_options_.max_rows_per_file - rows_written_;
+    }
+
+    std::shared_ptr<RecordBatch> to_queue;
+    if (rows_available < static_cast<uint64_t>(batch->num_rows())) {
+      to_queue = batch->Slice(0, static_cast<int64_t>(rows_available));
+      *remainder = batch->Slice(static_cast<int64_t>(rows_available));
+    } else {
+      to_queue = std::move(batch);
+    }
+    return to_queue;
+  }
+
+  Future<WriteTask> StartWrite(const std::shared_ptr<RecordBatch>& batch) {
+    rows_written_ += batch->num_rows();
+    WriteTask task{current_filename_, 
static_cast<uint64_t>(batch->num_rows())};
+    if (!latest_open_file_) {
+      ARROW_ASSIGN_OR_RAISE(latest_open_file_, 
OpenFileQueue(current_filename_));
+    }
+    return latest_open_file_->Push(batch).Then([task] { return task; });
+  }
+
+  Result<std::string> GetNextFilename() {
+    auto basename = ::arrow::internal::Replace(
+        write_options_.basename_template, kIntegerToken, 
std::to_string(file_counter_++));
+    if (!basename) {
+      return Status::Invalid("string interpolation of basename template 
failed");
+    }
+
+    return fs::internal::ConcatAbstractPath(directory_, *basename);
+  }
+
+  Status FinishCurrentFile() {
+    if (latest_open_file_) {
+      latest_open_file_ = nullptr;
+    }
+    rows_written_ = 0;
+    return GetNextFilename().Value(&current_filename_);
+  }
+
+  Result<std::shared_ptr<FileWriter>> OpenWriter(const std::string& filename) {
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<io::OutputStream> out_stream,
+                          
write_options_.filesystem->OpenOutputStream(filename));
+    return write_options_.format()->MakeWriter(std::move(out_stream), schema_,
+                                               
write_options_.file_write_options,
+                                               {write_options_.filesystem, 
filename});
+  }
+
+  Result<std::shared_ptr<DatasetWriterFileQueue>> OpenFileQueue(
+      const std::string& filename) {
+    Future<std::shared_ptr<FileWriter>> file_writer_fut =
+        init_future_.Then([this, filename] {
+          ::arrow::internal::Executor* io_executor =
+              write_options_.filesystem->io_context().executor();
+          return DeferNotOk(
+              io_executor->Submit([this, filename]() { return 
OpenWriter(filename); }));
+        });
+    auto file_queue = util::MakeSharedAsync<DatasetWriterFileQueue>(
+        file_writer_fut, write_options_, visitors_mutex_);
+    RETURN_NOT_OK(task_group_.AddTask(
+        file_queue->on_closed().Then([this] { 
open_files_throttle_->Release(1); })));
+    return file_queue;
+  }
+
+  uint64_t rows_written() const { return rows_written_; }
+
+  void PrepareDirectory() {
+    init_future_ =
+        
DeferNotOk(write_options_.filesystem->io_context().executor()->Submit([this] {
+          RETURN_NOT_OK(write_options_.filesystem->CreateDir(directory_));
+          if (write_options_.existing_data_behavior == 
kDeleteMatchingPartitions) {
+            fs::FileSelector selector;
+            selector.base_dir = directory_;
+            selector.recursive = true;
+            return write_options_.filesystem->DeleteFiles(selector);

Review comment:
       There is a `FileSystem::DeleteDirContents`; could that do the equivalent here? (I don't know whether it can be more efficient, or whether it basically does the same thing.)

##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -3295,8 +3295,8 @@ def test_write_dataset_with_scanner(tempdir):
     dataset = ds.dataset(tempdir, partitioning=["b"])
 
     with tempfile.TemporaryDirectory() as tempdir2:
-        ds.write_dataset(dataset.scanner(columns=["b", "c"]), tempdir2,
-                         format='parquet', partitioning=["b"])
+        ds.write_dataset(dataset.scanner(columns=["b", "c"], use_async=True),

Review comment:
       What happens if you pass a sync scanner?

##########
File path: cpp/src/arrow/dataset/file_base.h
##########
@@ -364,6 +376,18 @@ struct ARROW_DS_EXPORT FileSystemDatasetWriteOptions {
   /// {i} will be replaced by an auto incremented integer.
   std::string basename_template;
 
+  /// If greater than 0 then this will limit the maximum number of files that 
can be left
+  /// open. If an attempt is made to open too many files then the least 
recently used file
+  /// will be closed.  If this setting is set too low you may end up 
fragmenting your data
+  /// into many small files.
+  uint32_t max_open_files = 1024;
+
+  /// If greater than 0 then this will limit how many rows are placed in any 
single file.

Review comment:
       Maybe this could clarify what happens when it is not set (i.e. equal to 0). Are all rows written to a given directory then always put in a single file?

##########
File path: cpp/src/arrow/dataset/dataset_writer_test.cc
##########
@@ -0,0 +1,340 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/dataset/dataset_writer.h"
+
+#include <chrono>
+#include <mutex>
+#include <vector>
+
+#include "arrow/dataset/file_ipc.h"
+#include "arrow/filesystem/mockfs.h"
+#include "arrow/filesystem/test_util.h"
+#include "arrow/io/memory.h"
+#include "arrow/ipc/reader.h"
+#include "arrow/table.h"
+#include "arrow/testing/future_util.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/util/optional.h"
+#include "gtest/gtest.h"
+
+namespace arrow {
+namespace dataset {
+
+using arrow::fs::internal::MockFileInfo;
+using arrow::fs::internal::MockFileSystem;
+
+struct ExpectedFile {
+  std::string filename;
+  uint64_t start;
+  uint64_t num_rows;
+};
+
+class DatasetWriterTestFixture : public testing::Test {
+ protected:
+  void SetUp() override {
+    fs::TimePoint mock_now = std::chrono::system_clock::now();
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<fs::FileSystem> fs,
+                         MockFileSystem::Make(mock_now, {::arrow::fs::Dir("testdir")}));
+    filesystem_ = std::dynamic_pointer_cast<MockFileSystem>(fs);
+    schema_ = schema({field("int64", int64())});
+    write_options_.filesystem = filesystem_;
+    write_options_.basename_template = "part-{i}.arrow";
+    write_options_.base_dir = "testdir";
+    write_options_.writer_pre_finish = [this](FileWriter* writer) {
+      pre_finish_visited_.push_back(writer->destination().path);
+      return Status::OK();
+    };
+    write_options_.writer_post_finish = [this](FileWriter* writer) {
+      post_finish_visited_.push_back(writer->destination().path);
+      return Status::OK();
+    };
+    std::shared_ptr<FileFormat> format = std::make_shared<IpcFileFormat>();
+    write_options_.file_write_options = format->DefaultWriteOptions();
+  }
+
+  std::shared_ptr<fs::GatedMockFilesystem> UseGatedFs() {
+    fs::TimePoint mock_now = std::chrono::system_clock::now();
+    auto fs = std::make_shared<fs::GatedMockFilesystem>(mock_now);
+    ARROW_EXPECT_OK(fs->CreateDir("testdir"));
+    write_options_.filesystem = fs;
+    filesystem_ = fs;
+    return fs;
+  }
+
+  std::shared_ptr<RecordBatch> MakeBatch(uint64_t start, uint64_t num_rows) {
+    Int64Builder builder;
+    for (uint64_t i = 0; i < num_rows; i++) {
+      ARROW_EXPECT_OK(builder.Append(i + start));
+    }
+    EXPECT_OK_AND_ASSIGN(std::shared_ptr<Array> arr, builder.Finish());
+    return RecordBatch::Make(schema_, static_cast<int64_t>(num_rows), {std::move(arr)});
+  }
+
+  std::shared_ptr<RecordBatch> MakeBatch(uint64_t num_rows) {
+    std::shared_ptr<RecordBatch> batch = MakeBatch(counter_, num_rows);
+    counter_ += num_rows;
+    return batch;
+  }
+
+  util::optional<MockFileInfo> FindFile(const std::string& filename) {
+    for (const auto& mock_file : filesystem_->AllFiles()) {
+      if (mock_file.full_path == filename) {
+        return mock_file;
+      }
+    }
+    return util::nullopt;
+  }
+
+  void AssertVisited(const std::vector<std::string>& actual_paths,
+                     const std::string& expected_path) {
+    std::vector<std::string>::const_iterator found =
+        std::find(actual_paths.begin(), actual_paths.end(), expected_path);
+    ASSERT_NE(found, actual_paths.end())
+        << "The file " << expected_path << " was not in the list of files visited";
+  }
+
+  std::shared_ptr<RecordBatch> ReadAsBatch(util::string_view data) {
+    std::shared_ptr<io::RandomAccessFile> in_stream =
+        std::make_shared<io::BufferReader>(data);
+    EXPECT_OK_AND_ASSIGN(std::shared_ptr<ipc::RecordBatchFileReader> reader,
+                         ipc::RecordBatchFileReader::Open(std::move(in_stream)));
+    RecordBatchVector batches;
+    for (int i = 0; i < reader->num_record_batches(); i++) {
+      EXPECT_OK_AND_ASSIGN(std::shared_ptr<RecordBatch> next_batch,
+                           reader->ReadRecordBatch(i));
+      batches.push_back(next_batch);
+    }
+    EXPECT_OK_AND_ASSIGN(std::shared_ptr<Table> table, Table::FromRecordBatches(batches));
+    EXPECT_OK_AND_ASSIGN(std::shared_ptr<Table> combined_table, table->CombineChunks());
+    EXPECT_OK_AND_ASSIGN(std::shared_ptr<RecordBatch> batch,
+                         TableBatchReader(*combined_table).Next());
+    return batch;
+  }
+
+  void AssertFiles(const std::vector<ExpectedFile>& expected_files,
+                   bool verify_content = true) {
+    counter_ = 0;
+    for (const auto& expected_file : expected_files) {
+      util::optional<MockFileInfo> written_file = FindFile(expected_file.filename);
+      ASSERT_TRUE(written_file.has_value())
+          << "The file " << expected_file.filename << " was not created";
+      {
+        SCOPED_TRACE("pre_finish");
+        AssertVisited(pre_finish_visited_, expected_file.filename);
+      }
+      {
+        SCOPED_TRACE("post_finish");
+        AssertVisited(post_finish_visited_, expected_file.filename);
+      }
+      if (verify_content) {
+        AssertBatchesEqual(*MakeBatch(expected_file.start, expected_file.num_rows),
+                           *ReadAsBatch(written_file->data));
+      }
+    }
+  }
+
+  void AssertNotFiles(const std::vector<std::string>& expected_non_files) {
+    for (const auto& expected_non_file : expected_non_files) {
+      util::optional<MockFileInfo> file = FindFile(expected_non_file);
+      ASSERT_FALSE(file.has_value());
+    }
+  }
+
+  void AssertEmptyFiles(const std::vector<std::string>& expected_empty_files) {
+    for (const auto& expected_empty_file : expected_empty_files) {
+      util::optional<MockFileInfo> file = FindFile(expected_empty_file);
+      ASSERT_TRUE(file.has_value());
+      ASSERT_EQ("", file->data);
+    }
+  }
+
+  std::shared_ptr<MockFileSystem> filesystem_;
+  std::shared_ptr<Schema> schema_;
+  std::vector<std::string> pre_finish_visited_;
+  std::vector<std::string> post_finish_visited_;
+  FileSystemDatasetWriteOptions write_options_;
+  uint64_t counter_ = 0;
+};
+
+TEST_F(DatasetWriterTestFixture, Basic) {
+  EXPECT_OK_AND_ASSIGN(auto dataset_writer, DatasetWriter::Make(write_options_));
+  Future<> queue_fut = dataset_writer->WriteRecordBatch(MakeBatch(100), "");
+  AssertFinished(queue_fut);
+  ASSERT_FINISHES_OK(dataset_writer->Finish());
+  AssertFiles({{"testdir/part-0.arrow", 0, 100}});
+}
+
+TEST_F(DatasetWriterTestFixture, MaxRowsOneWrite) {
+  write_options_.max_rows_per_file = 10;
+  EXPECT_OK_AND_ASSIGN(auto dataset_writer, DatasetWriter::Make(write_options_));
+  Future<> queue_fut = dataset_writer->WriteRecordBatch(MakeBatch(35), "");
+  AssertFinished(queue_fut);
+  ASSERT_FINISHES_OK(dataset_writer->Finish());
+  AssertFiles({{"testdir/part-0.arrow", 0, 10},
+               {"testdir/part-1.arrow", 10, 10},
+               {"testdir/part-2.arrow", 20, 10},
+               {"testdir/part-3.arrow", 30, 5}});
+}
+
+TEST_F(DatasetWriterTestFixture, MaxRowsManyWrites) {
+  write_options_.max_rows_per_file = 10;
+  EXPECT_OK_AND_ASSIGN(auto dataset_writer, DatasetWriter::Make(write_options_));
+  ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(3), ""));
+  ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(3), ""));
+  ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(3), ""));
+  ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(3), ""));
+  ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(3), ""));
+  ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(3), ""));
+  ASSERT_FINISHES_OK(dataset_writer->Finish());
+  AssertFiles({{"testdir/part-0.arrow", 0, 10}, {"testdir/part-1.arrow", 10, 8}});
+}
+
+TEST_F(DatasetWriterTestFixture, ConcurrentWritesSameFile) {
+  auto gated_fs = UseGatedFs();
+  EXPECT_OK_AND_ASSIGN(auto dataset_writer, DatasetWriter::Make(write_options_));
+  for (int i = 0; i < 10; i++) {
+    Future<> queue_fut = dataset_writer->WriteRecordBatch(MakeBatch(10), "");
+    AssertFinished(queue_fut);
+    ASSERT_FINISHES_OK(queue_fut);
+  }
+  ASSERT_OK(gated_fs->WaitForOpenOutputStream(1));
+  ASSERT_OK(gated_fs->UnlockOpenOutputStream(1));
+  ASSERT_FINISHES_OK(dataset_writer->Finish());
+  AssertFiles({{"testdir/part-0.arrow", 0, 100}});
+}
+
+TEST_F(DatasetWriterTestFixture, ConcurrentWritesDifferentFiles) {
+  // NBATCHES must be less than I/O executor concurrency to avoid deadlock / test failure
+  constexpr int NBATCHES = 6;
+  auto gated_fs = UseGatedFs();
+  std::vector<ExpectedFile> expected_files;
+  EXPECT_OK_AND_ASSIGN(auto dataset_writer, DatasetWriter::Make(write_options_));
+  for (int i = 0; i < NBATCHES; i++) {
+    std::string i_str = std::to_string(i);
+    expected_files.push_back(ExpectedFile{"testdir/part" + i_str + "/part-0.arrow",
+                                          static_cast<uint64_t>(i) * 10, 10});
+    Future<> queue_fut = dataset_writer->WriteRecordBatch(MakeBatch(10), "part" + i_str);
+    AssertFinished(queue_fut);
+    ASSERT_FINISHES_OK(queue_fut);
+  }
+  ASSERT_OK(gated_fs->WaitForOpenOutputStream(NBATCHES));
+  ASSERT_OK(gated_fs->UnlockOpenOutputStream(NBATCHES));
+  ASSERT_FINISHES_OK(dataset_writer->Finish());
+  AssertFiles(expected_files);
+}
+
+TEST_F(DatasetWriterTestFixture, MaxOpenFiles) {
+  auto gated_fs = UseGatedFs();
+  write_options_.max_open_files = 2;
+  EXPECT_OK_AND_ASSIGN(auto dataset_writer, DatasetWriter::Make(write_options_));
+
+  ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(10), "part0"));
+  ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(10), "part1"));
+  ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(10), "part0"));
+  Future<> fut = dataset_writer->WriteRecordBatch(MakeBatch(10), "part2");
+  // Backpressure will be applied until an existing file can be evicted
+  AssertNotFinished(fut);
+
+  // Ungate the writes to relieve the pressure, testdir/part0 should be closed
+  ASSERT_OK(gated_fs->WaitForOpenOutputStream(2));
+  ASSERT_OK(gated_fs->UnlockOpenOutputStream(5));
+  ASSERT_FINISHES_OK(fut);
+
+  ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(10), "part0"));
+  // Following call should resume existing write but, on slow test systems, the old
+  // write may have already been finished
+  ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(10), "part1"));
+  ASSERT_FINISHES_OK(dataset_writer->Finish());
+  AssertFiles({{"testdir/part0/part-0.arrow", 0, 10},
+               {"testdir/part0/part-0.arrow", 20, 10},

Review comment:
       Both entries name the same file (`testdir/part0/part-0.arrow`), so how can the test check that it has two different contents?

##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -3295,8 +3295,8 @@ def test_write_dataset_with_scanner(tempdir):
     dataset = ds.dataset(tempdir, partitioning=["b"])
 
     with tempfile.TemporaryDirectory() as tempdir2:
-        ds.write_dataset(dataset.scanner(columns=["b", "c"]), tempdir2,
-                         format='parquet', partitioning=["b"])
+        ds.write_dataset(dataset.scanner(columns=["b", "c"], use_async=True),

Review comment:
       I quickly tried it out, and you get `ArrowNotImplementedError: Asynchronous scanning is not supported by SyncScanner`, so that seems fine.

##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -3656,16 +3650,12 @@ def file_visitor(written_file):
 
     # Since it is a multi-threaded write there is no way to know which
     # directory gets part-0 and which gets part-1
-    expected_paths_a = {
+    expected_paths = {

Review comment:
       The comment about not knowing which directory gets part-0 and which gets part-1 can be removed here as well.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to