westonpace commented on a change in pull request #9095:
URL: https://github.com/apache/arrow/pull/9095#discussion_r565964716
##########
File path: cpp/src/arrow/util/iterator_test.cc
##########
@@ -214,6 +255,270 @@ TEST(TestVectorIterator, RangeForLoop) {
ASSERT_EQ(ints_it, ints.end());
}
+template <typename T>
+Transformer<T, T> MakeFirstN(int n) {
+ int remaining = n;
+ return [remaining](T next) mutable -> Result<TransformFlow<T>> {
+ if (remaining > 0) {
+ remaining--;
+ return TransformYield(next);
+ }
+ return TransformFinish();
+ };
+}
+
+TEST(TestIteratorTransform, Truncating) {
+ auto original = VectorIt({1, 2, 3});
+ auto truncated = MakeTransformedIterator(std::move(original),
MakeFirstN<TestInt>(2));
+ AssertIteratorMatch({1, 2}, std::move(truncated));
+}
+
+TEST(TestIteratorTransform, TestPointer) {
+ auto original = VectorIt<std::shared_ptr<int>>(
+ {std::make_shared<int>(1), std::make_shared<int>(2),
std::make_shared<int>(3)});
+ auto truncated =
+ MakeTransformedIterator(std::move(original),
MakeFirstN<std::shared_ptr<int>>(2));
+ ASSERT_OK_AND_ASSIGN(auto result, truncated.ToVector());
+ ASSERT_EQ(2, result.size());
+}
+
+TEST(TestIteratorTransform, TruncatingShort) {
+ // Tests the failsafe case where we never call Finish
+ auto original = VectorIt({1});
+ auto truncated = MakeTransformedIterator<TestInt,
TestInt>(std::move(original),
+
MakeFirstN<TestInt>(2));
+ AssertIteratorMatch({1}, std::move(truncated));
+}
+
+TEST(TestAsyncUtil, Background) {
+ std::vector<TestInt> expected = {1, 2, 3};
+ auto background = BackgroundAsyncVectorIt(expected);
+ auto future = CollectAsyncGenerator(background);
+ ASSERT_FALSE(future.is_finished());
+ future.Wait();
+ ASSERT_TRUE(future.is_finished());
+ ASSERT_EQ(expected, *future.result());
+}
+
+struct SlowEmptyIterator {
+ Result<TestInt> Next() {
+ if (called_) {
+ return Status::Invalid("Should not have been called twice");
+ }
+ SleepFor(0.1);
+ return IterationTraits<TestInt>::End();
+ }
+
+ private:
+ bool called_ = false;
+};
+
+TEST(TestAsyncUtil, BackgroundRepeatEnd) {
+ // Ensure that the background iterator properly fulfills the asyncgenerator
contract
+ // and can be called after it ends.
+ auto iterator = Iterator<TestInt>(SlowEmptyIterator());
+ ASSERT_OK_AND_ASSIGN(
+ auto background_iter,
+ MakeBackgroundIterator(std::move(iterator),
internal::GetCpuThreadPool()));
+
+ auto one = background_iter();
+ auto two = background_iter();
+
+ ASSERT_TRUE(one.Wait(0.5));
+
+ if (one.is_finished()) {
+ ASSERT_EQ(IterationTraits<TestInt>::End(), *one.result());
+ }
+
+ ASSERT_TRUE(two.Wait(0.5));
+ ASSERT_TRUE(two.is_finished());
+ if (two.is_finished()) {
+ ASSERT_EQ(IterationTraits<TestInt>::End(), *two.result());
+ }
+}
+
+TEST(TestAsyncUtil, SynchronousFinish) {
+ AsyncGenerator<TestInt> generator = []() {
+ return Future<TestInt>::MakeFinished(IterationTraits<TestInt>::End());
+ };
+ Transformer<TestInt, TestInt> skip_all = [](TestInt value) { return
TransformSkip(); };
+ auto transformed = TransformAsyncGenerator(generator, skip_all);
+ auto future = CollectAsyncGenerator(transformed);
+ ASSERT_TRUE(future.is_finished());
+ ASSERT_OK_AND_ASSIGN(auto actual, future.result());
+ ASSERT_EQ(std::vector<TestInt>(), actual);
+}
+
+TEST(TestAsyncUtil, CompleteBackgroundStressTest) {
+ auto expected = RangeVector(100);
+ std::vector<Future<std::vector<TestInt>>> futures;
+ for (unsigned int i = 0; i < 100; i++) {
+ auto background = BackgroundAsyncVectorIt(expected);
+ futures.push_back(CollectAsyncGenerator(background));
+ }
+ auto combined = All(futures);
+ combined.Wait(2);
+ if (combined.is_finished()) {
+ ASSERT_OK_AND_ASSIGN(auto completed_vectors, combined.result());
+ for (auto&& vector : completed_vectors) {
+ ASSERT_EQ(vector, expected);
+ }
+ } else {
+ FAIL() << "After 2 seconds all background iterators had not finished
collecting";
+ }
+}
+
+TEST(TestAsyncUtil, StackOverflow) {
+ int counter = 0;
+ AsyncGenerator<TestInt> generator = [&counter]() {
+ if (counter < 1000000) {
+ return Future<TestInt>::MakeFinished(counter++);
+ } else {
+ return Future<TestInt>::MakeFinished(IterationTraits<TestInt>::End());
+ }
+ };
+ Transformer<TestInt, TestInt> discard =
+ [](TestInt next) -> Result<TransformFlow<TestInt>> { return
TransformSkip(); };
+ auto transformed = TransformAsyncGenerator(generator, discard);
+ auto collected_future = CollectAsyncGenerator(transformed);
+ ASSERT_TRUE(collected_future.Wait(5));
+ if (collected_future.is_finished()) {
+ ASSERT_OK_AND_ASSIGN(auto collected, collected_future.result());
+ ASSERT_EQ(0, collected.size());
+ }
+}
+
+TEST(TestAsyncUtil, Visit) {
+ auto generator = AsyncVectorIt({1, 2, 3});
+ unsigned int sum = 0;
+ auto sum_future = VisitAsyncGenerator<TestInt>(generator, [&sum](TestInt
item) {
+ sum += item.value;
+ return Status::OK();
+ });
+ // Should be superfluous
+ sum_future.Wait();
+ ASSERT_EQ(6, sum);
+}
+
+TEST(TestAsyncUtil, Collect) {
+ std::vector<TestInt> expected = {1, 2, 3};
+ auto generator = AsyncVectorIt(expected);
+ auto collected = CollectAsyncGenerator(generator);
+ ASSERT_EQ(expected, *collected.result());
+}
+
+template <typename T>
+Transformer<T, T> MakeRepeatN(int repeat_count) {
+ int current_repeat = 0;
+ return [repeat_count, current_repeat](T next) mutable ->
Result<TransformFlow<T>> {
+ current_repeat++;
+ bool ready_for_next = false;
+ if (current_repeat == repeat_count) {
+ current_repeat = 0;
+ ready_for_next = true;
+ }
+ return TransformYield(next, ready_for_next);
+ };
+}
+
+TEST(TestIteratorTransform, Repeating) {
Review comment:
I reordered the tests to clean this up and will take more care in the future.
##########
File path: cpp/src/arrow/util/iterator_test.cc
##########
@@ -214,6 +255,270 @@ TEST(TestVectorIterator, RangeForLoop) {
ASSERT_EQ(ints_it, ints.end());
}
+template <typename T>
+Transformer<T, T> MakeFirstN(int n) {
+ int remaining = n;
+ return [remaining](T next) mutable -> Result<TransformFlow<T>> {
+ if (remaining > 0) {
+ remaining--;
+ return TransformYield(next);
+ }
+ return TransformFinish();
+ };
+}
+
+TEST(TestIteratorTransform, Truncating) {
+ auto original = VectorIt({1, 2, 3});
+ auto truncated = MakeTransformedIterator(std::move(original),
MakeFirstN<TestInt>(2));
+ AssertIteratorMatch({1, 2}, std::move(truncated));
+}
+
+TEST(TestIteratorTransform, TestPointer) {
+ auto original = VectorIt<std::shared_ptr<int>>(
+ {std::make_shared<int>(1), std::make_shared<int>(2),
std::make_shared<int>(3)});
+ auto truncated =
+ MakeTransformedIterator(std::move(original),
MakeFirstN<std::shared_ptr<int>>(2));
+ ASSERT_OK_AND_ASSIGN(auto result, truncated.ToVector());
+ ASSERT_EQ(2, result.size());
+}
+
+TEST(TestIteratorTransform, TruncatingShort) {
+ // Tests the failsafe case where we never call Finish
+ auto original = VectorIt({1});
+ auto truncated = MakeTransformedIterator<TestInt,
TestInt>(std::move(original),
+
MakeFirstN<TestInt>(2));
+ AssertIteratorMatch({1}, std::move(truncated));
+}
+
+TEST(TestAsyncUtil, Background) {
+ std::vector<TestInt> expected = {1, 2, 3};
+ auto background = BackgroundAsyncVectorIt(expected);
+ auto future = CollectAsyncGenerator(background);
+ ASSERT_FALSE(future.is_finished());
+ future.Wait();
+ ASSERT_TRUE(future.is_finished());
+ ASSERT_EQ(expected, *future.result());
+}
+
+struct SlowEmptyIterator {
+ Result<TestInt> Next() {
+ if (called_) {
+ return Status::Invalid("Should not have been called twice");
+ }
+ SleepFor(0.1);
+ return IterationTraits<TestInt>::End();
+ }
+
+ private:
+ bool called_ = false;
+};
+
+TEST(TestAsyncUtil, BackgroundRepeatEnd) {
+ // Ensure that the background iterator properly fulfills the asyncgenerator
contract
+ // and can be called after it ends.
+ auto iterator = Iterator<TestInt>(SlowEmptyIterator());
+ ASSERT_OK_AND_ASSIGN(
+ auto background_iter,
+ MakeBackgroundIterator(std::move(iterator),
internal::GetCpuThreadPool()));
+
+ auto one = background_iter();
+ auto two = background_iter();
+
+ ASSERT_TRUE(one.Wait(0.5));
+
+ if (one.is_finished()) {
+ ASSERT_EQ(IterationTraits<TestInt>::End(), *one.result());
+ }
+
+ ASSERT_TRUE(two.Wait(0.5));
+ ASSERT_TRUE(two.is_finished());
+ if (two.is_finished()) {
+ ASSERT_EQ(IterationTraits<TestInt>::End(), *two.result());
+ }
+}
+
+TEST(TestAsyncUtil, SynchronousFinish) {
+ AsyncGenerator<TestInt> generator = []() {
+ return Future<TestInt>::MakeFinished(IterationTraits<TestInt>::End());
+ };
+ Transformer<TestInt, TestInt> skip_all = [](TestInt value) { return
TransformSkip(); };
+ auto transformed = TransformAsyncGenerator(generator, skip_all);
+ auto future = CollectAsyncGenerator(transformed);
+ ASSERT_TRUE(future.is_finished());
+ ASSERT_OK_AND_ASSIGN(auto actual, future.result());
+ ASSERT_EQ(std::vector<TestInt>(), actual);
+}
+
+TEST(TestAsyncUtil, CompleteBackgroundStressTest) {
+ auto expected = RangeVector(100);
+ std::vector<Future<std::vector<TestInt>>> futures;
+ for (unsigned int i = 0; i < 100; i++) {
+ auto background = BackgroundAsyncVectorIt(expected);
+ futures.push_back(CollectAsyncGenerator(background));
+ }
+ auto combined = All(futures);
+ combined.Wait(2);
+ if (combined.is_finished()) {
+ ASSERT_OK_AND_ASSIGN(auto completed_vectors, combined.result());
+ for (auto&& vector : completed_vectors) {
+ ASSERT_EQ(vector, expected);
+ }
+ } else {
+ FAIL() << "After 2 seconds all background iterators had not finished
collecting";
+ }
+}
+
+TEST(TestAsyncUtil, StackOverflow) {
+ int counter = 0;
+ AsyncGenerator<TestInt> generator = [&counter]() {
+ if (counter < 1000000) {
+ return Future<TestInt>::MakeFinished(counter++);
+ } else {
+ return Future<TestInt>::MakeFinished(IterationTraits<TestInt>::End());
+ }
+ };
+ Transformer<TestInt, TestInt> discard =
+ [](TestInt next) -> Result<TransformFlow<TestInt>> { return
TransformSkip(); };
+ auto transformed = TransformAsyncGenerator(generator, discard);
+ auto collected_future = CollectAsyncGenerator(transformed);
+ ASSERT_TRUE(collected_future.Wait(5));
+ if (collected_future.is_finished()) {
+ ASSERT_OK_AND_ASSIGN(auto collected, collected_future.result());
+ ASSERT_EQ(0, collected.size());
+ }
+}
+
+TEST(TestAsyncUtil, Visit) {
+ auto generator = AsyncVectorIt({1, 2, 3});
+ unsigned int sum = 0;
+ auto sum_future = VisitAsyncGenerator<TestInt>(generator, [&sum](TestInt
item) {
+ sum += item.value;
+ return Status::OK();
+ });
+ // Should be superfluous
+ sum_future.Wait();
+ ASSERT_EQ(6, sum);
+}
+
+TEST(TestAsyncUtil, Collect) {
+ std::vector<TestInt> expected = {1, 2, 3};
+ auto generator = AsyncVectorIt(expected);
+ auto collected = CollectAsyncGenerator(generator);
+ ASSERT_EQ(expected, *collected.result());
+}
+
+template <typename T>
+Transformer<T, T> MakeRepeatN(int repeat_count) {
+ int current_repeat = 0;
+ return [repeat_count, current_repeat](T next) mutable ->
Result<TransformFlow<T>> {
+ current_repeat++;
+ bool ready_for_next = false;
+ if (current_repeat == repeat_count) {
+ current_repeat = 0;
+ ready_for_next = true;
+ }
+ return TransformYield(next, ready_for_next);
+ };
+}
+
+TEST(TestIteratorTransform, Repeating) {
+ auto original = VectorIt({1, 2, 3});
+ auto repeated = MakeTransformedIterator<TestInt,
TestInt>(std::move(original),
+
MakeRepeatN<TestInt>(2));
+ AssertIteratorMatch({1, 1, 2, 2, 3, 3}, std::move(repeated));
+}
+
+template <typename T>
+Transformer<T, T> MakeFilter(std::function<bool(T&)> filter) {
+ return [filter](T next) -> Result<TransformFlow<T>> {
+ if (filter(next)) {
+ return TransformYield(next);
+ } else {
+ return TransformSkip();
+ }
+ };
+}
+
+template <typename T>
+Transformer<T, T> MakeAbortOnSecond() {
+ int counter = 0;
+ return [counter](T next) mutable -> Result<TransformFlow<T>> {
+ if (counter++ == 1) {
+ return Status::Invalid("X");
+ }
+ return TransformYield(next);
+ };
+}
+
+TEST(TestIteratorTransform, SkipSome) {
+ // Exercises TransformSkip
+ auto original = VectorIt({1, 2, 3});
+ auto filter = MakeFilter<TestInt>([](TestInt& t) { return t.value != 2; });
+ auto filtered = MakeTransformedIterator(std::move(original), filter);
+ AssertIteratorMatch({1, 3}, std::move(filtered));
+}
+
+TEST(TestIteratorTransform, SkipAll) {
+ // Exercises TransformSkip
+ auto original = VectorIt({1, 2, 3});
+ auto filter = MakeFilter<TestInt>([](TestInt& t) { return false; });
+ auto filtered = MakeTransformedIterator(std::move(original), filter);
+ AssertIteratorMatch({}, std::move(filtered));
+}
+
+TEST(TestIteratorTransform, Abort) {
+ auto original = VectorIt({1, 2, 3});
+ auto transformed =
+ MakeTransformedIterator(std::move(original),
MakeAbortOnSecond<TestInt>());
+ ASSERT_OK(transformed.Next());
+ ASSERT_RAISES(Invalid, transformed.Next());
+}
Review comment:
I now test this case.
##########
File path: cpp/src/arrow/util/iterator_test.cc
##########
@@ -214,6 +255,270 @@ TEST(TestVectorIterator, RangeForLoop) {
ASSERT_EQ(ints_it, ints.end());
}
+template <typename T>
+Transformer<T, T> MakeFirstN(int n) {
+ int remaining = n;
+ return [remaining](T next) mutable -> Result<TransformFlow<T>> {
+ if (remaining > 0) {
+ remaining--;
+ return TransformYield(next);
+ }
+ return TransformFinish();
+ };
+}
+
+TEST(TestIteratorTransform, Truncating) {
+ auto original = VectorIt({1, 2, 3});
+ auto truncated = MakeTransformedIterator(std::move(original),
MakeFirstN<TestInt>(2));
+ AssertIteratorMatch({1, 2}, std::move(truncated));
+}
+
+TEST(TestIteratorTransform, TestPointer) {
+ auto original = VectorIt<std::shared_ptr<int>>(
+ {std::make_shared<int>(1), std::make_shared<int>(2),
std::make_shared<int>(3)});
+ auto truncated =
+ MakeTransformedIterator(std::move(original),
MakeFirstN<std::shared_ptr<int>>(2));
+ ASSERT_OK_AND_ASSIGN(auto result, truncated.ToVector());
+ ASSERT_EQ(2, result.size());
+}
+
+TEST(TestIteratorTransform, TruncatingShort) {
+ // Tests the failsafe case where we never call Finish
+ auto original = VectorIt({1});
+ auto truncated = MakeTransformedIterator<TestInt,
TestInt>(std::move(original),
+
MakeFirstN<TestInt>(2));
+ AssertIteratorMatch({1}, std::move(truncated));
+}
+
+TEST(TestAsyncUtil, Background) {
+ std::vector<TestInt> expected = {1, 2, 3};
+ auto background = BackgroundAsyncVectorIt(expected);
+ auto future = CollectAsyncGenerator(background);
+ ASSERT_FALSE(future.is_finished());
+ future.Wait();
+ ASSERT_TRUE(future.is_finished());
+ ASSERT_EQ(expected, *future.result());
+}
+
+struct SlowEmptyIterator {
+ Result<TestInt> Next() {
+ if (called_) {
+ return Status::Invalid("Should not have been called twice");
+ }
+ SleepFor(0.1);
+ return IterationTraits<TestInt>::End();
+ }
+
+ private:
+ bool called_ = false;
+};
+
+TEST(TestAsyncUtil, BackgroundRepeatEnd) {
+ // Ensure that the background iterator properly fulfills the asyncgenerator
contract
+ // and can be called after it ends.
+ auto iterator = Iterator<TestInt>(SlowEmptyIterator());
+ ASSERT_OK_AND_ASSIGN(
+ auto background_iter,
+ MakeBackgroundIterator(std::move(iterator),
internal::GetCpuThreadPool()));
+
+ auto one = background_iter();
+ auto two = background_iter();
+
+ ASSERT_TRUE(one.Wait(0.5));
+
+ if (one.is_finished()) {
+ ASSERT_EQ(IterationTraits<TestInt>::End(), *one.result());
+ }
+
+ ASSERT_TRUE(two.Wait(0.5));
+ ASSERT_TRUE(two.is_finished());
+ if (two.is_finished()) {
+ ASSERT_EQ(IterationTraits<TestInt>::End(), *two.result());
+ }
+}
+
+TEST(TestAsyncUtil, SynchronousFinish) {
+ AsyncGenerator<TestInt> generator = []() {
+ return Future<TestInt>::MakeFinished(IterationTraits<TestInt>::End());
+ };
+ Transformer<TestInt, TestInt> skip_all = [](TestInt value) { return
TransformSkip(); };
+ auto transformed = TransformAsyncGenerator(generator, skip_all);
+ auto future = CollectAsyncGenerator(transformed);
+ ASSERT_TRUE(future.is_finished());
+ ASSERT_OK_AND_ASSIGN(auto actual, future.result());
+ ASSERT_EQ(std::vector<TestInt>(), actual);
+}
+
+TEST(TestAsyncUtil, CompleteBackgroundStressTest) {
+ auto expected = RangeVector(100);
+ std::vector<Future<std::vector<TestInt>>> futures;
+ for (unsigned int i = 0; i < 100; i++) {
+ auto background = BackgroundAsyncVectorIt(expected);
+ futures.push_back(CollectAsyncGenerator(background));
+ }
+ auto combined = All(futures);
+ combined.Wait(2);
+ if (combined.is_finished()) {
+ ASSERT_OK_AND_ASSIGN(auto completed_vectors, combined.result());
+ for (auto&& vector : completed_vectors) {
+ ASSERT_EQ(vector, expected);
+ }
+ } else {
+ FAIL() << "After 2 seconds all background iterators had not finished
collecting";
+ }
+}
+
+TEST(TestAsyncUtil, StackOverflow) {
+ int counter = 0;
+ AsyncGenerator<TestInt> generator = [&counter]() {
+ if (counter < 1000000) {
+ return Future<TestInt>::MakeFinished(counter++);
+ } else {
+ return Future<TestInt>::MakeFinished(IterationTraits<TestInt>::End());
+ }
+ };
+ Transformer<TestInt, TestInt> discard =
+ [](TestInt next) -> Result<TransformFlow<TestInt>> { return
TransformSkip(); };
+ auto transformed = TransformAsyncGenerator(generator, discard);
+ auto collected_future = CollectAsyncGenerator(transformed);
+ ASSERT_TRUE(collected_future.Wait(5));
+ if (collected_future.is_finished()) {
+ ASSERT_OK_AND_ASSIGN(auto collected, collected_future.result());
+ ASSERT_EQ(0, collected.size());
+ }
+}
+
+TEST(TestAsyncUtil, Visit) {
+ auto generator = AsyncVectorIt({1, 2, 3});
+ unsigned int sum = 0;
+ auto sum_future = VisitAsyncGenerator<TestInt>(generator, [&sum](TestInt
item) {
+ sum += item.value;
+ return Status::OK();
+ });
+ // Should be superfluous
+ sum_future.Wait();
+ ASSERT_EQ(6, sum);
+}
+
+TEST(TestAsyncUtil, Collect) {
+ std::vector<TestInt> expected = {1, 2, 3};
+ auto generator = AsyncVectorIt(expected);
+ auto collected = CollectAsyncGenerator(generator);
+ ASSERT_EQ(expected, *collected.result());
+}
+
+template <typename T>
+Transformer<T, T> MakeRepeatN(int repeat_count) {
+ int current_repeat = 0;
+ return [repeat_count, current_repeat](T next) mutable ->
Result<TransformFlow<T>> {
+ current_repeat++;
+ bool ready_for_next = false;
+ if (current_repeat == repeat_count) {
+ current_repeat = 0;
+ ready_for_next = true;
+ }
+ return TransformYield(next, ready_for_next);
+ };
+}
+
+TEST(TestIteratorTransform, Repeating) {
+ auto original = VectorIt({1, 2, 3});
+ auto repeated = MakeTransformedIterator<TestInt,
TestInt>(std::move(original),
+
MakeRepeatN<TestInt>(2));
+ AssertIteratorMatch({1, 1, 2, 2, 3, 3}, std::move(repeated));
+}
+
+template <typename T>
+Transformer<T, T> MakeFilter(std::function<bool(T&)> filter) {
+ return [filter](T next) -> Result<TransformFlow<T>> {
+ if (filter(next)) {
+ return TransformYield(next);
+ } else {
+ return TransformSkip();
+ }
+ };
+}
+
+template <typename T>
+Transformer<T, T> MakeAbortOnSecond() {
+ int counter = 0;
+ return [counter](T next) mutable -> Result<TransformFlow<T>> {
+ if (counter++ == 1) {
+ return Status::Invalid("X");
+ }
+ return TransformYield(next);
+ };
+}
+
+TEST(TestIteratorTransform, SkipSome) {
+ // Exercises TransformSkip
+ auto original = VectorIt({1, 2, 3});
+ auto filter = MakeFilter<TestInt>([](TestInt& t) { return t.value != 2; });
+ auto filtered = MakeTransformedIterator(std::move(original), filter);
+ AssertIteratorMatch({1, 3}, std::move(filtered));
+}
+
+TEST(TestIteratorTransform, SkipAll) {
+ // Exercises TransformSkip
+ auto original = VectorIt({1, 2, 3});
+ auto filter = MakeFilter<TestInt>([](TestInt& t) { return false; });
+ auto filtered = MakeTransformedIterator(std::move(original), filter);
+ AssertIteratorMatch({}, std::move(filtered));
+}
+
+TEST(TestIteratorTransform, Abort) {
+ auto original = VectorIt({1, 2, 3});
+ auto transformed =
+ MakeTransformedIterator(std::move(original),
MakeAbortOnSecond<TestInt>());
+ ASSERT_OK(transformed.Next());
+ ASSERT_RAISES(Invalid, transformed.Next());
+}
+
+TEST(TestAsyncIteratorTransform, SkipSome) {
+ auto original = AsyncVectorIt({1, 2, 3});
+ auto filter = MakeFilter<TestInt>([](TestInt& t) { return t.value != 2; });
+ auto filtered = TransformAsyncGenerator(std::move(original), filter);
+ AssertAsyncGeneratorMatch({1, 3}, std::move(filtered));
+}
+
+TEST(TestAsyncUtil, ReadaheadFailed) {
+ auto source = []() -> Future<TestInt> {
+ return Future<TestInt>::MakeFinished(Status::Invalid("X"));
+ };
+ auto readahead = AddReadahead<TestInt>(source, 10);
+ auto next = readahead();
+ ASSERT_EQ(Status::Invalid("X"), next.status());
Review comment:
I now test this case.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]