This is an automated email from the ASF dual-hosted git repository.
yibocai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push: new d9092ec ARROW-13173: [C++] TestAsyncUtil.ReadaheadFailed asserts occasionally d9092ec is described below commit d9092ec7e11c2a626f9086fedead475846b52356 Author: Weston Pace <weston.p...@gmail.com> AuthorDate: Tue Jul 6 09:09:16 2021 +0800 ARROW-13173: [C++] TestAsyncUtil.ReadaheadFailed asserts occasionally As @cyb70289 pointed out the test was dependent on timing and when running on a slow CI machine it could lead to failure. I changed the test to use condition variables instead of sleeps so that it should be fully deterministic now. Closes #10602 from westonpace/bugfix/ARROW-13173--c-testasyncutil-readaheadfailed-asserts-occasi Authored-by: Weston Pace <weston.p...@gmail.com> Signed-off-by: Yibo Cai <yibo....@arm.com> --- cpp/src/arrow/util/async_generator.h | 52 ++++++++++++++++-------------- cpp/src/arrow/util/async_generator_test.cc | 43 +++++++++++------------- cpp/src/arrow/util/iterator.h | 1 - 3 files changed, 47 insertions(+), 49 deletions(-) diff --git a/cpp/src/arrow/util/async_generator.h b/cpp/src/arrow/util/async_generator.h index c99bd86..5a6321f 100644 --- a/cpp/src/arrow/util/async_generator.h +++ b/cpp/src/arrow/util/async_generator.h @@ -697,30 +697,38 @@ class ReadaheadGenerator { ReadaheadGenerator(AsyncGenerator<T> source_generator, int max_readahead) : state_(std::make_shared<State>(std::move(source_generator), max_readahead)) {} - Future<T> operator()() { - // Copy so we can capture into lambdas + Future<T> AddMarkFinishedContinuation(Future<T> fut) { auto state = state_; - if (state->readahead_queue.empty()) { + return fut.Then( + [state](const T& result) -> Result<T> { + state->MarkFinishedIfDone(result); + return result; + }, + [state](const Status& err) -> Result<T> { + state->finished.store(true); + return err; + }); + } + + Future<T> operator()() { + if (state_->readahead_queue.empty()) { // This is the first request, let's pump the underlying queue - for (int i 
= 0; i < state->max_readahead; i++) { - auto next = state->source_generator(); - auto state = state_; - next.AddCallback( - [state](const Result<T>& result) { state->MarkFinishedIfDone(result); }); - state->readahead_queue.push(std::move(next)); + for (int i = 0; i < state_->max_readahead; i++) { + auto next = state_->source_generator(); + auto next_after_check = AddMarkFinishedContinuation(std::move(next)); + state_->readahead_queue.push(std::move(next_after_check)); } } // Pop one and add one - auto result = state->readahead_queue.front(); - state->readahead_queue.pop(); - if (state->finished.load()) { - state->readahead_queue.push(AsyncGeneratorEnd<T>()); + auto result = state_->readahead_queue.front(); + state_->readahead_queue.pop(); + if (state_->finished.load()) { + state_->readahead_queue.push(AsyncGeneratorEnd<T>()); } else { - auto back_of_queue = state->source_generator(); - auto state = state_; - back_of_queue.AddCallback( - [state](const Result<T>& result) { state->MarkFinishedIfDone(result); }); - state->readahead_queue.push(std::move(back_of_queue)); + auto back_of_queue = state_->source_generator(); + auto back_of_queue_after_check = + AddMarkFinishedContinuation(std::move(back_of_queue)); + state_->readahead_queue.push(std::move(back_of_queue_after_check)); } return result; } @@ -732,13 +740,9 @@ class ReadaheadGenerator { finished.store(false); } - void MarkFinishedIfDone(const Result<T>& next_result) { - if (!next_result.ok()) { + void MarkFinishedIfDone(const T& next_result) { + if (IsIterationEnd(next_result)) { finished.store(true); - } else { - if (IsIterationEnd(*next_result)) { - finished.store(true); - } } } diff --git a/cpp/src/arrow/util/async_generator_test.cc b/cpp/src/arrow/util/async_generator_test.cc index 87c1737..361ce3e 100644 --- a/cpp/src/arrow/util/async_generator_test.cc +++ b/cpp/src/arrow/util/async_generator_test.cc @@ -1094,42 +1094,37 @@ TEST(TestAsyncUtil, ReadaheadMove) { } TEST(TestAsyncUtil, ReadaheadFailed) { - 
ASSERT_OK_AND_ASSIGN(auto thread_pool, internal::ThreadPool::Make(4)); + ASSERT_OK_AND_ASSIGN(auto thread_pool, internal::ThreadPool::Make(20)); std::atomic<int32_t> counter(0); + auto gating_task = GatingTask::Make(); // All tasks are a little slow. The first task fails. // The readahead will have spawned 9 more tasks and they // should all pass - auto source = [thread_pool, &counter]() -> Future<TestInt> { + auto source = [&]() -> Future<TestInt> { auto count = counter++; - return *thread_pool->Submit([count]() -> Result<TestInt> { + return DeferNotOk(thread_pool->Submit([&, count]() -> Result<TestInt> { + gating_task->Task()(); if (count == 0) { return Status::Invalid("X"); } return TestInt(count); - }); + })); }; auto readahead = MakeReadaheadGenerator<TestInt>(source, 10); - ASSERT_FINISHES_AND_RAISES(Invalid, readahead()); - SleepABit(); - - for (int i = 0; i < 9; i++) { - ASSERT_FINISHES_OK_AND_ASSIGN(auto next_val, readahead()); - ASSERT_EQ(TestInt(i + 1), next_val); + auto should_be_invalid = readahead(); + // Polling once should allow 10 additional calls to start + ASSERT_OK(gating_task->WaitForRunning(11)); + ASSERT_OK(gating_task->Unlock()); + + // Once unlocked the error task should always be the first. Some number of successful + // tasks may follow until the end. 
+ ASSERT_FINISHES_AND_RAISES(Invalid, should_be_invalid); + + ASSERT_FINISHES_OK_AND_ASSIGN(auto remaining_results, CollectAsyncGenerator(readahead)); + // Don't need to know the exact number of successful tasks (and it may vary) + for (std::size_t i = 0; i < remaining_results.size(); i++) { + ASSERT_EQ(TestInt(static_cast<int>(i) + 1), remaining_results[i]); } - ASSERT_FINISHES_OK_AND_ASSIGN(auto after, readahead()); - - // It's possible that finished was set quickly and there - // are only 10 elements - if (IsIterationEnd(after)) { - return; - } - - // It's also possible that finished was too slow and there - // ended up being 11 elements - ASSERT_EQ(TestInt(10), after); - // There can't be 12 elements because SleepABit will prevent it - ASSERT_FINISHES_OK_AND_ASSIGN(auto definitely_last, readahead()); - ASSERT_TRUE(IsIterationEnd(definitely_last)); } class EnumeratorTestFixture : public GeneratorTestFixture { diff --git a/cpp/src/arrow/util/iterator.h b/cpp/src/arrow/util/iterator.h index b82021e..2f42803 100644 --- a/cpp/src/arrow/util/iterator.h +++ b/cpp/src/arrow/util/iterator.h @@ -20,7 +20,6 @@ #include <cassert> #include <functional> #include <memory> -#include <queue> #include <tuple> #include <type_traits> #include <utility>