westonpace commented on a change in pull request #11556:
URL: https://github.com/apache/arrow/pull/11556#discussion_r737863205
##########
File path: cpp/src/arrow/dataset/dataset_writer.cc
##########
@@ -83,128 +84,163 @@ class Throttle {
std::mutex mutex_;
};
+struct DatasetWriterState {
+ DatasetWriterState(uint64_t rows_in_flight, uint64_t max_open_files,
+ uint64_t max_rows_staged)
+ : rows_in_flight_throttle(rows_in_flight),
+ open_files_throttle(max_open_files),
+ staged_rows_count(0),
+ max_rows_staged(max_rows_staged) {}
+
+ bool StagingFull() const { return staged_rows_count.load() >=
max_rows_staged; }
+
+ // Throttle for how many rows the dataset writer will allow to be in process
memory
+ // When this is exceeded the dataset writer will pause / apply backpressure
+ Throttle rows_in_flight_throttle;
+ // Control for how many files the dataset writer will open. When this is
exceeded
+ // the dataset writer will pause and it will also close the largest open
file.
+ Throttle open_files_throttle;
+ // Control for how many rows the dataset writer will allow to be staged. A
row is
+ // staged if it is waiting for more rows to reach minimum_batch_size. If
this is
+ // exceeded then the largest staged batch is unstaged (no backpressure is
applied)
+ std::atomic<uint64_t> staged_rows_count;
+ // If too many rows get staged we will end up with poor performance and, if
more rows
+ // are staged than max_rows_queued we will end up with deadlock. To avoid
this, once
+ // we have too many staged rows we just ignore min_rows_per_group
+ const uint64_t max_rows_staged;
+ // Mutex to guard access to the file visitors in the writer options
+ std::mutex visitors_mutex;
+};
+
class DatasetWriterFileQueue : public util::AsyncDestroyable {
public:
explicit DatasetWriterFileQueue(const Future<std::shared_ptr<FileWriter>>&
writer_fut,
const FileSystemDatasetWriteOptions& options,
- std::mutex* visitors_mutex)
- : options_(options), visitors_mutex_(visitors_mutex) {
- running_task_ = Future<>::Make();
- writer_fut.AddCallback(
- [this](const Result<std::shared_ptr<FileWriter>>& maybe_writer) {
- if (maybe_writer.ok()) {
- writer_ = *maybe_writer;
- Flush();
- } else {
- Abort(maybe_writer.status());
- }
- });
+ DatasetWriterState* writer_state)
+ : options_(options), writer_state_(writer_state) {
+ // If this AddTask call fails (e.g. we're given an already failing future)
then we
+ // will get the error later when we try and write to it.
Review comment:
Yes. Or, if for whatever reason we have no more `AddTask` calls, then it
will fail when we close it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]