westonpace commented on a change in pull request #10076:
URL: https://github.com/apache/arrow/pull/10076#discussion_r619478106



##########
File path: cpp/src/arrow/dataset/scanner_test.cc
##########
@@ -390,6 +391,284 @@ TEST_P(TestScanner, Head) {
 INSTANTIATE_TEST_SUITE_P(TestScannerThreading, TestScanner,
                          ::testing::ValuesIn(TestScannerParams::Values()));
 
+/// These ControlledXyz classes allow for controlling the order in which 
things are
+/// delivered so that we can test out of order resequencing.  The dataset 
allows
+/// batches to be delivered on any fragment.  When delivering batches a 
num_rows
+/// parameter is taken which can be used to differentiate batches.
+class ControlledFragment : public Fragment {
+ public:
+  ControlledFragment(std::shared_ptr<Schema> schema)
+      : Fragment(literal(true), std::move(schema)) {}
+
+  Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options) override 
{
+    return Status::NotImplemented(
+        "Not needed for testing.  Sync can only return things in-order.");
+  }
+  Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() override {
+    return physical_schema_;
+  }
+  std::string type_name() const override { return 
"scanner_test.cc::ControlledFragment"; }
+
+  Result<RecordBatchGenerator> ScanBatchesAsync(
+      const std::shared_ptr<ScanOptions>& options) override {
+    return record_batch_generator_;
+  };
+
+  void Finish() { ARROW_UNUSED(record_batch_generator_.producer().Close()); }
+  void DeliverBatch(uint32_t num_rows) {
+    auto batch = ConstantArrayGenerator::Zeroes(num_rows, physical_schema_);
+    record_batch_generator_.producer().Push(std::move(batch));
+  }
+
+ private:
+  PushGenerator<std::shared_ptr<RecordBatch>> record_batch_generator_;
+};
+
+// TODO(ARROW-8163) Add testing for fragments arriving out of order
+class ControlledDataset : public Dataset {
+ public:
+  explicit ControlledDataset(int num_fragments)
+      : Dataset(arrow::schema({field("i32", int32())})), fragments_() {
+    for (int i = 0; i < num_fragments; i++) {
+      fragments_.push_back(std::make_shared<ControlledFragment>(schema_));
+    }
+  }
+
+  std::string type_name() const override { return 
"scanner_test.cc::ControlledDataset"; }
+  Result<std::shared_ptr<Dataset>> ReplaceSchema(
+      std::shared_ptr<Schema> schema) const override {
+    return Status::NotImplemented("Should not be called by unit test");
+  }
+
+  void DeliverBatch(int fragment_index, int num_rows) {
+    fragments_[fragment_index]->DeliverBatch(num_rows);
+  }
+
+  void FinishFragment(int fragment_index) { 
fragments_[fragment_index]->Finish(); }
+
+ protected:
+  virtual Result<FragmentIterator> GetFragmentsImpl(Expression predicate) {
+    std::vector<std::shared_ptr<Fragment>> casted_fragments(fragments_.begin(),
+                                                            fragments_.end());
+    return MakeVectorIterator(std::move(casted_fragments));
+  }
+
+ private:
+  std::vector<std::shared_ptr<ControlledFragment>> fragments_;
+};
+
+constexpr int kNumFragments = 2;
+
+class TestReordering : public ::testing::Test {
+ public:
+  void SetUp() override { dataset_ = 
std::make_shared<ControlledDataset>(kNumFragments); }
+
+  // Given a vector of fragment indices (one per batch) return a vector
+  // (one per fragment) mapping fragment index to the last occurrence of that
+  // index in order
+  //
+  // This allows us to know when to mark a fragment as finished
+  std::vector<int> GetLastIndices(const std::vector<int>& order) {

Review comment:
       Your interpretation is correct.  Later one, when we've delivered the 
last batch for the fragment, we also want to close the fragment (so it delivers 
an end-token on the iterator).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to