westonpace commented on a change in pull request #9779:
URL: https://github.com/apache/arrow/pull/9779#discussion_r603407695



##########
File path: cpp/src/arrow/util/async_generator_test.cc
##########
@@ -793,6 +794,118 @@ TEST(TestAsyncUtil, ReadaheadFailed) {
   ASSERT_TRUE(IsIterationEnd(definitely_last));
 }
 
+class SequencerTestFixture : public GeneratorTestFixture {
+ protected:
+  void RandomShuffle(std::vector<TestInt>& values) {
+    std::default_random_engine gen(seed_++);
+    std::shuffle(values.begin(), values.end(), gen);
+  }
+
+  int seed_ = 42;
+  std::function<bool(const TestInt&, const TestInt&)> cmp_ =
+      [](const TestInt& left, const TestInt& right) { return left.value > 
right.value; };
+  // Let's increment by 2's to make it interesting
+  std::function<bool(const TestInt&, const TestInt&)> is_next_ =
+      [](const TestInt& left, const TestInt& right) {
+        return left.value + 2 == right.value;
+      };
+};
+
+TEST_P(SequencerTestFixture, SequenceBasic) {
+  // Basic sequencing
+  auto original = MakeSource({6, 4, 2});
+  auto sequenced = MakeSequencingGenerator(original, cmp_, is_next_, 
TestInt(0));
+  AssertAsyncGeneratorMatch({2, 4, 6}, sequenced);
+
+  // From ordered input
+  original = MakeSource({2, 4, 6});
+  sequenced = MakeSequencingGenerator(original, cmp_, is_next_, TestInt(0));
+  AssertAsyncGeneratorMatch({2, 4, 6}, sequenced);
+}
+
+TEST_P(SequencerTestFixture, SequenceLambda) {
+  auto cmp = [](const TestInt& left, const TestInt& right) {
+    return left.value > right.value;
+  };
+  auto is_next = [](const TestInt& left, const TestInt& right) {
+    return left.value + 2 == right.value;
+  };
+  // Basic sequencing
+  auto original = MakeSource({6, 4, 2});
+  auto sequenced = MakeSequencingGenerator(original, cmp, is_next, TestInt(0));
+  AssertAsyncGeneratorMatch({2, 4, 6}, sequenced);
+}
+
+TEST_P(SequencerTestFixture, SequenceError) {
+  {
+    auto original = MakeSource({6, 4, 2});
+    original = FailsAt(original, 1);
+    auto sequenced = MakeSequencingGenerator(original, cmp_, is_next_, 
TestInt(0));
+    auto collected = CollectAsyncGenerator(sequenced);
+    ASSERT_FINISHES_AND_RAISES(Invalid, collected);
+  }
+  {
+    // Failure should clear old items out of the queue immediately
+    // shared_ptr versions of cmp_ and is_next_
+    auto cmp = cmp_;
+    std::function<bool(const std::shared_ptr<TestInt>&, const 
std::shared_ptr<TestInt>&)>
+        ptr_cmp =
+            [cmp](const std::shared_ptr<TestInt>& left,
+                  const std::shared_ptr<TestInt>& right) { return cmp(*left, 
*right); };
+    auto is_next = is_next_;
+    std::function<bool(const std::shared_ptr<TestInt>&, const 
std::shared_ptr<TestInt>&)>
+        ptr_is_next = [is_next](const std::shared_ptr<TestInt>& left,
+                                const std::shared_ptr<TestInt>& right) {
+          return is_next(*left, *right);
+        };
+
+    PushGenerator<std::shared_ptr<TestInt>> source;
+    auto sequenced = MakeSequencingGenerator(
+        static_cast<AsyncGenerator<std::shared_ptr<TestInt>>>(source), ptr_cmp,
+        ptr_is_next, std::make_shared<TestInt>(0));
+
+    auto should_be_cleared = std::make_shared<TestInt>(4);
+    std::weak_ptr<TestInt> ref = should_be_cleared;
+    auto producer = source.producer();
+    auto next_fut = sequenced();
+    producer.Push(std::move(should_be_cleared));
+    producer.Push(Status::Invalid("XYZ"));
+    ASSERT_TRUE(ref.expired());
+
+    ASSERT_FINISHES_AND_RAISES(Invalid, next_fut);
+  }
+  {
+    // Failure should interrupt pumping
+    PushGenerator<TestInt> source;
+    auto sequenced = 
MakeSequencingGenerator(static_cast<AsyncGenerator<TestInt>>(source),
+                                             cmp_, is_next_, TestInt(0));
+
+    auto producer = source.producer();
+    auto next_fut = sequenced();
+    producer.Push(TestInt(4));
+    producer.Push(Status::Invalid("XYZ"));
+    producer.Push(TestInt(2));
+    ASSERT_FINISHES_AND_RAISES(Invalid, next_fut);
+    // The sequencer should not have pulled the 2 out of the source because it 
should
+    // have stopped pumping on error
+    ASSERT_FINISHES_OK_AND_EQ(TestInt(2), source());
+  }
+}
+
+TEST_P(SequencerTestFixture, SequenceStress) {
+  constexpr int NITEMS = 100;
+  for (auto task_index = 0; task_index < GetNumItersForStress(); task_index++) 
{
+    auto input = RangeVector(NITEMS, 2);
+    RandomShuffle(input);
+    auto original = MakeSource(input);
+    auto sequenced = MakeSequencingGenerator(original, cmp_, is_next_, 
TestInt(-2));
+    AssertAsyncGeneratorMatch(RangeVector(NITEMS, 2), sequenced);
+  }
+}
+
+INSTANTIATE_TEST_SUITE_P(SequencerTests, SequencerTestFixture,
+                         ::testing::Values(false, true));

Review comment:
       I do my best to obscure it :).  SequencerTestFixture extends 
GeneratorTestFixture which has a `MakeSource` method.  The parameter there is 
named `slow`.  `MakeSource` either returns a fast source (this is the false 
case: synchronous source, no delays) or a source that runs `SleepABit` between 
each item.  This helps test the case where the calls to `AddCallback`/`Then` 
return immediately (when this parameter is false) or not (when this parameter 
is true).  Sometimes in various generators I will get errors in one and not the 
other.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to