lidavidm commented on a change in pull request #10076:
URL: https://github.com/apache/arrow/pull/10076#discussion_r620225452



##########
File path: cpp/src/arrow/dataset/scanner_test.cc
##########
@@ -390,6 +391,284 @@ TEST_P(TestScanner, Head) {
 INSTANTIATE_TEST_SUITE_P(TestScannerThreading, TestScanner,
                          ::testing::ValuesIn(TestScannerParams::Values()));
 
+/// These ControlledXyz classes allow for controlling the order in which 
things are
+/// delivered so that we can test out of order resequencing.  The dataset 
allows
+/// batches to be delivered on any fragment.  When delivering batches a 
num_rows
+/// parameter is taken which can be used to differentiate batches.
+class ControlledFragment : public Fragment {
+ public:
+  ControlledFragment(std::shared_ptr<Schema> schema)
+      : Fragment(literal(true), std::move(schema)) {}
+
+  Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options) override 
{
+    return Status::NotImplemented(
+        "Not needed for testing.  Sync can only return things in-order.");
+  }
+  Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() override {
+    return physical_schema_;
+  }
+  std::string type_name() const override { return 
"scanner_test.cc::ControlledFragment"; }
+
+  Result<RecordBatchGenerator> ScanBatchesAsync(
+      const std::shared_ptr<ScanOptions>& options) override {
+    return record_batch_generator_;
+  };
+
+  void Finish() { ARROW_UNUSED(record_batch_generator_.producer().Close()); }
+  void DeliverBatch(uint32_t num_rows) {
+    auto batch = ConstantArrayGenerator::Zeroes(num_rows, physical_schema_);
+    record_batch_generator_.producer().Push(std::move(batch));
+  }
+
+ private:
+  PushGenerator<std::shared_ptr<RecordBatch>> record_batch_generator_;
+};
+
+// TODO(ARROW-8163) Add testing for fragments arriving out of order
+class ControlledDataset : public Dataset {
+ public:
+  explicit ControlledDataset(int num_fragments)
+      : Dataset(arrow::schema({field("i32", int32())})), fragments_() {
+    for (int i = 0; i < num_fragments; i++) {
+      fragments_.push_back(std::make_shared<ControlledFragment>(schema_));
+    }
+  }
+
+  std::string type_name() const override { return 
"scanner_test.cc::ControlledDataset"; }
+  Result<std::shared_ptr<Dataset>> ReplaceSchema(
+      std::shared_ptr<Schema> schema) const override {
+    return Status::NotImplemented("Should not be called by unit test");
+  }
+
+  void DeliverBatch(int fragment_index, int num_rows) {
+    fragments_[fragment_index]->DeliverBatch(num_rows);
+  }
+
+  void FinishFragment(int fragment_index) { 
fragments_[fragment_index]->Finish(); }
+
+ protected:
+  virtual Result<FragmentIterator> GetFragmentsImpl(Expression predicate) {
+    std::vector<std::shared_ptr<Fragment>> casted_fragments(fragments_.begin(),
+                                                            fragments_.end());
+    return MakeVectorIterator(std::move(casted_fragments));
+  }
+
+ private:
+  std::vector<std::shared_ptr<ControlledFragment>> fragments_;
+};
+
+constexpr int kNumFragments = 2;
+
+class TestReordering : public ::testing::Test {
+ public:
+  void SetUp() override { dataset_ = 
std::make_shared<ControlledDataset>(kNumFragments); }
+
+  // Given a vector of fragment indices (one per batch) return a vector
+  // (one per fragment) mapping fragment index to the last occurrence of that
+  // index in order
+  //
+  // This allows us to know when to mark a fragment as finished
+  std::vector<int> GetLastIndices(const std::vector<int>& order) {
+    std::vector<int> last_indices(kNumFragments);
+    for (int i = 0; i < kNumFragments; i++) {
+      auto last_p = std::find(order.rbegin(), order.rend(), i);
+      EXPECT_NE(last_p, order.rend());
+      last_indices[i] = std::distance(last_p, order.rend()) - 1;
+    }
+    return last_indices;
+  }
+
+  /// We buffer one item in order to enumerate it (technically this could be 
avoided if
+  /// delivering in order but easier to have a single code path).  We also 
can't deliver
+  /// items that don't come next.  These two facts make for some pretty 
complex logic
+  /// to determine when items are ready to be collected.
+  std::vector<TaggedRecordBatch> DeliverAndCollect(std::vector<int> order,
+                                                   TaggedRecordBatchGenerator 
gen) {
+    std::vector<TaggedRecordBatch> collected;
+    auto last_indices = GetLastIndices(order);
+    int num_fragments = last_indices.size();
+    std::vector<int> batches_seen_for_fragment(num_fragments);
+    auto current_fragment_index = 0;
+    auto seen_fragment = false;
+    for (std::size_t i = 0; i < order.size(); i++) {
+      auto fragment_index = order[i];
+      dataset_->DeliverBatch(fragment_index, i);
+      batches_seen_for_fragment[fragment_index]++;
+      if (static_cast<int>(i) == last_indices[fragment_index]) {
+        dataset_->FinishFragment(fragment_index);
+      }
+      if (current_fragment_index == fragment_index) {
+        if (seen_fragment) {
+          EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+          collected.push_back(std::move(next));
+        } else {
+          seen_fragment = true;
+        }
+        if (static_cast<int>(i) == last_indices[fragment_index]) {
+          // Immediately collect your bonus fragment
+          EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+          collected.push_back(std::move(next));
+          // Now collect any batches freed up that couldn't be delivered 
because they came
+          // from the wrong fragment
+          auto last_fragment_index = fragment_index;
+          fragment_index++;
+          seen_fragment = batches_seen_for_fragment[fragment_index] > 0;
+          while (fragment_index < num_fragments &&
+                 fragment_index != last_fragment_index) {
+            last_fragment_index = fragment_index;
+            for (int j = 0; j < batches_seen_for_fragment[fragment_index] - 1; 
j++) {
+              EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+              collected.push_back(std::move(next));
+            }
+            if (static_cast<int>(i) >= last_indices[fragment_index]) {
+              EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+              collected.push_back(std::move(next));
+              fragment_index++;
+              seen_fragment = batches_seen_for_fragment[fragment_index] > 0;
+            }
+          }
+        }
+      }
+    }
+    return collected;
+  }
+
+  struct FragmentStats {
+    int last_index;
+    bool seen;
+  };
+
+  std::vector<FragmentStats> GetFragmentStats(const std::vector<int>& order) {
+    auto last_indices = GetLastIndices(order);
+    std::vector<FragmentStats> fragment_stats;
+    for (std::size_t i = 0; i < last_indices.size(); i++) {
+      fragment_stats.push_back({last_indices[i], false});
+    }
+    return fragment_stats;
+  }
+
+  /// When data arrives out of order then we first have to buffer up 1 item in 
order to
+  /// know when the last item has arrived (so we can mark it as the last).  
This means
+  /// sometimes we deliver an item and don't get one (first in a fragment) and 
sometimes
+  /// we deliver an item and we end up getting two (last in a fragment)
+  std::vector<EnumeratedRecordBatch> DeliverAndCollect(
+      std::vector<int> order, EnumeratedRecordBatchGenerator gen) {
+    std::vector<EnumeratedRecordBatch> collected;
+    auto fragment_stats = GetFragmentStats(order);
+    for (std::size_t i = 0; i < order.size(); i++) {
+      auto fragment_index = order[i];
+      dataset_->DeliverBatch(fragment_index, i);
+      if (static_cast<int>(i) == fragment_stats[fragment_index].last_index) {
+        dataset_->FinishFragment(fragment_index);
+        EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+        collected.push_back(std::move(next));
+      }
+      if (!fragment_stats[fragment_index].seen) {
+        fragment_stats[fragment_index].seen = true;
+      } else {
+        EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+        collected.push_back(std::move(next));
+      }
+    }
+    return collected;
+  }
+
+  std::shared_ptr<Scanner> MakeScanner(int fragment_readahead = 0) {
+    ScannerBuilder builder(dataset_);
+    // Reordering tests only make sense for async
+    builder.UseAsync(true);
+    if (fragment_readahead != 0) {
+      builder.FragmentReadahead(fragment_readahead);
+    }
+    EXPECT_OK_AND_ASSIGN(auto scanner, builder.Finish());
+    return scanner;
+  }
+
+  void AssertBatchesInOrder(const std::vector<TaggedRecordBatch>& batches,
+                            std::vector<int> expected_order) {
+    ASSERT_EQ(expected_order.size(), batches.size());
+    for (std::size_t i = 0; i < batches.size(); i++) {
+      ASSERT_EQ(expected_order[i], batches[i].record_batch->num_rows());
+    }
+  }
+
+  void AssertBatchesInOrder(const std::vector<EnumeratedRecordBatch>& batches,
+                            std::vector<int> expected_batch_indices,
+                            std::vector<int> expected_row_sizes) {
+    ASSERT_EQ(expected_batch_indices.size(), batches.size());
+    for (std::size_t i = 0; i < batches.size(); i++) {
+      ASSERT_EQ(expected_row_sizes[i], 
batches[i].record_batch.value->num_rows());
+      ASSERT_EQ(expected_batch_indices[i], batches[i].record_batch.index);
+    }
+  }
+
+  std::shared_ptr<ControlledDataset> dataset_;
+};
+
+TEST_F(TestReordering, ScanBatches) {
+  auto scanner = MakeScanner();
+  ASSERT_OK_AND_ASSIGN(auto batch_gen, scanner->ScanBatchesAsync());
+  auto collected = DeliverAndCollect({0, 0, 1, 1, 0}, std::move(batch_gen));
+  AssertBatchesInOrder(collected, {0, 1, 4, 2, 3});
+}
+
+TEST_F(TestReordering, ScanBatchesUnordered) {
+  auto scanner = MakeScanner();
+  ASSERT_OK_AND_ASSIGN(auto batch_gen, scanner->ScanBatchesUnorderedAsync());
+  auto collected = DeliverAndCollect({0, 0, 1, 1, 0}, std::move(batch_gen));
+  AssertBatchesInOrder(collected, {0, 0, 1, 1, 2}, {0, 2, 3, 1, 4});
+}
+
+struct BatchConsumer {
+  explicit BatchConsumer(EnumeratedRecordBatchGenerator generator)
+      : generator(generator), next() {}
+
+  void AssertCanConsume() {
+    if (!next.is_valid()) {
+      next = generator();
+    }
+    ASSERT_FINISHES_OK(next);
+    next = Future<EnumeratedRecordBatch>();
+  }
+
+  void AssertCannotConsume() {
+    if (!next.is_valid()) {
+      next = generator();
+    }
+    SleepABit();
+    ASSERT_FALSE(next.is_finished());
+  }
+
+  void AssertFinished() {
+    if (!next.is_valid()) {
+      next = generator();
+    }
+    ASSERT_FINISHES_OK_AND_ASSIGN(auto last, next);
+    ASSERT_TRUE(IsIterationEnd(last));
+  }
+
+  EnumeratedRecordBatchGenerator generator;
+  Future<EnumeratedRecordBatch> next;
+};
+
+TEST_F(TestReordering, FileReadahead) {
+  auto scanner = MakeScanner(1);
+  ASSERT_OK_AND_ASSIGN(auto batch_gen, scanner->ScanBatchesUnorderedAsync());
+  BatchConsumer consumer(std::move(batch_gen));
+  dataset_->DeliverBatch(0, 0);
+  dataset_->DeliverBatch(0, 1);
+  consumer.AssertCanConsume();
+  consumer.AssertCannotConsume();
+  dataset_->DeliverBatch(1, 0);
+  consumer.AssertCannotConsume();
+  dataset_->FinishFragment(1);

Review comment:
       It might be nice to explain inline why we expect to get/not get a batch, 
or at least put something like `MakeScanner(/*fragment_readahead=*/1)` to call 
out the readahead level.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org (address redacted by the list archive; this is the standard Apache infrastructure contact in this footer)


Reply via email to