pitrou commented on a change in pull request #8305:
URL: https://github.com/apache/arrow/pull/8305#discussion_r499803394
##########
File path: cpp/src/arrow/dataset/file_base.cc
##########
@@ -143,97 +150,235 @@ FragmentIterator FileSystemDataset::GetFragmentsImpl(
return MakeVectorIterator(std::move(fragments));
}
-struct WriteTask {
- Status Execute();
+Status FileWriter::Write(RecordBatchReader* batches) {
+ while (true) {
+ ARROW_ASSIGN_OR_RAISE(auto batch, batches->Next());
+ if (batch == nullptr) break;
+ RETURN_NOT_OK(Write(batch));
+ }
+ return Status::OK();
+}
- /// The basename of files written by this WriteTask. Extensions
- /// are derived from format
- std::string basename;
+constexpr util::string_view kIntegerToken = "{i}";
- /// The partitioning with which paths will be generated
- std::shared_ptr<Partitioning> partitioning;
+Status ValidateBasenameTemplate(util::string_view basename_template) {
+ if (basename_template.find(fs::internal::kSep) != util::string_view::npos) {
+ return Status::Invalid("basename_template contained '/'");
+ }
+ size_t token_start = basename_template.find(kIntegerToken);
+ if (token_start == util::string_view::npos) {
+ return Status::Invalid("basename_template did not contain '",
kIntegerToken, "'");
+ }
+ return Status::OK();
+}
- /// The format in which fragments will be written
- std::shared_ptr<FileFormat> format;
+/// WriteQueue allows batches to be pushed from multiple threads while another
thread
+/// flushes some to disk.
+class WriteQueue {
+ public:
+ WriteQueue(std::string partition_expression, size_t index,
+ util::Mutex::Guard* wait_for_opened_writer_lock)
+ : partition_expression_(std::move(partition_expression)), index_(index) {
+ *wait_for_opened_writer_lock = writer_mutex_.Lock();
+ }
- /// The FileSystem and base directory into which fragments will be written
- std::shared_ptr<fs::FileSystem> filesystem;
- std::string base_dir;
+ // Push a batch into the writer's queue of pending writes.
+ void Push(std::shared_ptr<RecordBatch> batch) {
+ auto push_lock = push_mutex_.Lock();
+ pending_.push_back(std::move(batch));
+ }
- /// Batches to be written
- std::shared_ptr<RecordBatchReader> batches;
+ // Flush all pending batches, or return immediately if another thread is
already
+ // flushing this queue.
+ Status Flush() {
+ if (auto writer_lock = writer_mutex_.TryLock()) {
+ while (true) {
+ std::shared_ptr<RecordBatch> batch;
+ {
+ auto push_lock = push_mutex_.Lock();
+ if (pending_.empty()) {
+ // Ensure the writer_lock is released before the push_lock.
Otherwise another
+ // thread might successfully Push() a batch but then fail to
Flush() it since
+ // the writer_lock is still held, leaving an unflushed batch in
pending_.
+ writer_lock.Unlock();
+ break;
+ }
+ batch = std::move(pending_.front());
+ pending_.pop_front();
+ }
+ RETURN_NOT_OK(writer_->Write(batch));
+ }
+ }
+ return Status::OK();
+ }
- /// An Expression already satisfied by every batch to be written
- std::shared_ptr<Expression> partition_expression;
-};
+ const std::shared_ptr<FileWriter>& writer() const { return writer_; }
-Status WriteTask::Execute() {
- std::unordered_map<std::string, RecordBatchVector> path_to_batches;
-
- // TODO(bkietz) these calls to Partition() should be scattered across a
TaskGroup
- for (auto maybe_batch : IteratorFromReader(batches)) {
- ARROW_ASSIGN_OR_RAISE(auto batch, maybe_batch);
- ARROW_ASSIGN_OR_RAISE(auto partitioned_batches,
partitioning->Partition(batch));
- for (auto&& partitioned_batch : partitioned_batches) {
- AndExpression expr(std::move(partitioned_batch.partition_expression),
- partition_expression);
- ARROW_ASSIGN_OR_RAISE(std::string path, partitioning->Format(expr));
- path = fs::internal::EnsureLeadingSlash(path);
- path_to_batches[path].push_back(std::move(partitioned_batch.batch));
+ Status OpenWriter(const FileSystemDatasetWriteOptions& write_options,
+ std::shared_ptr<Schema> schema) {
+ auto dir =
+ fs::internal::EnsureTrailingSlash(write_options.base_dir) +
partition_expression_;
+
+ auto basename = internal::Replace(write_options.basename_template,
kIntegerToken,
+ std::to_string(index_));
+ if (!basename) {
+ return Status::Invalid("string interpolation of basename template
failed");
}
- }
- for (auto&& path_batches : path_to_batches) {
- auto dir = base_dir + path_batches.first;
- RETURN_NOT_OK(filesystem->CreateDir(dir, /*recursive=*/true));
+ auto path = fs::internal::ConcatAbstractPath(dir, *basename);
- auto path = fs::internal::ConcatAbstractPath(dir, basename);
- ARROW_ASSIGN_OR_RAISE(auto destination,
filesystem->OpenOutputStream(path));
+ RETURN_NOT_OK(write_options.filesystem->CreateDir(dir));
+ ARROW_ASSIGN_OR_RAISE(auto destination,
+ write_options.filesystem->OpenOutputStream(path));
- DCHECK(!path_batches.second.empty());
- ARROW_ASSIGN_OR_RAISE(auto reader,
-
RecordBatchReader::Make(std::move(path_batches.second)));
- RETURN_NOT_OK(format->WriteFragment(reader.get(), destination.get()));
+ ARROW_ASSIGN_OR_RAISE(writer_, write_options.format()->MakeWriter(
+ std::move(destination),
std::move(schema),
+ write_options.file_write_options));
+ return Status::OK();
}
- return Status::OK();
-}
+ using Set = std::unordered_map<std::string, WriteQueue*>;
+
+ private:
+ util::Mutex writer_mutex_;
+ std::shared_ptr<FileWriter> writer_;
-Status FileSystemDataset::Write(std::shared_ptr<Schema> schema,
- std::shared_ptr<FileFormat> format,
- std::shared_ptr<fs::FileSystem> filesystem,
- std::string base_dir,
- std::shared_ptr<Partitioning> partitioning,
- std::shared_ptr<ScanContext> scan_context,
- FragmentIterator fragment_it) {
- auto task_group = scan_context->TaskGroup();
+ util::Mutex push_mutex_;
+ std::deque<std::shared_ptr<RecordBatch>> pending_;
- base_dir = std::string(fs::internal::RemoveTrailingSlash(base_dir));
+ // The (formatted) partition expression to which this queue corresponds
+ std::string partition_expression_;
- for (const auto& f : partitioning->schema()->fields()) {
+ size_t index_;
+};
+
+Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions&
write_options,
+ std::shared_ptr<Scanner> scanner) {
+ for (const auto& f : write_options.partitioning->schema()->fields()) {
if (f->type()->id() == Type::DICTIONARY) {
return Status::NotImplemented("writing with dictionary partitions");
}
}
- int i = 0;
- for (auto maybe_fragment : fragment_it) {
- ARROW_ASSIGN_OR_RAISE(auto fragment, maybe_fragment);
- auto task = std::make_shared<WriteTask>();
-
- task->basename = "dat_" + std::to_string(i++) + "." + format->type_name();
- task->partition_expression = fragment->partition_expression();
- task->format = format;
- task->filesystem = filesystem;
- task->base_dir = base_dir;
- task->partitioning = partitioning;
+ RETURN_NOT_OK(ValidateBasenameTemplate(write_options.basename_template));
+
+ auto task_group = scanner->context()->TaskGroup();
+
+ // Things we'll un-lazy for the sake of simplicity, with the tradeoff they
represent:
+ //
+ // - Fragment iteration. Keeping this lazy would allow us to start
partitioning/writing
+ // any fragments we have before waiting for discovery to complete. This
isn't
+ // currently implemented for FileSystemDataset anyway: ARROW-8613
+ //
+ // - ScanTask iteration. Keeping this lazy would save some unnecessary
blocking when
+ // writing Fragments which produce scan tasks slowly. No Fragments do this.
+ //
+ // NB: neither of these will have any impact whatsoever on the common case
of writing
+ // an in-memory table to disk.
+ ARROW_ASSIGN_OR_RAISE(FragmentVector fragments,
scanner->GetFragments().ToVector());
+ ScanTaskVector scan_tasks;
+ std::vector<const Fragment*> fragment_for_task;
+
+ // Avoid contention with multithreaded readers
+ auto context = std::make_shared<ScanContext>(*scanner->context());
+ context->use_threads = false;
+
+ for (const auto& fragment : fragments) {
+ auto options = std::make_shared<ScanOptions>(*scanner->options());
+ ARROW_ASSIGN_OR_RAISE(auto scan_task_it,
+ Scanner(fragment, std::move(options),
context).Scan());
+ for (auto maybe_scan_task : scan_task_it) {
+ ARROW_ASSIGN_OR_RAISE(auto scan_task, maybe_scan_task);
+ scan_tasks.push_back(std::move(scan_task));
+ fragment_for_task.push_back(fragment.get());
+ }
+ }
- // make a record batch reader which yields from a fragment
- ARROW_ASSIGN_OR_RAISE(task->batches, FragmentRecordBatchReader::Make(
- std::move(fragment), schema,
scan_context));
- task_group->Append([task] { return task->Execute(); });
+ // WriteQueues are stored in this deque until writing is completed and are
otherwise
+ // referenced by non-owning pointers.
+ std::deque<WriteQueue> queues_storage;
+
+  // Store a mapping from partitions (represented by their formatted partition
expressions)
+ // to a WriteQueue which flushes batches into that partition's output file.
In principle
+ // any thread could produce a batch for any partition, so each task
alternates between
+ // pushing batches and flushing them to disk.
+ util::Mutex queues_mutex;
+ WriteQueue::Set queues;
+
+ auto fragment_for_task_it = fragment_for_task.begin();
+ for (const auto& scan_task : scan_tasks) {
+ const Fragment* fragment = *fragment_for_task_it++;
+
+ task_group->Append([&, scan_task, fragment] {
+ ARROW_ASSIGN_OR_RAISE(auto batches, scan_task->Execute());
+
+ for (auto maybe_batch : batches) {
+ ARROW_ASSIGN_OR_RAISE(auto batch, maybe_batch);
+ ARROW_ASSIGN_OR_RAISE(auto groups,
write_options.partitioning->Partition(batch));
+ batch.reset(); // drop to hopefully conserve memory
+
+ std::unordered_set<WriteQueue*> need_flushed;
+ for (size_t i = 0; i < groups.batches.size(); ++i) {
+ AndExpression partition_expression(std::move(groups.expressions[i]),
+ fragment->partition_expression());
+ auto batch = std::move(groups.batches[i]);
+
+ ARROW_ASSIGN_OR_RAISE(auto part,
+
write_options.partitioning->Format(partition_expression));
+
+ util::Mutex::Guard wait_for_opened_writer_lock;
+
+ WriteQueue* queue;
+ {
+ // lookup the queue to which batch should be appended
+ auto queues_lock = queues_mutex.Lock();
+
+ queue = internal::GetOrInsertGenerated(
+ &queues, std::move(part),
+ [&](const std::string& emplaced_part) {
+ // lookup in `queues` also failed,
+ // generate a new WriteQueue
+ size_t queue_index = queues.size() - 1;
+
+ queues_storage.emplace_back(emplaced_part,
queue_index,
+
&wait_for_opened_writer_lock);
Review comment:
It could also be a `DeferredInit` facility:
```c++
// T needs to define a method `Status Init()`
template <typename T>
class DeferredInit {
public:
template <typename... Args>
DeferredInit(Args&&... args)
: value_(std::forward<Args>(args)...) {}
Result<T*> operator()() {
    std::call_once(initialized_, [this]() { status_ = value_.Init(); });
    RETURN_NOT_OK(status_);
return &value_;
}
protected:
T value_;
Status status_;
std::once_flag initialized_;
};
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]