westonpace commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r545721129
##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -276,6 +282,593 @@ TEST(FutureSyncTest, Int) {
}
}
+TEST(FutureRefTest, ChainRemoved) {
+  // Creating a future chain should not prevent the futures from being deleted
+  // if the entire chain is deleted.
+  //
+  // Each scope below builds a two-link chain, records weak references to the
+  // impl_ of both links, and then drops every strong reference by leaving the
+  // scope. Neither future is ever marked finished.
+  std::weak_ptr<FutureImpl> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  {
+    // Continuation that returns a plain Status
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) { return Status::OK(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+
+  {
+    // Continuation that returns another future
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) { return Future<>::Make(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+}
+
+TEST(FutureRefTest, TailRemoved) {
+  // A live reference to the head of a chain must keep the rest of the chain
+  // alive.
+  std::shared_ptr<Future<>> head;
+  std::weak_ptr<FutureImpl> tail_impl;
+  bool callback_ran = false;
+  {
+    head = std::make_shared<Future<>>(Future<>::Make());
+    auto tail = head->Then(StatusOnly, [&callback_ran](const Status&) {
+      callback_ran = true;
+      return Status::OK();
+    });
+    tail_impl = tail.impl_;
+  }
+  // Only the head is still referenced here, yet the tail must not be freed
+  ASSERT_FALSE(tail_impl.expired());
+
+  // Once the head is finished the continuation has run and the tail is gone
+  head->MarkFinished();
+  ASSERT_TRUE(callback_ran);
+  ASSERT_TRUE(tail_impl.expired());
+}
+
+TEST(FutureRefTest, HeadRemoved) {
+  // Keeping the tail of the future chain should not keep the entire chain
+  // alive. If no one has a reference to the head then there is no need to keep
+  // it, nothing will finish it. In theory the intermediate futures could be
+  // finished by some external process but that would be highly unusual and bad
+  // practice so in reality this would just be a reference to a future that
+  // will never complete which is ok.
+  std::weak_ptr<FutureImpl> ref;
+  std::shared_ptr<Future<>> ref2;
+  {
+    // The head leaves scope while the tail (ref2) is still held
+    auto fut = std::make_shared<Future<>>(Future<>::Make());
+    ref = fut->impl_;
+    ref2 = std::make_shared<Future<>>(fut->Then([](...) {}));
+  }
+  ASSERT_TRUE(ref.expired());
+
+  {
+    // Same check for an intermediate future created inside the continuation;
+    // ref is assigned when the continuation runs
+    auto fut = Future<>::Make();
+    ref2 = std::make_shared<Future<>>(fut.Then([&](...) {
+      auto intermediate = Future<>::Make();
+      ref = intermediate.impl_;
+      return intermediate;
+    }));
+    fut.MarkFinished();
+  }
+  ASSERT_TRUE(ref.expired());
+}
+
+TEST(FutureCompletionTest, Void) {
+  {
+    // The value is handed to a callback that returns nothing
+    auto source = Future<int>::Make();
+    int observed = 0;
+    auto chained = source.Then([&observed](const int& value) { observed = value; });
+    source.MarkFinished(42);
+    AssertSuccessful(chained);
+    ASSERT_EQ(observed, 42);
+  }
+  {
+    // A failed source propagates its error to the chained future
+    auto source = Future<int>::Make();
+    auto chained = source.Then([](const int&) {});
+    source.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(chained);
+    ASSERT_TRUE(chained.status().IsIOError());
+  }
+  {
+    // A StatusOnly callback that returns nothing swallows the failure
+    auto source = Future<int>::Make();
+    auto chained = source.Then(StatusOnly, [](const Status&) {});
+    source.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertSuccessful(chained);
+  }
+  {
+    // The source carries no value
+    auto source = Future<>::Make();
+    auto chained = source.Then(StatusOnly, [](const Status&) {});
+    source.MarkFinished();
+    AssertSuccessful(chained);
+  }
+  {
+    // The source carries no value and fails; the callback hands the status back
+    auto source = Future<>::Make();
+    auto chained = source.Then(StatusOnly, [](const Status& st) { return st; });
+    source.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(chained);
+  }
+  {
+    // The callback observes the failed status but the chained future succeeds
+    auto source = Future<>::Make();
+    Status observed = Status::OK();
+    auto chained =
+        source.Then(StatusOnly, [&observed](const Status& st) { observed = st; });
+    ASSERT_TRUE(observed.ok());
+    source.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(observed.IsIOError());
+    AssertSuccessful(chained);
+  }
+}
+
+TEST(FutureCompletionTest, NonVoid) {
+  {
+    // The callback maps the value to its square
+    auto source = Future<int>::Make();
+    auto chained = source.Then([](const Result<int>& maybe_value) {
+      auto value = *maybe_value;
+      return value * value;
+    });
+    source.MarkFinished(42);
+    AssertSuccessful(chained);
+    auto squared = *chained.result();
+    ASSERT_EQ(squared, 42 * 42);
+  }
+  {
+    // A failed source propagates its error to the chained future
+    auto source = Future<int>::Make();
+    auto chained = source.Then([](const Result<int>& maybe_value) {
+      auto value = *maybe_value;
+      return value * value;
+    });
+    source.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(chained);
+    ASSERT_TRUE(chained.status().IsIOError());
+  }
+  {
+    // The failure callback replaces the error with a fallback value
+    auto source = Future<int>::Make();
+    bool saw_io_error = false;
+    auto chained = source.Then([](int) { return 99; },
+                               [&saw_io_error](const Status& st) {
+                                 saw_io_error = st.IsIOError();
+                                 return 100;
+                               });
+    source.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertSuccessful(chained);
+    auto value = *chained.result();
+    ASSERT_EQ(value, 100);
+    ASSERT_TRUE(saw_io_error);
+  }
+  {
+    // The source carries no value
+    auto source = Future<>::Make();
+    auto chained = source.Then([](...) { return 42; });
+    source.MarkFinished();
+    AssertSuccessful(chained);
+    auto value = *chained.result();
+    ASSERT_EQ(value, 42);
+  }
+  {
+    // The source carries no value and fails
+    auto source = Future<>::Make();
+    auto chained = source.Then([](...) { return 42; });
+    source.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(chained);
+  }
+  {
+    // A StatusOnly callback observes the failure and recovers with a value
+    auto source = Future<>::Make();
+    Status observed = Status::OK();
+    auto chained = source.Then(StatusOnly, [&observed](const Status& st) {
+      observed = st;
+      return 42;
+    });
+    ASSERT_TRUE(observed.ok());
+    source.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(observed.IsIOError());
+    AssertSuccessful(chained);
+    auto value = *chained.result();
+    ASSERT_EQ(value, 42);
+  }
+}
+
+TEST(FutureCompletionTest, FutureNonVoid) {
+  {
+    // The callback returns a future; the chained future finishes only with it
+    auto source = Future<int>::Make();
+    auto inner = Future<std::string>::Make();
+    int observed = 0;
+    auto chained = source.Then([&observed, inner](const Result<int>& maybe_value) {
+      observed = *maybe_value;
+      return inner;
+    });
+    source.MarkFinished(42);
+    ASSERT_EQ(observed, 42);
+    AssertNotFinished(chained);
+    inner.MarkFinished("hello");
+    AssertSuccessful(chained);
+    auto text = *chained.result();
+    ASSERT_EQ(text, "hello");
+  }
+  {
+    // A failed source propagates its error to the chained future
+    auto source = Future<int>::Make();
+    auto inner = Future<std::string>::Make();
+    auto chained = source.Then([inner](const Result<int>&) { return inner; });
+    source.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(chained);
+    ASSERT_TRUE(chained.status().IsIOError());
+  }
+  {
+    // The failure callback swallows the error by returning the inner future
+    auto source = Future<int>::Make();
+    auto inner = Future<std::string>::Make();
+    bool saw_io_error = false;
+    auto chained = source.Then([inner](int) { return inner; },
+                               [&saw_io_error, inner](const Status& st) {
+                                 saw_io_error = st.IsIOError();
+                                 return inner;
+                               });
+    source.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertNotFinished(chained);
+    inner.MarkFinished("hello");
+    AssertSuccessful(chained);
+    auto text = *chained.result();
+    ASSERT_EQ(text, "hello");
+    ASSERT_TRUE(saw_io_error);
+  }
+  {
+    // The source carries no value
+    auto source = Future<>::Make();
+    auto inner = Future<std::string>::Make();
+    auto chained = source.Then([&inner](...) { return inner; });
+    source.MarkFinished();
+    AssertNotFinished(chained);
+    inner.MarkFinished("hello");
+    AssertSuccessful(chained);
+    auto text = *chained.result();
+    ASSERT_EQ(text, "hello");
+  }
+  {
+    // The source carries no value and fails
+    auto source = Future<>::Make();
+    auto inner = Future<std::string>::Make();
+    auto chained = source.Then([&inner](...) { return inner; });
+    source.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(chained);
+  }
+  {
+    // A StatusOnly callback observes the failure and recovers via the inner future
+    auto source = Future<>::Make();
+    auto inner = Future<std::string>::Make();
+    Status observed = Status::OK();
+    auto chained = source.Then(StatusOnly, [&observed, &inner](const Status& st) {
+      observed = st;
+      return inner;
+    });
+    ASSERT_TRUE(observed.ok());
+    source.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(observed.IsIOError());
+    AssertNotFinished(chained);
+    inner.MarkFinished("hello");
+    AssertSuccessful(chained);
+    auto text = *chained.result();
+    ASSERT_EQ(text, "hello");
+  }
+}
+
+TEST(FutureCompletionTest, Status) {
+  {
+    // The callback consumes the value and returns a Status
+    auto source = Future<int>::Make();
+    int observed = 0;
+    Future<> chained = source.Then([&observed](const Result<int>& maybe_value) {
+      observed = *maybe_value;
+      return Status::OK();
+    });
+    source.MarkFinished(42);
+    ASSERT_EQ(observed, 42);
+    AssertSuccessful(chained);
+  }
+  {
+    // A failed source propagates its error to the chained future
+    auto source = Future<int>::Make();
+    auto inner = Future<std::string>::Make();
+    auto chained = source.Then([inner](const Result<int>&) { return inner; });
+    source.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(chained);
+    ASSERT_TRUE(chained.status().IsIOError());
+  }
+  {
+    // The failure callback swallows the error by returning the inner future
+    auto source = Future<int>::Make();
+    auto inner = Future<std::string>::Make();
+    bool saw_io_error = false;
+    auto chained = source.Then([](int number) { return std::to_string(number); },
+                               [&saw_io_error, inner](const Status& st) {
+                                 saw_io_error = st.IsIOError();
+                                 return inner;
+                               });
+    source.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertNotFinished(chained);
+    inner.MarkFinished("hello");
+    AssertSuccessful(chained);
+    auto text = *chained.result();
+    ASSERT_EQ(text, "hello");
+    ASSERT_TRUE(saw_io_error);
+  }
+  {
+    // The source carries no value
+    auto source = Future<>::Make();
+    auto chained = source.Then(StatusOnly, [](const Status&) {});
+    source.MarkFinished();
+    AssertSuccessful(chained);
+  }
+  {
+    // The source carries no value and fails; the callback hands the status back
+    auto source = Future<>::Make();
+    auto chained = source.Then(StatusOnly, [](const Status& st) { return st; });
+    source.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(chained);
+  }
+  {
+    // The callback observes the failed status but the chained future succeeds
+    auto source = Future<>::Make();
+    Status observed = Status::OK();
+    auto chained =
+        source.Then(StatusOnly, [&observed](const Status& st) { observed = st; });
+    ASSERT_TRUE(observed.ok());
+    source.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(observed.IsIOError());
+    AssertSuccessful(chained);
+  }
+}
+
+TEST(FutureCompletionTest, FutureStatus) {
+  // Every continuation in this test returns a Future<> (innerFut).
+  {
+    // Simple callback: fut2 stays unfinished until innerFut is finished
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<>::Make();
+    int passed_in_result = 0;
+    Future<> fut2 = fut.Then([&passed_in_result, innerFut](const int& i) {
+      passed_in_result = i;
+      return innerFut;
+    });
+    fut.MarkFinished(42);
+    ASSERT_EQ(passed_in_result, 42);
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished(Status::OK());
+    AssertSuccessful(fut2);
+  }
+  {
+    // Propagate failure: fut2 fails although innerFut is never finished
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<>::Make();
+    auto fut2 = fut.Then([innerFut](const int&) { return innerFut; });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<>::Make();
+    bool was_io_error = false;
+    auto fut2 = fut.Then([innerFut](const int&) { return innerFut; },
+                         [&was_io_error, innerFut](const Status& s) {
+                           was_io_error = s.IsIOError();
+                           return innerFut;
+                         });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished(Status::OK());
+    AssertSuccessful(fut2);
+    // Check that the failure callback was the one that ran, as the other
+    // "Swallow failure" sections in this file do
+    ASSERT_TRUE(was_io_error);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto innerFut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [&innerFut](const Status&) { return innerFut; });
+    fut.MarkFinished();
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished(Status::OK());
+    AssertSuccessful(fut2);
+  }
+  {
+    // From failed status
+    auto fut = Future<>::Make();
+    auto innerFut = Future<>::Make();
+    auto fut2 = fut.Then([&innerFut](...) { return innerFut; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status
+    auto fut = Future<>::Make();
+    auto innerFut = Future<>::Make();
+    Status status_seen = Status::OK();
+    auto fut2 = fut.Then(StatusOnly, [&status_seen, &innerFut](const Status& s) {
+      status_seen = s;
+      return innerFut;
+    });
+    ASSERT_TRUE(status_seen.ok());
+    fut.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(status_seen.IsIOError());
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished(Status::OK());
+    AssertSuccessful(fut2);
+  }
+}
+
+TEST(FutureCompletionTest, Result) {
+ {
Review comment:
This is now tested in at least one place: `FutureCompletionTest, Result`,
in the section `// Propagate failure by returning failure`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]