zanmato1984 commented on code in PR #46711:
URL: https://github.com/apache/arrow/pull/46711#discussion_r2139447536


##########
cpp/src/arrow/util/async_util.cc:
##########
@@ -316,15 +316,11 @@ class ThrottledAsyncTaskSchedulerImpl
 #endif
       queue_->Push(std::move(task));
       lk.unlock();
-      maybe_backoff->AddCallback(
-          [weak_self = std::weak_ptr<ThrottledAsyncTaskSchedulerImpl>(
-               shared_from_this())](const Status& st) {
-            if (st.ok()) {
-              if (auto self = weak_self.lock()) {
-                self->ContinueTasks();
-              }
-            }
-          });
+      maybe_backoff->AddCallback([weak_self = weak_from_this()](const Status& 
st) {

Review Comment:
   Just for the record, this is a style-only change, right?



##########
cpp/src/arrow/util/async_util.cc:
##########
@@ -350,8 +346,9 @@ class ThrottledAsyncTaskSchedulerImpl
          self = shared_from_this()]() mutable -> Result<Future<>> {
           ARROW_ASSIGN_OR_RAISE(Future<> inner_fut, (*inner_task)());
           if (!inner_fut.TryAddCallback([&] {
-                return [latched_cost, self = std::move(self)](const Status& 
st) -> void {
-                  if (st.ok()) {
+                return [latched_cost,
+                        weak_self = self->weak_from_this()](const Status& st) 
-> void {

Review Comment:
   Why change to capturing a weak_ptr? Would capturing a shared_ptr cause any
problems?



##########
cpp/src/arrow/dataset/dataset_writer.cc:
##########
@@ -520,11 +543,26 @@ class DatasetWriter::DatasetWriterImpl {
               return Status::OK();
             })),
         write_options_(std::move(write_options)),
-        writer_state_(max_rows_queued, write_options_.max_open_files,
-                      CalculateMaxRowsStaged(max_rows_queued)),
+        writer_state_(std::make_shared<DatasetWriterState>(
+            max_rows_queued, write_options_.max_open_files,
+            CalculateMaxRowsStaged(max_rows_queued))),
         pause_callback_(std::move(pause_callback)),
         resume_callback_(std::move(resume_callback)) {}
 
+  ~DatasetWriterImpl() {
+    // In case something went wrong (e.g. an IO error occurred), some tasks
+    // may be left dangling in a ThrottledAsyncTaskScheduler and that may

Review Comment:
   How are such tasks left dangling exactly?



##########
cpp/src/arrow/util/async_util.cc:
##########
@@ -377,8 +375,8 @@ class ThrottledAsyncTaskSchedulerImpl
       if (maybe_backoff) {
         lk.unlock();
         if (!maybe_backoff->TryAddCallback([&] {
-              return [self = shared_from_this()](const Status& st) {
-                if (st.ok()) {
+              return [weak_self = weak_from_this()](const Status& st) {

Review Comment:
   Ditto.



##########
cpp/src/arrow/dataset/dataset_writer.cc:
##########
@@ -217,21 +227,26 @@ class DatasetWriterFileQueue {
   Status Finish() {
     writer_state_->staged_rows_count -= rows_currently_staged_;
     while (!staged_batches_.empty()) {
-      RETURN_NOT_OK(PopAndDeliverStagedBatch());
+      auto st = PopAndDeliverStagedBatch().status();
+      if (!st.ok()) {
+        file_tasks_.reset();
+        return st;
+      }

Review Comment:
   ```suggestion
         RETURN_NOT_OK_ELSE(PopAndDeliverStagedBatch(), file_tasks_.reset());
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to