zanmato1984 commented on code in PR #46711:
URL: https://github.com/apache/arrow/pull/46711#discussion_r2139447536
##########
cpp/src/arrow/util/async_util.cc:
##########
@@ -316,15 +316,11 @@ class ThrottledAsyncTaskSchedulerImpl
#endif
queue_->Push(std::move(task));
lk.unlock();
- maybe_backoff->AddCallback(
- [weak_self = std::weak_ptr<ThrottledAsyncTaskSchedulerImpl>(
- shared_from_this())](const Status& st) {
- if (st.ok()) {
- if (auto self = weak_self.lock()) {
- self->ContinueTasks();
- }
- }
- });
+ maybe_backoff->AddCallback([weak_self = weak_from_this()](const Status& st) {
Review Comment:
Just for the record, this is a style-only change, right?
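A minimal C++17 sketch of why it looks style-only. `Scheduler` is a hypothetical stand-in for `ThrottledAsyncTaskSchedulerImpl`. For an object already owned by a `shared_ptr`, `std::weak_ptr<T>(shared_from_this())` and `weak_from_this()` observe the same control block and expire together. The only difference shown is for an object with no owning `shared_ptr`: there `shared_from_this()` throws `std::bad_weak_ptr`, while `weak_from_this()` returns an empty `weak_ptr`.

```cpp
#include <cassert>
#include <memory>

// Hypothetical stand-in for ThrottledAsyncTaskSchedulerImpl; only the
// enable_shared_from_this base matters here.
struct Scheduler : std::enable_shared_from_this<Scheduler> {
  // Old spelling: construct a weak_ptr from a temporary shared_ptr.
  std::weak_ptr<Scheduler> OldStyle() {
    return std::weak_ptr<Scheduler>(shared_from_this());
  }
  // New spelling (C++17): no temporary shared_ptr is created.
  std::weak_ptr<Scheduler> NewStyle() { return weak_from_this(); }
};

// True when two weak_ptrs share the same control block.
inline bool SameOwner(const std::weak_ptr<Scheduler>& a,
                      const std::weak_ptr<Scheduler>& b) {
  return !a.owner_before(b) && !b.owner_before(a);
}

// For an object owned by a shared_ptr, both spellings observe the same
// object and expire together when the last owner goes away.
inline void CheckOwnedCase() {
  auto s = std::make_shared<Scheduler>();
  std::weak_ptr<Scheduler> w_old = s->OldStyle();
  std::weak_ptr<Scheduler> w_new = s->NewStyle();
  assert(s.use_count() == 1);  // the old style's temporary is already gone
  assert(SameOwner(w_old, w_new));
  assert(w_old.lock() == w_new.lock());
  s.reset();
  assert(w_old.expired() && w_new.expired());
}

// With no owning shared_ptr, shared_from_this() throws std::bad_weak_ptr
// (C++17), while weak_from_this() returns an empty weak_ptr.
inline bool OnlyOldStyleThrowsWhenUnowned() {
  Scheduler unowned;
  bool new_is_empty = unowned.NewStyle().expired();
  try {
    unowned.OldStyle();
  } catch (const std::bad_weak_ptr&) {
    return new_is_empty;
  }
  return false;
}
```

Inside a live scheduler the object is always owned, so the unowned case should not arise there.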
##########
cpp/src/arrow/util/async_util.cc:
##########
@@ -350,8 +346,9 @@ class ThrottledAsyncTaskSchedulerImpl
self = shared_from_this()]() mutable -> Result<Future<>> {
ARROW_ASSIGN_OR_RAISE(Future<> inner_fut, (*inner_task)());
if (!inner_fut.TryAddCallback([&] {
- return [latched_cost, self = std::move(self)](const Status& st) -> void {
- if (st.ok()) {
+ return [latched_cost,
weak_self = self->weak_from_this()](const Status& st) -> void {
Review Comment:
Why change to capturing a weak ptr? Would capturing a shared ptr cause any problem?
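One way to frame the difference is the following sketch. It uses hypothetical `PendingFuture` and `Scheduler` stand-ins, not Arrow's actual `Future` API.

- A shared capture makes the pending callback a co-owner, so the scheduler outlives its last external owner until the future completes and the callback is destroyed.
- A weak capture lets the scheduler die with its owner, and the late callback becomes a no-op.

Whether the extended lifetime is a real problem in this PR depends on who owns the scheduler and the futures, which the diff alone does not show.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-in for a not-yet-finished Future<>: it stores callbacks
// and runs them once Complete() is called.
struct PendingFuture {
  std::vector<std::function<void(bool)>> callbacks;
  void AddCallback(std::function<void(bool)> cb) {
    callbacks.push_back(std::move(cb));
  }
  void Complete(bool ok) {
    for (auto& cb : callbacks) cb(ok);
    callbacks.clear();  // destroys the lambdas and whatever they captured
  }
};

// Hypothetical stand-in for ThrottledAsyncTaskSchedulerImpl.
struct Scheduler : std::enable_shared_from_this<Scheduler> {
  int continued = 0;
  void ContinueTasks() { ++continued; }
};

// Strong capture: the pending callback co-owns the scheduler.
inline void AddStrong(PendingFuture& fut, Scheduler& s) {
  fut.AddCallback([self = s.shared_from_this()](bool ok) {
    if (ok) self->ContinueTasks();
  });
}

// Weak capture: the pending callback only observes the scheduler.
inline void AddWeak(PendingFuture& fut, Scheduler& s) {
  fut.AddCallback([weak_self = s.weak_from_this()](bool ok) {
    if (!ok) return;
    if (auto self = weak_self.lock()) self->ContinueTasks();
  });
}

inline void CheckLifetimes() {
  // Strong: dropping the owner does not destroy the scheduler.
  PendingFuture f1;
  auto s1 = std::make_shared<Scheduler>();
  std::weak_ptr<Scheduler> w1 = s1;
  AddStrong(f1, *s1);
  s1.reset();
  assert(!w1.expired());  // kept alive by the pending callback
  f1.Complete(true);
  assert(w1.expired());  // released only when the callback is destroyed

  // Weak: the scheduler dies with its owner; the late callback is a no-op.
  PendingFuture f2;
  auto s2 = std::make_shared<Scheduler>();
  std::weak_ptr<Scheduler> w2 = s2;
  AddWeak(f2, *s2);
  s2.reset();
  assert(w2.expired());
  f2.Complete(true);  // lock() fails, so nothing is touched
}
```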
##########
cpp/src/arrow/dataset/dataset_writer.cc:
##########
@@ -520,11 +543,26 @@ class DatasetWriter::DatasetWriterImpl {
return Status::OK();
})),
write_options_(std::move(write_options)),
- writer_state_(max_rows_queued, write_options_.max_open_files,
- CalculateMaxRowsStaged(max_rows_queued)),
+ writer_state_(std::make_shared<DatasetWriterState>(
+ max_rows_queued, write_options_.max_open_files,
+ CalculateMaxRowsStaged(max_rows_queued))),
pause_callback_(std::move(pause_callback)),
resume_callback_(std::move(resume_callback)) {}
+ ~DatasetWriterImpl() {
+ // In case something went wrong (e.g. an IO error occurred), some tasks
+ // may be left dangling in a ThrottledAsyncTaskScheduler and that may
Review Comment:
How are such tasks left dangling exactly?
##########
cpp/src/arrow/util/async_util.cc:
##########
@@ -377,8 +375,8 @@ class ThrottledAsyncTaskSchedulerImpl
if (maybe_backoff) {
lk.unlock();
if (!maybe_backoff->TryAddCallback([&] {
- return [self = shared_from_this()](const Status& st) {
- if (st.ok()) {
+ return [weak_self = weak_from_this()](const Status& st) {
Review Comment:
Ditto.
##########
cpp/src/arrow/dataset/dataset_writer.cc:
##########
@@ -217,21 +227,26 @@ class DatasetWriterFileQueue {
Status Finish() {
writer_state_->staged_rows_count -= rows_currently_staged_;
while (!staged_batches_.empty()) {
- RETURN_NOT_OK(PopAndDeliverStagedBatch());
+ auto st = PopAndDeliverStagedBatch().status();
+ if (!st.ok()) {
+ file_tasks_.reset();
+ return st;
+ }
Review Comment:
```suggestion
RETURN_NOT_OK_ELSE(PopAndDeliverStagedBatch(), file_tasks_.reset());
```
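The suggested one-liner should behave like the hand-written block. It evaluates the call once, and on failure it runs the cleanup and then propagates the error. Below is a self-contained sketch of that shape. It uses a hypothetical `SKETCH_RETURN_NOT_OK_ELSE` macro, a minimal `Status` stand-in and a hypothetical `FileQueue`, not Arrow's own headers. In the real code, `PopAndDeliverStagedBatch()` returns something with a `.status()` member. Whether Arrow's `RETURN_NOT_OK_ELSE` accepts that return type directly should be confirmed against `arrow/status.h`.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>

// Minimal hypothetical stand-in for arrow::Status.
class Status {
 public:
  static Status OK() { return Status(true, ""); }
  static Status IOError(std::string msg) { return Status(false, std::move(msg)); }
  bool ok() const { return ok_; }
  const std::string& message() const { return msg_; }

 private:
  Status(bool ok, std::string msg) : ok_(ok), msg_(std::move(msg)) {}
  bool ok_;
  std::string msg_;
};

// Hypothetical macro with the same shape as the suggested RETURN_NOT_OK_ELSE:
// evaluate `expr` once; on failure run `else_` and propagate the status.
#define SKETCH_RETURN_NOT_OK_ELSE(expr, else_) \
  do {                                         \
    Status _st = (expr);                       \
    if (!_st.ok()) {                           \
      else_;                                   \
      return _st;                              \
    }                                          \
  } while (false)

// Hypothetical stand-in for DatasetWriterFileQueue::Finish().
struct FileQueue {
  std::unique_ptr<int> file_tasks_ = std::make_unique<int>(0);  // task-group stand-in
  int batches_left = 3;
  int fail_at = -1;  // index of the delivery that fails, or -1 for none
  int delivered = 0;

  Status PopAndDeliverStagedBatch() {
    if (delivered == fail_at) return Status::IOError("write failed");
    ++delivered;
    --batches_left;
    return Status::OK();
  }

  Status Finish() {
    while (batches_left > 0) {
      SKETCH_RETURN_NOT_OK_ELSE(PopAndDeliverStagedBatch(), file_tasks_.reset());
    }
    return Status::OK();
  }
};

inline void CheckFinish() {
  FileQueue good;
  assert(good.Finish().ok());
  assert(good.file_tasks_ != nullptr && good.delivered == 3);

  FileQueue bad;
  bad.fail_at = 1;  // the second delivery fails
  Status st = bad.Finish();
  assert(!st.ok() && st.message() == "write failed");
  assert(bad.file_tasks_ == nullptr);  // cleanup ran before returning
}
```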
--
This is an automated message from the Apache Git Service.