pitrou commented on a change in pull request #9779:
URL: https://github.com/apache/arrow/pull/9779#discussion_r603274804
##########
File path: cpp/src/arrow/util/async_generator_test.cc
##########
@@ -793,6 +794,118 @@ TEST(TestAsyncUtil, ReadaheadFailed) {
ASSERT_TRUE(IsIterationEnd(definitely_last));
}
+class SequencerTestFixture : public GeneratorTestFixture {
+ protected:
+ void RandomShuffle(std::vector<TestInt>& values) {
+ std::default_random_engine gen(seed_++);
+ std::shuffle(values.begin(), values.end(), gen);
+ }
+
+ int seed_ = 42;
+ std::function<bool(const TestInt&, const TestInt&)> cmp_ =
+ [](const TestInt& left, const TestInt& right) { return left.value > right.value; };
+ // Let's increment by 2's to make it interesting
+ std::function<bool(const TestInt&, const TestInt&)> is_next_ =
+ [](const TestInt& left, const TestInt& right) {
+ return left.value + 2 == right.value;
+ };
+};
+
+TEST_P(SequencerTestFixture, SequenceBasic) {
+ // Basic sequencing
+ auto original = MakeSource({6, 4, 2});
+ auto sequenced = MakeSequencingGenerator(original, cmp_, is_next_, TestInt(0));
+ AssertAsyncGeneratorMatch({2, 4, 6}, sequenced);
+
+ // From ordered input
+ original = MakeSource({2, 4, 6});
+ sequenced = MakeSequencingGenerator(original, cmp_, is_next_, TestInt(0));
+ AssertAsyncGeneratorMatch({2, 4, 6}, sequenced);
+}
+
+TEST_P(SequencerTestFixture, SequenceLambda) {
+ auto cmp = [](const TestInt& left, const TestInt& right) {
+ return left.value > right.value;
+ };
+ auto is_next = [](const TestInt& left, const TestInt& right) {
+ return left.value + 2 == right.value;
+ };
+ // Basic sequencing
+ auto original = MakeSource({6, 4, 2});
+ auto sequenced = MakeSequencingGenerator(original, cmp, is_next, TestInt(0));
+ AssertAsyncGeneratorMatch({2, 4, 6}, sequenced);
+}
+
+TEST_P(SequencerTestFixture, SequenceError) {
+ {
+ auto original = MakeSource({6, 4, 2});
+ original = FailsAt(original, 1);
+ auto sequenced = MakeSequencingGenerator(original, cmp_, is_next_, TestInt(0));
+ auto collected = CollectAsyncGenerator(sequenced);
+ ASSERT_FINISHES_AND_RAISES(Invalid, collected);
+ }
+ {
+ // Failure should clear old items out of the queue immediately
+ // shared_ptr versions of cmp_ and is_next_
+ auto cmp = cmp_;
+ std::function<bool(const std::shared_ptr<TestInt>&, const std::shared_ptr<TestInt>&)>
+ ptr_cmp =
+ [cmp](const std::shared_ptr<TestInt>& left,
+ const std::shared_ptr<TestInt>& right) { return cmp(*left, *right); };
+ auto is_next = is_next_;
+ std::function<bool(const std::shared_ptr<TestInt>&, const std::shared_ptr<TestInt>&)>
+ ptr_is_next = [is_next](const std::shared_ptr<TestInt>& left,
+ const std::shared_ptr<TestInt>& right) {
+ return is_next(*left, *right);
+ };
+
+ PushGenerator<std::shared_ptr<TestInt>> source;
+ auto sequenced = MakeSequencingGenerator(
+ static_cast<AsyncGenerator<std::shared_ptr<TestInt>>>(source), ptr_cmp,
+ ptr_is_next, std::make_shared<TestInt>(0));
+
+ auto should_be_cleared = std::make_shared<TestInt>(4);
+ std::weak_ptr<TestInt> ref = should_be_cleared;
+ auto producer = source.producer();
+ auto next_fut = sequenced();
+ producer.Push(std::move(should_be_cleared));
+ producer.Push(Status::Invalid("XYZ"));
+ ASSERT_TRUE(ref.expired());
+
+ ASSERT_FINISHES_AND_RAISES(Invalid, next_fut);
+ }
+ {
+ // Failure should interrupt pumping
+ PushGenerator<TestInt> source;
+ auto sequenced = MakeSequencingGenerator(static_cast<AsyncGenerator<TestInt>>(source),
+ cmp_, is_next_, TestInt(0));
+
+ auto producer = source.producer();
+ auto next_fut = sequenced();
+ producer.Push(TestInt(4));
+ producer.Push(Status::Invalid("XYZ"));
+ producer.Push(TestInt(2));
+ ASSERT_FINISHES_AND_RAISES(Invalid, next_fut);
+ // The sequencer should not have pulled the 2 out of the source because it should
+ // have stopped pumping on error
+ ASSERT_FINISHES_OK_AND_EQ(TestInt(2), source());
+ }
+}
+
+TEST_P(SequencerTestFixture, SequenceStress) {
+ constexpr int NITEMS = 100;
+ for (auto task_index = 0; task_index < GetNumItersForStress(); task_index++) {
+ auto input = RangeVector(NITEMS, 2);
+ RandomShuffle(input);
+ auto original = MakeSource(input);
+ auto sequenced = MakeSequencingGenerator(original, cmp_, is_next_, TestInt(-2));
+ AssertAsyncGeneratorMatch(RangeVector(NITEMS, 2), sequenced);
+ }
+}
+
+INSTANTIATE_TEST_SUITE_P(SequencerTests, SequencerTestFixture,
+ ::testing::Values(false, true));
Review comment:
I'm probably being dense or distracted, but I don't see the parameter
being used anywhere. Could you point it out to me? :-S
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]