lidavidm commented on a change in pull request #10076:
URL: https://github.com/apache/arrow/pull/10076#discussion_r619486400
##########
File path: cpp/src/arrow/dataset/scanner_test.cc
##########
@@ -390,6 +391,284 @@ TEST_P(TestScanner, Head) {
INSTANTIATE_TEST_SUITE_P(TestScannerThreading, TestScanner,
::testing::ValuesIn(TestScannerParams::Values()));
+/// These ControlledXyz classes allow for controlling the order in which
things are
+/// delivered so that we can test out of order resequencing. The dataset
allows
+/// batches to be delivered on any fragment. When delivering batches a
num_rows
+/// parameter is taken which can be used to differentiate batches.
+class ControlledFragment : public Fragment {
+ public:
+ ControlledFragment(std::shared_ptr<Schema> schema)
+ : Fragment(literal(true), std::move(schema)) {}
+
+ Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options) override
{
+ return Status::NotImplemented(
+ "Not needed for testing. Sync can only return things in-order.");
+ }
+ Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() override {
+ return physical_schema_;
+ }
+ std::string type_name() const override { return
"scanner_test.cc::ControlledFragment"; }
+
+ Result<RecordBatchGenerator> ScanBatchesAsync(
+ const std::shared_ptr<ScanOptions>& options) override {
+ return record_batch_generator_;
+ };
+
+ void Finish() { ARROW_UNUSED(record_batch_generator_.producer().Close()); }
+ void DeliverBatch(uint32_t num_rows) {
+ auto batch = ConstantArrayGenerator::Zeroes(num_rows, physical_schema_);
+ record_batch_generator_.producer().Push(std::move(batch));
+ }
+
+ private:
+ PushGenerator<std::shared_ptr<RecordBatch>> record_batch_generator_;
+};
+
+// TODO(ARROW-8163) Add testing for fragments arriving out of order
+class ControlledDataset : public Dataset {
+ public:
+ explicit ControlledDataset(int num_fragments)
+ : Dataset(arrow::schema({field("i32", int32())})), fragments_() {
+ for (int i = 0; i < num_fragments; i++) {
+ fragments_.push_back(std::make_shared<ControlledFragment>(schema_));
+ }
+ }
+
+ std::string type_name() const override { return
"scanner_test.cc::ControlledDataset"; }
+ Result<std::shared_ptr<Dataset>> ReplaceSchema(
+ std::shared_ptr<Schema> schema) const override {
+ return Status::NotImplemented("Should not be called by unit test");
+ }
+
+ void DeliverBatch(int fragment_index, int num_rows) {
+ fragments_[fragment_index]->DeliverBatch(num_rows);
+ }
+
+ void FinishFragment(int fragment_index) {
fragments_[fragment_index]->Finish(); }
+
+ protected:
+ virtual Result<FragmentIterator> GetFragmentsImpl(Expression predicate) {
+ std::vector<std::shared_ptr<Fragment>> casted_fragments(fragments_.begin(),
+ fragments_.end());
+ return MakeVectorIterator(std::move(casted_fragments));
+ }
+
+ private:
+ std::vector<std::shared_ptr<ControlledFragment>> fragments_;
+};
+
+constexpr int kNumFragments = 2;
+
+class TestReordering : public ::testing::Test {
+ public:
+ void SetUp() override { dataset_ =
std::make_shared<ControlledDataset>(kNumFragments); }
+
+ // Given a vector of fragment indices (one per batch) return a vector
+ // (one per fragment) mapping fragment index to the last occurrence of that
+ // index in order
+ //
+ // This allows us to know when to mark a fragment as finished
+ std::vector<int> GetLastIndices(const std::vector<int>& order) {
+ std::vector<int> last_indices(kNumFragments);
+ for (int i = 0; i < kNumFragments; i++) {
+ auto last_p = std::find(order.rbegin(), order.rend(), i);
+ EXPECT_NE(last_p, order.rend());
+ last_indices[i] = std::distance(last_p, order.rend()) - 1;
+ }
+ return last_indices;
+ }
+
+ /// We buffer one item in order to enumerate it (technically this could be
avoided if
+ /// delivering in order but easier to have a single code path). We also
can't deliver
+ /// items that don't come next. These two facts make for some pretty
complex logic
+ /// to determine when items are ready to be collected.
+ std::vector<TaggedRecordBatch> DeliverAndCollect(std::vector<int> order,
+ TaggedRecordBatchGenerator
gen) {
+ std::vector<TaggedRecordBatch> collected;
+ auto last_indices = GetLastIndices(order);
+ int num_fragments = last_indices.size();
+ std::vector<int> batches_seen_for_fragment(num_fragments);
+ auto current_fragment_index = 0;
+ auto seen_fragment = false;
+ for (std::size_t i = 0; i < order.size(); i++) {
+ auto fragment_index = order[i];
+ dataset_->DeliverBatch(fragment_index, i);
+ batches_seen_for_fragment[fragment_index]++;
+ if (static_cast<int>(i) == last_indices[fragment_index]) {
+ dataset_->FinishFragment(fragment_index);
+ }
+ if (current_fragment_index == fragment_index) {
+ if (seen_fragment) {
+ EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+ collected.push_back(std::move(next));
+ } else {
+ seen_fragment = true;
+ }
+ if (static_cast<int>(i) == last_indices[fragment_index]) {
+ // Immediately collect your bonus fragment
+ EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+ collected.push_back(std::move(next));
+ // Now collect any batches freed up that couldn't be delivered
because they came
+ // from the wrong fragment
+ auto last_fragment_index = fragment_index;
+ fragment_index++;
+ seen_fragment = batches_seen_for_fragment[fragment_index] > 0;
+ while (fragment_index < num_fragments &&
+ fragment_index != last_fragment_index) {
+ last_fragment_index = fragment_index;
+ for (int j = 0; j < batches_seen_for_fragment[fragment_index] - 1;
j++) {
+ EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+ collected.push_back(std::move(next));
+ }
+ if (static_cast<int>(i) >= last_indices[fragment_index]) {
+ EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+ collected.push_back(std::move(next));
+ fragment_index++;
+ seen_fragment = batches_seen_for_fragment[fragment_index] > 0;
Review comment:
I think it's overall OK, just had to read it carefully and get all the
context paged back in. (GitHub reviews aren't the best for seeing all the other
stuff going on.) If that could be removed, that would also be great, though
obviously it doesn't have to hold things up here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]