pitrou commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r545368032



##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -276,6 +283,636 @@ TEST(FutureSyncTest, Int) {
   }
 }
 
+TEST(FutureRefTest, ChainRemoved) {
+  // Creating a future chain should not prevent the futures from being deleted 
if the
+  // entire chain is deleted
+  std::weak_ptr<FutureImpl> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& status) { return 
Status::OK(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) { return 
Future<>::Make(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+}
+
+TEST(FutureRefTest, TailRemoved) {
+  // Keeping the head of the future chain should keep the entire chain alive
+  std::shared_ptr<Future<>> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  bool side_effect_run = false;
+  {
+    ref = std::make_shared<Future<>>(Future<>::Make());
+    auto fut2 = ref->Then(StatusOnly, [&side_effect_run](const Status& status) 
{
+      side_effect_run = true;
+      return Status::OK();
+    });
+    ref2 = fut2.impl_;
+  }
+  ASSERT_FALSE(ref2.expired());
+
+  ref->MarkFinished();
+  ASSERT_TRUE(side_effect_run);
+  ASSERT_TRUE(ref2.expired());
+}
+
+TEST(FutureRefTest, HeadRemoved) {
+  // Keeping the tail of the future chain should not keep the entire chain 
alive.  If no
+  // one has a reference to the head then there is no need to keep it, nothing 
will finish
+  // it.  In theory the intermediate futures could be finished by some 
external process
+  // but that would be highly unusual and bad practice so in reality this 
would just be a
+  // reference to a future that will never complete which is ok.
+  std::weak_ptr<FutureImpl> ref;
+  std::shared_ptr<Future<>> ref2;
+  {
+    auto fut = std::make_shared<Future<>>(Future<>::Make());
+    ref = fut->impl_;
+    ref2 = std::make_shared<Future<>>(fut->Then([](...) {}));
+  }
+  ASSERT_TRUE(ref.expired());
+
+  {
+    auto fut = Future<>::Make();
+    ref2 = std::make_shared<Future<>>(fut.Then([&](...) {
+      auto intermediate = Future<>::Make();
+      ref = intermediate.impl_;
+      return intermediate;
+    }));
+    fut.MarkFinished();
+  }
+  ASSERT_TRUE(ref.expired());
+}
+
+TEST(FutureTest, StressCallback) {
+  for (unsigned int n = 0; n < 1000; n++) {
+    auto fut = Future<>::Make();
+    std::atomic<unsigned int> count_finished_immediately(0);
+    std::atomic<unsigned int> count_finished_deferred(0);
+    std::atomic<unsigned int> callbacks_added(0);
+    std::atomic<bool> finished(false);
+
+    std::thread callback_adder([&] {
+      auto test_thread = std::this_thread::get_id();
+      while (!finished.load()) {
+        fut.AddCallback(
+            [&test_thread, &count_finished_immediately, 
&count_finished_deferred](
+                const Result<arrow::Future<arrow::detail::Empty>::ValueType>& 
result) {
+              if (std::this_thread::get_id() == test_thread) {
+                count_finished_immediately++;
+              } else {
+                count_finished_deferred++;
+              }
+            });
+        callbacks_added++;
+      }
+    });
+
+    while (callbacks_added.load() == 0) {
+      // Spin until the callback_adder has started running
+    }
+
+    fut.MarkFinished();
+
+    while (count_finished_deferred.load() == 0) {

Review comment:
       Shouldn't you wait on `count_finished_immediately` instead? It seems 
`count_finished_deferred` (callback called from main thread) is for 
pre-`MarkFinished` callbacks.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to