rok commented on code in PR #44470: URL: https://github.com/apache/arrow/pull/44470#discussion_r2078209954
########## cpp/src/arrow/dataset/file_test.cc: ########## @@ -353,6 +361,153 @@ TEST_F(TestFileSystemDataset, WriteProjected) { } } +// This kernel delays execution for some specific scalar values, +// which guarantees the writing phase sees out-of-order exec batches +Status delay(compute::KernelContext* ctx, const compute::ExecSpan& batch, + compute::ExecResult* out) { + const ArraySpan& input = batch[0].array; + const auto* input_values = input.GetValues<uint32_t>(1); + uint8_t* output_values = out->array_span()->buffers[1].data; + + // Boolean data is stored in 1 bit per value + for (int64_t i = 0; i < input.length; ++i) { + if (input_values[i] % 16 == 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + bit_util::SetBitTo(output_values, i, true); + } + + return Status::OK(); +} + +// A fragment with start=0 will defer ScanBatchesAsync returning a batch generator +// This guarantees a dataset of multiple fragments produces out-of-order batches +class MockFragment : public Fragment { + public: + explicit MockFragment(uint32_t start, int64_t rows_per_batch, int num_batches, + const std::shared_ptr<Schema>& schema) + : Fragment(compute::literal(true), schema), + start_(start), + rows_per_batch_(rows_per_batch), + num_batches_(num_batches) {} + + Result<RecordBatchGenerator> ScanBatchesAsync( + const std::shared_ptr<ScanOptions>& options) override { + // One fragment requires some more time than others Review Comment: Do you mean: ```suggestion // First fragment requires some more time than others ``` ########## cpp/src/arrow/dataset/file_test.cc: ########## @@ -353,6 +361,153 @@ TEST_F(TestFileSystemDataset, WriteProjected) { } } +// This kernel delays execution for some specific scalar values, +// which guarantees the writing phase sees out-of-order exec batches +Status delay(compute::KernelContext* ctx, const compute::ExecSpan& batch, + compute::ExecResult* out) { + const ArraySpan& input = batch[0].array; + const auto* input_values = 
input.GetValues<uint32_t>(1); + uint8_t* output_values = out->array_span()->buffers[1].data; + + // Boolean data is stored in 1 bit per value + for (int64_t i = 0; i < input.length; ++i) { + if (input_values[i] % 16 == 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + bit_util::SetBitTo(output_values, i, true); + } + + return Status::OK(); +} + +// A fragment with start=0 will defer ScanBatchesAsync returning a batch generator +// This guarantees a dataset of multiple fragments produces out-of-order batches Review Comment: ```suggestion // This guarantees a dataset of multiple fragments produced out-of-order batches ``` ########## cpp/src/arrow/dataset/write_node_test.cc: ########## @@ -170,5 +176,81 @@ TEST_F(SimpleWriteNodeTest, CustomMetadata) { ASSERT_TRUE(custom_metadata->Equals(*file_schema->metadata())); } +TEST_F(SimpleWriteNodeTest, SequenceOutput) { + // Test for GH-26818 + auto format = std::make_shared<IpcFileFormat>(); + constexpr int kRowsPerBatch = 16; + constexpr int kNumBatches = 32; + constexpr random::SeedType kSeed = 42; + constexpr int kJitterMod = 4; + acero::RegisterTestNodes(); + + // Create an input table + std::shared_ptr<Table> table = + gen::Gen({gen::Step()})->FailOnError()->Table(kRowsPerBatch, kNumBatches); + auto dataset = std::make_shared<InMemoryDataset>(table); + auto scan_options = std::make_shared<ScanOptions>(); + scan_options->use_threads = true; + + for (bool preserve_order : {true, false}) { + auto scanner_builder = std::make_shared<ScannerBuilder>(dataset, scan_options); + EXPECT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish()); + auto exprs = scan_options->projection.call()->arguments; + auto names = checked_cast<const compute::MakeStructOptions*>( + scan_options->projection.call()->options.get()) + ->field_names; + + auto fs = std::make_shared<fs::internal::MockFileSystem>(fs::kNoTime); + dataset::WriteNodeOptions write_options(fs_write_options_); + 
write_options.write_options.file_write_options = format->DefaultWriteOptions(); + write_options.write_options.base_dir = "root"; + write_options.write_options.partitioning = + std::make_shared<HivePartitioning>(schema({})); + write_options.write_options.basename_template = "{i}.feather"; + write_options.write_options.filesystem = fs; + write_options.write_options.preserve_order = preserve_order; + + // test plan of FileSystemDataset::Write with a jitter node that guarantees exec + // batches are out of order + acero::Declaration plan = acero::Declaration::Sequence({ + {"scan", + ScanNodeOptions{dataset, scanner->options(), /*require_sequenced_output=*/false, + /*implicit_ordering=*/true}}, + {"filter", acero::FilterNodeOptions{scanner->options()->filter}}, + {"project", acero::ProjectNodeOptions{std::move(exprs), std::move(names)}}, + {"jitter", acero::JitterNodeOptions(kSeed, kJitterMod)}, + {"write", write_options}, + }); + + ASSERT_OK(DeclarationToStatus(plan)); + + // Read the file back out and verify the order Review Comment: This block seems to be repeated in `file_test.cc` and `write_node_test.cc`; perhaps we should abstract it into `test_util_internal.h`? ########## cpp/src/arrow/dataset/file_test.cc: ########## @@ -353,6 +356,89 @@ TEST_F(TestFileSystemDataset, WriteProjected) { } } +// this kernel delays execution for some specific scalar values +Status delay(compute::KernelContext* ctx, const compute::ExecSpan& batch, + compute::ExecResult* out) { + const ArraySpan& input = batch[0].array; + const uint32_t* input_values = input.GetValues<uint32_t>(1); + uint8_t* output_values = out->array_span()->buffers[1].data; + + // Boolean data is stored in 1 bit per value + for (int64_t i = 0; i < input.length; ++i) { + if (input_values[i] % 16 == 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } Review Comment: If this is probabilistic, what is the likelihood that none of the tasks end up out of order? Something like `0.001^(input.length / 16)`?
########## cpp/src/arrow/dataset/write_node_test.cc: ########## @@ -170,5 +176,81 @@ TEST_F(SimpleWriteNodeTest, CustomMetadata) { ASSERT_TRUE(custom_metadata->Equals(*file_schema->metadata())); } +TEST_F(SimpleWriteNodeTest, SequenceOutput) { Review Comment: Maybe ```suggestion TEST_F(SimpleWriteNodeTest, MultiThreadedSequenceOutputOrdering) { ``` ? ########## python/pyarrow/dataset.py: ########## @@ -893,7 +893,12 @@ def write_dataset(data, base_dir, *, basename_template=None, format=None, ``FileFormat.make_write_options()`` function. use_threads : bool, default True Write files in parallel. If enabled, then maximum parallelism will be - used determined by the number of available CPU cores. + used determined by the number of available CPU cores. Using multiple + threads may change the order of rows in the written dataset. Review Comment: Perhaps redundant: ```suggestion used determined by the number of available CPU cores. Using multiple threads may change the order of rows in the written dataset if preserve_order is set to False. 
``` ########## cpp/src/arrow/dataset/file_test.cc: ########## @@ -353,6 +361,153 @@ TEST_F(TestFileSystemDataset, WriteProjected) { } } +// This kernel delays execution for some specific scalar values, +// which guarantees the writing phase sees out-of-order exec batches +Status delay(compute::KernelContext* ctx, const compute::ExecSpan& batch, + compute::ExecResult* out) { + const ArraySpan& input = batch[0].array; + const auto* input_values = input.GetValues<uint32_t>(1); + uint8_t* output_values = out->array_span()->buffers[1].data; + + // Boolean data is stored in 1 bit per value + for (int64_t i = 0; i < input.length; ++i) { + if (input_values[i] % 16 == 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + bit_util::SetBitTo(output_values, i, true); + } + + return Status::OK(); +} + +// A fragment with start=0 will defer ScanBatchesAsync returning a batch generator +// This guarantees a dataset of multiple fragments produces out-of-order batches +class MockFragment : public Fragment { + public: + explicit MockFragment(uint32_t start, int64_t rows_per_batch, int num_batches, + const std::shared_ptr<Schema>& schema) + : Fragment(compute::literal(true), schema), + start_(start), + rows_per_batch_(rows_per_batch), + num_batches_(num_batches) {} + + Result<RecordBatchGenerator> ScanBatchesAsync( + const std::shared_ptr<ScanOptions>& options) override { + // One fragment requires some more time than others + if (start_ == 0) { + std::this_thread::sleep_for(std::chrono::duration<double>(0.1)); + } + + auto vec = gen::Gen({gen::Step(start_)}) + ->FailOnError() + ->RecordBatches(rows_per_batch_, num_batches_); + auto it = MakeVectorIterator(vec); + return MakeBackgroundGenerator(std::move(it), io::default_io_context().executor()); + } + + std::string type_name() const override { return "mock"; } + + protected: + Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() override { + return given_physical_schema_; + }; + + private: + uint32_t start_; + 
int64_t rows_per_batch_; + int num_batches_; +}; + +// This dataset consists of multiple fragments with incrementing values across the +// fragments +class MockDataset : public Dataset { + public: + explicit MockDataset(const std::shared_ptr<Schema>& schema) : Dataset(schema) {} + + MockDataset(const std::shared_ptr<Schema>& schema, + const compute::Expression& partition_expression) + : Dataset(schema, partition_expression) {} + + std::string type_name() const override { return "mock"; } + Result<std::shared_ptr<Dataset>> ReplaceSchema( + std::shared_ptr<Schema> schema) const override { + RETURN_NOT_OK(CheckProjectable(*schema_, *schema)); + return std::make_shared<MockDataset>(std::move(schema)); + } + + protected: + Result<FragmentIterator> GetFragmentsImpl(compute::Expression predicate) override { + FragmentVector fragments; + fragments.push_back(std::make_shared<MockFragment>(0, 2, 1024, schema_)); + fragments.push_back(std::make_shared<MockFragment>(2 * 1024, 2, 1024, schema_)); + return MakeVectorIterator(std::move(fragments)); + }; +}; + +TEST_F(TestFileSystemDataset, WritePersistOrder) { Review Comment: Perhaps: ```suggestion TEST_F(TestFileSystemDataset, MultiThreadedWritePersistsOrder) { ``` ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org