westonpace commented on a change in pull request #10955:
URL: https://github.com/apache/arrow/pull/10955#discussion_r717487535
##########
File path: cpp/src/arrow/dataset/file_base.cc
##########
@@ -327,222 +322,70 @@ Status FileWriter::Finish() {
namespace {
-constexpr util::string_view kIntegerToken = "{i}";
+Future<> WriteNextBatch(DatasetWriter* dataset_writer, TaggedRecordBatch batch,
+ const FileSystemDatasetWriteOptions& write_options) {
+ ARROW_ASSIGN_OR_RAISE(auto groups,
+ write_options.partitioning->Partition(batch.record_batch));
+ batch.record_batch.reset(); // drop to hopefully conserve memory
-Status ValidateBasenameTemplate(util::string_view basename_template) {
- if (basename_template.find(fs::internal::kSep) != util::string_view::npos) {
- return Status::Invalid("basename_template contained '/'");
- }
- size_t token_start = basename_template.find(kIntegerToken);
- if (token_start == util::string_view::npos) {
- return Status::Invalid("basename_template did not contain '",
kIntegerToken, "'");
- }
- return Status::OK();
-}
-
-/// WriteQueue allows batches to be pushed from multiple threads while another
thread
-/// flushes some to disk.
-class WriteQueue {
- public:
- WriteQueue(std::string partition_expression, size_t index,
- std::shared_ptr<Schema> schema)
- : partition_expression_(std::move(partition_expression)),
- index_(index),
- schema_(std::move(schema)) {}
-
- // Push a batch into the writer's queue of pending writes.
- void Push(std::shared_ptr<RecordBatch> batch) {
- auto push_lock = push_mutex_.Lock();
- pending_.push_back(std::move(batch));
- }
-
- // Flush all pending batches, or return immediately if another thread is
already
- // flushing this queue.
- Status Flush(const FileSystemDatasetWriteOptions& write_options) {
- if (auto writer_lock = writer_mutex_.TryLock()) {
- if (writer_ == nullptr) {
- // FileWriters are opened lazily to avoid blocking access to a
scan-wide queue set
- RETURN_NOT_OK(OpenWriter(write_options));
- }
-
- while (true) {
- std::shared_ptr<RecordBatch> batch;
- {
- auto push_lock = push_mutex_.Lock();
- if (pending_.empty()) {
- // Ensure the writer_lock is released before the push_lock.
Otherwise another
- // thread might successfully Push() a batch but then fail to
Flush() it since
- // the writer_lock is still held, leaving an unflushed batch in
pending_.
- writer_lock.Unlock();
- break;
- }
- batch = std::move(pending_.front());
- pending_.pop_front();
- }
- RETURN_NOT_OK(writer_->Write(batch));
- }
- }
- return Status::OK();
- }
-
- const std::shared_ptr<FileWriter>& writer() const { return writer_; }
-
- private:
- Status OpenWriter(const FileSystemDatasetWriteOptions& write_options) {
- auto dir =
- fs::internal::EnsureTrailingSlash(write_options.base_dir) +
partition_expression_;
-
- auto basename = ::arrow::internal::Replace(write_options.basename_template,
- kIntegerToken,
std::to_string(index_));
- if (!basename) {
- return Status::Invalid("string interpolation of basename template
failed");
- }
-
- auto path = fs::internal::ConcatAbstractPath(dir, *basename);
-
- RETURN_NOT_OK(write_options.filesystem->CreateDir(dir));
- ARROW_ASSIGN_OR_RAISE(auto destination,
- write_options.filesystem->OpenOutputStream(path));
-
- ARROW_ASSIGN_OR_RAISE(
- writer_, write_options.format()->MakeWriter(std::move(destination),
schema_,
-
write_options.file_write_options,
- {write_options.filesystem,
path}));
- return Status::OK();
- }
-
- util::Mutex writer_mutex_;
- std::shared_ptr<FileWriter> writer_;
-
- util::Mutex push_mutex_;
- std::deque<std::shared_ptr<RecordBatch>> pending_;
-
- // The (formatted) partition expression to which this queue corresponds
- std::string partition_expression_;
-
- size_t index_;
-
- std::shared_ptr<Schema> schema_;
-};
-
-struct WriteState {
- explicit WriteState(FileSystemDatasetWriteOptions write_options)
- : write_options(std::move(write_options)) {}
-
- FileSystemDatasetWriteOptions write_options;
- util::Mutex mutex;
- std::unordered_map<std::string, std::unique_ptr<WriteQueue>> queues;
-};
-
-Status WriteNextBatch(WriteState* state, const std::shared_ptr<Fragment>&
fragment,
- std::shared_ptr<RecordBatch> batch) {
- ARROW_ASSIGN_OR_RAISE(auto groups,
state->write_options.partitioning->Partition(batch));
- batch.reset(); // drop to hopefully conserve memory
-
- if (groups.batches.size() >
static_cast<size_t>(state->write_options.max_partitions)) {
+ if (groups.batches.size() > static_cast<size_t>(write_options.max_partitions)) {
return Status::Invalid("Fragment would be written into ",
groups.batches.size(),
" partitions. This exceeds the maximum of ",
- state->write_options.max_partitions);
+ write_options.max_partitions);
}
- std::unordered_set<WriteQueue*> need_flushed;
- for (size_t i = 0; i < groups.batches.size(); ++i) {
- auto partition_expression =
- and_(std::move(groups.expressions[i]),
fragment->partition_expression());
- auto batch = std::move(groups.batches[i]);
-
- ARROW_ASSIGN_OR_RAISE(
- auto part,
state->write_options.partitioning->Format(partition_expression));
-
- WriteQueue* queue;
- {
- // lookup the queue to which batch should be appended
- auto queues_lock = state->mutex.Lock();
-
- queue = ::arrow::internal::GetOrInsertGenerated(
- &state->queues, std::move(part),
- [&](const std::string& emplaced_part) {
- // lookup in `queues` also failed,
- // generate a new WriteQueue
- size_t queue_index = state->queues.size() - 1;
-
- return ::arrow::internal::make_unique<WriteQueue>(
- emplaced_part, queue_index, batch->schema());
- })
- ->second.get();
- }
-
- queue->Push(std::move(batch));
- need_flushed.insert(queue);
- }
-
- // flush all touched WriteQueues
- for (auto queue : need_flushed) {
- RETURN_NOT_OK(queue->Flush(state->write_options));
- }
- return Status::OK();
-}
+ std::shared_ptr<size_t> counter = std::make_shared<size_t>(0);
+ std::shared_ptr<Fragment> fragment = std::move(batch.fragment);
-Status WriteInternal(const ScanOptions& scan_options, WriteState* state,
- ScanTaskVector scan_tasks) {
- // Store a mapping from partitions (represened by their formatted partition
expressions)
- // to a WriteQueue which flushes batches into that partition's output file.
In principle
- // any thread could produce a batch for any partition, so each task
alternates between
- // pushing batches and flushing them to disk.
- auto task_group = scan_options.TaskGroup();
-
- for (const auto& scan_task : scan_tasks) {
- task_group->Append([&, scan_task] {
- std::function<Status(std::shared_ptr<RecordBatch>)> visitor =
- [&](std::shared_ptr<RecordBatch> batch) {
- return WriteNextBatch(state, scan_task->fragment(),
std::move(batch));
- };
- return ::arrow::internal::RunSynchronously<Future<>>(
- [&](Executor* executor) { return scan_task->SafeVisit(executor,
visitor); },
- /*use_threads=*/false);
+ AsyncGenerator<std::shared_ptr<RecordBatch>> partitioned_batch_gen =
+ [groups, counter, fragment, &write_options,
+ dataset_writer]() -> Future<std::shared_ptr<RecordBatch>> {
+ auto index = *counter;
+ if (index >= groups.batches.size()) {
+ return AsyncGeneratorEnd<std::shared_ptr<RecordBatch>>();
+ }
+ auto partition_expression =
+ and_(groups.expressions[index], fragment->partition_expression());
+ auto next_batch = groups.batches[index];
+ ARROW_ASSIGN_OR_RAISE(std::string destination,
+ write_options.partitioning->Format(partition_expression));
+ (*counter)++;
+ return dataset_writer->WriteRecordBatch(next_batch, destination).Then([next_batch] {
+ return next_batch;
});
- }
- return task_group->Finish();
+ };
+
+ return VisitAsyncGenerator(
+ std::move(partitioned_batch_gen),
+ [](const std::shared_ptr<RecordBatch>&) -> Status { return Status::OK(); });
}
} // namespace
Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions&
write_options,
std::shared_ptr<Scanner> scanner) {
- RETURN_NOT_OK(ValidateBasenameTemplate(write_options.basename_template));
-
- // Things we'll un-lazy for the sake of simplicity, with the tradeoff they
represent:
- //
- // - Fragment iteration. Keeping this lazy would allow us to start
partitioning/writing
- // any fragments we have before waiting for discovery to complete. This
isn't
- // currently implemented for FileSystemDataset anyway: ARROW-8613
- //
- // - ScanTask iteration. Keeping this lazy would save some unnecessary
blocking when
- // writing Fragments which produce scan tasks slowly. No Fragments do this.
- //
- // NB: neither of these will have any impact whatsoever on the common case
of writing
- // an in-memory table to disk.
-
- ARROW_SUPPRESS_DEPRECATION_WARNING
-
- // TODO(ARROW-11782/ARROW-12288) Remove calls to Scan()
- ARROW_ASSIGN_OR_RAISE(auto scan_task_it, scanner->Scan());
- ARROW_ASSIGN_OR_RAISE(ScanTaskVector scan_tasks, scan_task_it.ToVector());
-
- ARROW_UNSUPPRESS_DEPRECATION_WARNING
-
- WriteState state(write_options);
- RETURN_NOT_OK(WriteInternal(*scanner->options(), &state,
std::move(scan_tasks)));
-
- auto task_group = scanner->options()->TaskGroup();
- for (const auto& part_queue : state.queues) {
- task_group->Append([&] {
-
RETURN_NOT_OK(write_options.writer_pre_finish(part_queue.second->writer().get()));
- RETURN_NOT_OK(part_queue.second->writer()->Finish());
- return
write_options.writer_post_finish(part_queue.second->writer().get());
- });
- }
- return task_group->Finish();
+ ARROW_ASSIGN_OR_RAISE(auto batch_gen, scanner->ScanBatchesAsync());
+ ARROW_ASSIGN_OR_RAISE(auto dataset_writer, DatasetWriter::Make(write_options));
+
+ AsyncGenerator<std::shared_ptr<int>> queued_batch_gen =
+ [batch_gen, &dataset_writer, &write_options]() -> Future<std::shared_ptr<int>> {
+ Future<TaggedRecordBatch> next_batch_fut = batch_gen();
+ return next_batch_fut.Then(
+ [&dataset_writer, &write_options](const TaggedRecordBatch& batch) {
+ if (IsIterationEnd(batch)) {
+ return AsyncGeneratorEnd<std::shared_ptr<int>>();
+ }
+ return WriteNextBatch(dataset_writer.get(), batch, write_options).Then([] {
+ return std::make_shared<int>(0);
Review comment:
Ugh, yeah, I forgot about this bit of ugliness. You are correct, I just
needed something "iterable". I've replaced this whole block with a task group
in #11017
https://github.com/apache/arrow/pull/11017/files#diff-2caf4e9bd3f139e05e55dca80725d8a9c436f5ccf65c76a37cebfa6ee9b36a6aR359-R367
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]