rok commented on code in PR #44470: URL: https://github.com/apache/arrow/pull/44470#discussion_r2081533974
########## cpp/src/arrow/dataset/file_test.cc: ########## @@ -353,6 +361,153 @@ TEST_F(TestFileSystemDataset, WriteProjected) { } } +// This kernel delays execution for some specific scalar values, +// which guarantees the writing phase sees out-of-order exec batches +Status delay(compute::KernelContext* ctx, const compute::ExecSpan& batch, + compute::ExecResult* out) { + const ArraySpan& input = batch[0].array; + const auto* input_values = input.GetValues<uint32_t>(1); + uint8_t* output_values = out->array_span()->buffers[1].data; + + // Boolean data is stored in 1 bit per value + for (int64_t i = 0; i < input.length; ++i) { + if (input_values[i] % 16 == 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + bit_util::SetBitTo(output_values, i, true); + } + + return Status::OK(); +} + +// A fragment with start=0 will defer ScanBatchesAsync returning a batch generator +// This guarantees a dataset of multiple fragments produces out-of-order batches +class MockFragment : public Fragment { + public: + explicit MockFragment(uint32_t start, int64_t rows_per_batch, int num_batches, + const std::shared_ptr<Schema>& schema) + : Fragment(compute::literal(true), schema), + start_(start), + rows_per_batch_(rows_per_batch), + num_batches_(num_batches) {} + + Result<RecordBatchGenerator> ScanBatchesAsync( + const std::shared_ptr<ScanOptions>& options) override { + // One fragment requires some more time than others + if (start_ == 0) { + std::this_thread::sleep_for(std::chrono::duration<double>(0.1)); + } + + auto vec = gen::Gen({gen::Step(start_)}) + ->FailOnError() + ->RecordBatches(rows_per_batch_, num_batches_); + auto it = MakeVectorIterator(vec); + return MakeBackgroundGenerator(std::move(it), io::default_io_context().executor()); + } + + std::string type_name() const override { return "mock"; } + + protected: + Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() override { + return given_physical_schema_; + }; + + private: + uint32_t start_; + 
int64_t rows_per_batch_; + int num_batches_; +}; + +// This dataset consists of multiple fragments with incrementing values across the +// fragments +class MockDataset : public Dataset { + public: + explicit MockDataset(const std::shared_ptr<Schema>& schema) : Dataset(schema) {} + + MockDataset(const std::shared_ptr<Schema>& schema, + const compute::Expression& partition_expression) + : Dataset(schema, partition_expression) {} + + std::string type_name() const override { return "mock"; } + Result<std::shared_ptr<Dataset>> ReplaceSchema( + std::shared_ptr<Schema> schema) const override { + RETURN_NOT_OK(CheckProjectable(*schema_, *schema)); + return std::make_shared<MockDataset>(std::move(schema)); + } + + protected: + Result<FragmentIterator> GetFragmentsImpl(compute::Expression predicate) override { + FragmentVector fragments; + fragments.push_back(std::make_shared<MockFragment>(0, 2, 1024, schema_)); + fragments.push_back(std::make_shared<MockFragment>(2 * 1024, 2, 1024, schema_)); + return MakeVectorIterator(std::move(fragments)); + }; +}; + +TEST_F(TestFileSystemDataset, WritePersistOrder) { + // Test for GH-26818 + auto format = std::make_shared<IpcFileFormat>(); + FileSystemDatasetWriteOptions write_options; + write_options.file_write_options = format->DefaultWriteOptions(); + write_options.base_dir = "root"; + write_options.partitioning = std::make_shared<HivePartitioning>(schema({})); + write_options.basename_template = "{i}.feather"; + + // The Mock dataset delays emitting the first fragment, which test sequenced output of + // scan node + auto dataset = std::make_shared<MockDataset>(schema({field("f0", int32())})); + + // The delay scalar function delays some batches of all fragments, which tests implicit + // ordering + auto delay_func = std::make_shared<compute::ScalarFunction>("delay", compute::Arity(1), + compute::FunctionDoc()); + compute::ScalarKernel delay_kernel; + delay_kernel.exec = delay; + delay_kernel.signature = 
compute::KernelSignature::Make({int32()}, boolean()); + ASSERT_OK(delay_func->AddKernel(delay_kernel)); + ASSERT_OK(compute::GetFunctionRegistry()->AddFunction(delay_func)); + + for (bool preserve_order : {true, false}) { + ASSERT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan()); + ASSERT_OK(scanner_builder->UseThreads(true)); + ASSERT_OK( + scanner_builder->Filter(compute::call("delay", {compute::field_ref("f0")}))); + ASSERT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish()); + + auto fs = std::make_shared<fs::internal::MockFileSystem>(fs::kNoTime); + write_options.filesystem = fs; + write_options.preserve_order = preserve_order; + + ASSERT_OK(FileSystemDataset::Write(write_options, scanner)); + + // Read the file back out and verify the order + ASSERT_OK_AND_ASSIGN(auto dataset_factory, FileSystemDatasetFactory::Make( + fs, {"root/0.feather"}, format, {})); + ASSERT_OK_AND_ASSIGN(auto written_dataset, dataset_factory->Finish(FinishOptions{})); + ASSERT_OK_AND_ASSIGN(scanner_builder, written_dataset->NewScan()); + ASSERT_OK(scanner_builder->UseThreads(false)); + ASSERT_OK_AND_ASSIGN(scanner, scanner_builder->Finish()); + ASSERT_OK_AND_ASSIGN(auto actual, scanner->ToTable()); + TableBatchReader reader(*actual); + std::shared_ptr<RecordBatch> batch; + ASSERT_OK(reader.ReadNext(&batch)); + int32_t prev = -1; + auto out_of_order = false; + while (batch != nullptr) { + const auto* values = batch->column(0)->data()->GetValues<int32_t>(1); + for (int row = 0; row < batch->num_rows(); ++row) { + int32_t value = values[row]; + if (value <= prev) { + out_of_order = true; + } + prev = value; + } + ASSERT_OK(reader.ReadNext(&batch)); + } + ASSERT_EQ(!out_of_order, preserve_order); Review Comment: If I understand @EnricoMi correctly - it doesn't hurt to actually check ooo when `preserve_order == false` as it guarantees our generated test data is ooo before sorting it. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org