lidavidm commented on a change in pull request #10076:
URL: https://github.com/apache/arrow/pull/10076#discussion_r619189262
##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -480,13 +492,24 @@ Result<std::shared_ptr<Table>> AsyncScanner::ToTable() {
return table_fut.result();
}
+Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync() {
+ return ScanBatchesUnorderedAsync(internal::GetCpuThreadPool());
+}
+
Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync(
internal::Executor* cpu_executor) {
auto self = shared_from_this();
ARROW_ASSIGN_OR_RAISE(auto fragment_gen, GetFragments());
ARROW_ASSIGN_OR_RAISE(auto batch_gen_gen,
FragmentsToBatches(self, std::move(fragment_gen)));
- return MakeConcatenatedGenerator(std::move(batch_gen_gen));
+ auto batch_gen_gen_readahead = MakeSerialReadaheadGenerator(
+ std::move(batch_gen_gen), scan_options_->fragment_readahead);
+ return MakeMergedGenerator(std::move(batch_gen_gen_readahead),
+ scan_options_->fragment_readahead);
+}
Review comment:
So here we're teeing up at most `fragment_readahead` active fragments,
of which we can queue up to `fragment_readahead` batches? And the net effect is
that we read from each fragment in parallel.
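
To check my mental model, here's a toy synchronous analogue of what I think the
readahead + merge combination does (hypothetical names and a plain queue model,
not the actual generator utilities):

    #include <cstddef>
    #include <deque>
    #include <iostream>
    #include <vector>

    // Toy model: each "fragment" is a queue of batches (ints here). The merge
    // keeps at most kFragmentReadahead fragments active and pulls one batch from
    // each active fragment per round, so fragments are read in parallel while
    // batches within a single fragment stay in order.
    int main() {
      constexpr std::size_t kFragmentReadahead = 2;  // stand-in for fragment_readahead
      std::vector<std::deque<int>> fragments = {{0, 1, 4}, {2, 3}};

      std::size_t next_fragment = 0;    // next fragment to activate
      std::vector<std::size_t> active;  // currently active fragments
      while (!active.empty() || next_fragment < fragments.size()) {
        // Activate fragments up to the readahead limit ("serial readahead").
        while (active.size() < kFragmentReadahead && next_fragment < fragments.size()) {
          active.push_back(next_fragment++);
        }
        // Pull one batch from each active fragment (the "merge").
        for (auto it = active.begin(); it != active.end();) {
          auto& frag = fragments[*it];
          std::cout << frag.front() << ' ';
          frag.pop_front();
          it = frag.empty() ? active.erase(it) : it + 1;
        }
      }
      std::cout << '\n';  // prints: 0 2 1 3 4
    }
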
##########
File path: cpp/src/arrow/dataset/scanner_test.cc
##########
@@ -115,7 +116,7 @@ class TestScanner : public DatasetFixtureMixinWithParam<TestScannerParams> {
AssertScanBatchesUnorderedEquals(expected.get(), scanner.get(), 1);
}
-};
+}; // namespace dataset
Review comment:
I'm not sure what happened here - clang-format added this to the class
declaration?
(N.B. How are you using clang-format? I don't think it does this to me.)
##########
File path: cpp/src/arrow/dataset/scanner_test.cc
##########
@@ -390,6 +391,284 @@ TEST_P(TestScanner, Head) {
INSTANTIATE_TEST_SUITE_P(TestScannerThreading, TestScanner,
::testing::ValuesIn(TestScannerParams::Values()));
+/// These ControlledXyz classes allow for controlling the order in which things
+/// are delivered so that we can test out of order resequencing. The dataset
+/// allows batches to be delivered on any fragment. When delivering batches a
+/// num_rows parameter is taken which can be used to differentiate batches.
+class ControlledFragment : public Fragment {
+ public:
+ ControlledFragment(std::shared_ptr<Schema> schema)
+ : Fragment(literal(true), std::move(schema)) {}
+
+  Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options) override {
+ return Status::NotImplemented(
+ "Not needed for testing. Sync can only return things in-order.");
+ }
+ Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() override {
+ return physical_schema_;
+ }
+  std::string type_name() const override { return "scanner_test.cc::ControlledFragment"; }
+
+ Result<RecordBatchGenerator> ScanBatchesAsync(
+ const std::shared_ptr<ScanOptions>& options) override {
+ return record_batch_generator_;
+ };
+
+ void Finish() { ARROW_UNUSED(record_batch_generator_.producer().Close()); }
+ void DeliverBatch(uint32_t num_rows) {
+ auto batch = ConstantArrayGenerator::Zeroes(num_rows, physical_schema_);
+ record_batch_generator_.producer().Push(std::move(batch));
+ }
+
+ private:
+ PushGenerator<std::shared_ptr<RecordBatch>> record_batch_generator_;
+};
+
+// TODO(ARROW-8163) Add testing for fragments arriving out of order
+class ControlledDataset : public Dataset {
+ public:
+ explicit ControlledDataset(int num_fragments)
+ : Dataset(arrow::schema({field("i32", int32())})), fragments_() {
+ for (int i = 0; i < num_fragments; i++) {
+ fragments_.push_back(std::make_shared<ControlledFragment>(schema_));
+ }
+ }
+
+  std::string type_name() const override { return "scanner_test.cc::ControlledDataset"; }
+ Result<std::shared_ptr<Dataset>> ReplaceSchema(
+ std::shared_ptr<Schema> schema) const override {
+ return Status::NotImplemented("Should not be called by unit test");
+ }
+
+ void DeliverBatch(int fragment_index, int num_rows) {
+ fragments_[fragment_index]->DeliverBatch(num_rows);
+ }
+
+  void FinishFragment(int fragment_index) { fragments_[fragment_index]->Finish(); }
+
+ protected:
+ virtual Result<FragmentIterator> GetFragmentsImpl(Expression predicate) {
+ std::vector<std::shared_ptr<Fragment>> casted_fragments(fragments_.begin(),
+ fragments_.end());
+ return MakeVectorIterator(std::move(casted_fragments));
+ }
+
+ private:
+ std::vector<std::shared_ptr<ControlledFragment>> fragments_;
+};
+
+constexpr int kNumFragments = 2;
+
+class TestReordering : public ::testing::Test {
+ public:
+  void SetUp() override { dataset_ = std::make_shared<ControlledDataset>(kNumFragments); }
+
+ // Given a vector of fragment indices (one per batch) return a vector
+ // (one per fragment) mapping fragment index to the last occurrence of that
+ // index in order
+ //
+ // This allows us to know when to mark a fragment as finished
+ std::vector<int> GetLastIndices(const std::vector<int>& order) {
+ std::vector<int> last_indices(kNumFragments);
+ for (int i = 0; i < kNumFragments; i++) {
+ auto last_p = std::find(order.rbegin(), order.rend(), i);
+ EXPECT_NE(last_p, order.rend());
+ last_indices[i] = std::distance(last_p, order.rend()) - 1;
+ }
+ return last_indices;
+ }
+
+  /// We buffer one item in order to enumerate it (technically this could be
+  /// avoided if delivering in order but easier to have a single code path). We
+  /// also can't deliver items that don't come next. These two facts make for
+  /// some pretty complex logic to determine when items are ready to be collected.
+  std::vector<TaggedRecordBatch> DeliverAndCollect(std::vector<int> order,
+                                                   TaggedRecordBatchGenerator gen) {
+ std::vector<TaggedRecordBatch> collected;
+ auto last_indices = GetLastIndices(order);
+ int num_fragments = last_indices.size();
+ std::vector<int> batches_seen_for_fragment(num_fragments);
+ auto current_fragment_index = 0;
+ auto seen_fragment = false;
+ for (std::size_t i = 0; i < order.size(); i++) {
+ auto fragment_index = order[i];
+ dataset_->DeliverBatch(fragment_index, i);
+ batches_seen_for_fragment[fragment_index]++;
+ if (static_cast<int>(i) == last_indices[fragment_index]) {
+ dataset_->FinishFragment(fragment_index);
+ }
+ if (current_fragment_index == fragment_index) {
+ if (seen_fragment) {
+ EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+ collected.push_back(std::move(next));
+ } else {
+ seen_fragment = true;
+ }
+ if (static_cast<int>(i) == last_indices[fragment_index]) {
+ // Immediately collect your bonus fragment
+ EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+ collected.push_back(std::move(next));
+          // Now collect any batches freed up that couldn't be delivered because
+          // they came from the wrong fragment
+ auto last_fragment_index = fragment_index;
+ fragment_index++;
+ seen_fragment = batches_seen_for_fragment[fragment_index] > 0;
+ while (fragment_index < num_fragments &&
+ fragment_index != last_fragment_index) {
+ last_fragment_index = fragment_index;
+            for (int j = 0; j < batches_seen_for_fragment[fragment_index] - 1; j++) {
+ EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+ collected.push_back(std::move(next));
+ }
+ if (static_cast<int>(i) >= last_indices[fragment_index]) {
+ EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+ collected.push_back(std::move(next));
+ fragment_index++;
+ seen_fragment = batches_seen_for_fragment[fragment_index] > 0;
+ }
+ }
+ }
+ }
+ }
+ return collected;
+ }
+
+ struct FragmentStats {
+ int last_index;
+ bool seen;
+ };
+
+ std::vector<FragmentStats> GetFragmentStats(const std::vector<int>& order) {
+ auto last_indices = GetLastIndices(order);
+ std::vector<FragmentStats> fragment_stats;
+ for (std::size_t i = 0; i < last_indices.size(); i++) {
+ fragment_stats.push_back({last_indices[i], false});
+ }
+ return fragment_stats;
+ }
+
+  /// When data arrives out of order then we first have to buffer up 1 item in
+  /// order to know when the last item has arrived (so we can mark it as the
+  /// last). This means sometimes we deliver an item and don't get one (first in
+  /// a fragment) and sometimes we deliver an item and we end up getting two
+  /// (last in a fragment)
+ std::vector<EnumeratedRecordBatch> DeliverAndCollect(
+ std::vector<int> order, EnumeratedRecordBatchGenerator gen) {
+ std::vector<EnumeratedRecordBatch> collected;
+ auto fragment_stats = GetFragmentStats(order);
+ for (std::size_t i = 0; i < order.size(); i++) {
+ auto fragment_index = order[i];
+ dataset_->DeliverBatch(fragment_index, i);
+ if (static_cast<int>(i) == fragment_stats[fragment_index].last_index) {
+ dataset_->FinishFragment(fragment_index);
+ EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+ collected.push_back(std::move(next));
+ }
+ if (!fragment_stats[fragment_index].seen) {
+ fragment_stats[fragment_index].seen = true;
+ } else {
+ EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+ collected.push_back(std::move(next));
+ }
+ }
+ return collected;
+ }
+
+ std::shared_ptr<Scanner> MakeScanner(int fragment_readahead = 0) {
+ ScannerBuilder builder(dataset_);
+ // Reordering tests only make sense for async
+ builder.UseAsync(true);
+ if (fragment_readahead != 0) {
+ builder.FragmentReadahead(fragment_readahead);
+ }
+ EXPECT_OK_AND_ASSIGN(auto scanner, builder.Finish());
+ return scanner;
+ }
+
+ void AssertBatchesInOrder(const std::vector<TaggedRecordBatch>& batches,
+ std::vector<int> expected_order) {
+ ASSERT_EQ(expected_order.size(), batches.size());
+ for (std::size_t i = 0; i < batches.size(); i++) {
+ ASSERT_EQ(expected_order[i], batches[i].record_batch->num_rows());
+ }
+ }
+
+ void AssertBatchesInOrder(const std::vector<EnumeratedRecordBatch>& batches,
+ std::vector<int> expected_batch_indices,
+ std::vector<int> expected_row_sizes) {
+ ASSERT_EQ(expected_batch_indices.size(), batches.size());
+ for (std::size_t i = 0; i < batches.size(); i++) {
+      ASSERT_EQ(expected_row_sizes[i], batches[i].record_batch.value->num_rows());
+ ASSERT_EQ(expected_batch_indices[i], batches[i].record_batch.index);
+ }
+ }
+
+ std::shared_ptr<ControlledDataset> dataset_;
+};
+
+TEST_F(TestReordering, ScanBatches) {
+ auto scanner = MakeScanner();
+ ASSERT_OK_AND_ASSIGN(auto batch_gen, scanner->ScanBatchesAsync());
+ auto collected = DeliverAndCollect({0, 0, 1, 1, 0}, std::move(batch_gen));
+ AssertBatchesInOrder(collected, {0, 1, 4, 2, 3});
+}
+
+TEST_F(TestReordering, ScanBatchesUnordered) {
+ auto scanner = MakeScanner();
+ ASSERT_OK_AND_ASSIGN(auto batch_gen, scanner->ScanBatchesUnorderedAsync());
+ auto collected = DeliverAndCollect({0, 0, 1, 1, 0}, std::move(batch_gen));
+ AssertBatchesInOrder(collected, {0, 0, 1, 1, 2}, {0, 2, 3, 1, 4});
+}
+
+struct BatchConsumer {
+ explicit BatchConsumer(EnumeratedRecordBatchGenerator generator)
+ : generator(generator), next() {}
+
+ void AssertCanConsume() {
+ if (!next.is_valid()) {
+ next = generator();
+ }
+ ASSERT_FINISHES_OK(next);
+ next = Future<EnumeratedRecordBatch>();
+ }
+
+ void AssertCannotConsume() {
+ if (!next.is_valid()) {
+ next = generator();
+ }
+ SleepABit();
+ ASSERT_FALSE(next.is_finished());
+ }
+
+ void AssertFinished() {
+ if (!next.is_valid()) {
+ next = generator();
+ }
+ ASSERT_FINISHES_OK_AND_ASSIGN(auto last, next);
+ ASSERT_TRUE(IsIterationEnd(last));
+ }
+
+ EnumeratedRecordBatchGenerator generator;
+ Future<EnumeratedRecordBatch> next;
+};
+
+TEST_F(TestReordering, FileReadahead) {
+ auto scanner = MakeScanner(1);
+ ASSERT_OK_AND_ASSIGN(auto batch_gen, scanner->ScanBatchesUnorderedAsync());
+ BatchConsumer consumer(std::move(batch_gen));
+ dataset_->DeliverBatch(0, 0);
+ dataset_->DeliverBatch(0, 1);
+ consumer.AssertCanConsume();
+ consumer.AssertCannotConsume();
+ dataset_->DeliverBatch(1, 0);
+ consumer.AssertCannotConsume();
+ dataset_->FinishFragment(1);
Review comment:
Since this is an unordered scan, why can't we deliver the last batch for
fragment 1 at this point?
##########
File path: cpp/src/arrow/dataset/scanner_test.cc
##########
@@ -390,6 +391,284 @@ TEST_P(TestScanner, Head) {
+          auto last_fragment_index = fragment_index;
+          fragment_index++;
+          seen_fragment = batches_seen_for_fragment[fragment_index] > 0;
+          while (fragment_index < num_fragments &&
+                 fragment_index != last_fragment_index) {
+            last_fragment_index = fragment_index;
+            for (int j = 0; j < batches_seen_for_fragment[fragment_index] - 1; j++) {
+              EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+              collected.push_back(std::move(next));
+            }
+            if (static_cast<int>(i) >= last_indices[fragment_index]) {
+              EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+              collected.push_back(std::move(next));
+              fragment_index++;
+              seen_fragment = batches_seen_for_fragment[fragment_index] > 0;
Review comment:
(This is all to check my understanding; this is somewhat tricky…)
##########
File path: cpp/src/arrow/dataset/scanner_test.cc
##########
@@ -390,6 +391,284 @@ TEST_P(TestScanner, Head) {
+      if (current_fragment_index == fragment_index) {
+        if (seen_fragment) {
+          EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+          collected.push_back(std::move(next));
+        } else {
+          seen_fragment = true;
+        }
+        if (static_cast<int>(i) == last_indices[fragment_index]) {
+          // Immediately collect your bonus fragment
Review comment:
nit: bonus record batch?
also nit: as explained below, this is because of the buffering?
##########
File path: cpp/src/arrow/dataset/scanner_test.cc
##########
@@ -390,6 +391,284 @@ TEST_P(TestScanner, Head) {
+          auto last_fragment_index = fragment_index;
+          fragment_index++;
+          seen_fragment = batches_seen_for_fragment[fragment_index] > 0;
+          while (fragment_index < num_fragments &&
+                 fragment_index != last_fragment_index) {
+            last_fragment_index = fragment_index;
+            for (int j = 0; j < batches_seen_for_fragment[fragment_index] - 1; j++) {
+              EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+              collected.push_back(std::move(next));
+            }
+            if (static_cast<int>(i) >= last_indices[fragment_index]) {
+              EXPECT_FINISHES_OK_AND_ASSIGN(auto next, gen());
+              collected.push_back(std::move(next));
+              fragment_index++;
+              seen_fragment = batches_seen_for_fragment[fragment_index] > 0;
Review comment:
So `seen_fragment` is basically 'have we delivered a batch to this fragment
before?' (since we can only expect to pull a batch on the 2nd delivery)?
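
If so, here's a minimal synchronous sketch of the one-item buffering I have in
mind (purely illustrative, not the actual enumerating wrapper):

    #include <iostream>
    #include <optional>
    #include <vector>

    struct Enumerated {
      int value;
      int index;
      bool last;
    };

    // Tag each element with its index and whether it is the last one. We must
    // hold back one element: "last" is only known once the next element (or the
    // end of the stream) is seen. Hence the first delivery to a fragment yields
    // nothing and the final delivery yields two items.
    std::vector<Enumerated> Enumerate(const std::vector<int>& in) {
      std::vector<Enumerated> out;
      std::optional<int> buffered;
      int index = 0;
      for (int v : in) {
        if (buffered) out.push_back({*buffered, index++, /*last=*/false});
        buffered = v;  // hold until we know whether it is last
      }
      if (buffered) out.push_back({*buffered, index, /*last=*/true});
      return out;
    }

    int main() {
      for (const auto& e : Enumerate({10, 20, 30})) {
        std::cout << e.index << ':' << e.value << (e.last ? " (last)\n" : "\n");
      }
    }
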
##########
File path: cpp/src/arrow/dataset/scanner_test.cc
##########
@@ -390,6 +391,284 @@ TEST_P(TestScanner, Head) {
+  // Given a vector of fragment indices (one per batch) return a vector
+  // (one per fragment) mapping fragment index to the last occurrence of that
+  // index in order
+  //
+  // This allows us to know when to mark a fragment as finished
+  std::vector<int> GetLastIndices(const std::vector<int>& order) {
Review comment:
Ah, though the usage below makes it clearer.
##########
File path: cpp/src/arrow/dataset/scanner_test.cc
##########
@@ -390,6 +391,284 @@ TEST_P(TestScanner, Head) {
+  // Given a vector of fragment indices (one per batch) return a vector
+  // (one per fragment) mapping fragment index to the last occurrence of that
+  // index in order
+  //
+  // This allows us to know when to mark a fragment as finished
+  std::vector<int> GetLastIndices(const std::vector<int>& order) {
Review comment:
I had to read this a few times to figure out what was going on. Given a
vector which maps the Nth batch delivered to the Kth fragment, we get a vector
mapping each fragment to the index of the last batch it delivered?
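
Concretely, a standalone copy of that computation with the order used in the
tests above (if I'm reading it right):

    #include <algorithm>
    #include <iostream>
    #include <vector>

    // Same computation as GetLastIndices: for each fragment, the position in
    // the delivery order of the final batch it delivers.
    std::vector<int> LastIndices(const std::vector<int>& order, int num_fragments) {
      std::vector<int> last(num_fragments);
      for (int i = 0; i < num_fragments; i++) {
        auto it = std::find(order.rbegin(), order.rend(), i);
        last[i] = static_cast<int>(std::distance(it, order.rend())) - 1;
      }
      return last;
    }

    int main() {
      for (int idx : LastIndices({0, 0, 1, 1, 0}, 2)) std::cout << idx << ' ';
      std::cout << '\n';  // prints: 4 3
    }
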
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]