This is an automated email from the ASF dual-hosted git repository.

rok pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
     new 021d8abea6 GH-26818: [C++][Python] Preserve order when writing dataset multi-threaded (#44470)
021d8abea6 is described below

commit 021d8abea6f1d449039402df4791f6dfd37be9b6
Author: Enrico Minack <git...@enrico.minack.dev>
AuthorDate: Wed May 14 09:30:54 2025 +0200

    GH-26818: [C++][Python] Preserve order when writing dataset multi-threaded (#44470)

    ### Rationale for this change

    The order of rows in a dataset might be important for users and should be
    preserved when writing to a filesystem. With multi-threaded writes, the
    order is currently not guaranteed.

    ### What changes are included in this PR?

    Preserving the row order of the dataset requires the `SourceNode` to
    sequence the fragments' output (this keeps exec batches in the order of
    fragments) and to provide an `ImplicitOrdering` (this gives exec batches
    an index), and the `ConsumingSinkNode` to sequence exec batches (this
    finally preserves the order of batches according to their index).

    User-facing changes:
    - Add option `preserve_order` to `FileSystemDatasetWriteOptions` (C++) and
      `pyarrow.dataset.write_dataset` (Python). The default behaviour matches
      the current behaviour.

    ### Are these changes tested?

    Unit tests have been added.

    ### Are there any user-facing changes?

    Users can set `FileSystemDatasetWriteOptions.preserve_order = true` (C++) /
    `pyarrow.dataset.write_dataset(..., preserve_order=True)` (Python).

    * GitHub Issue: #26818

    Lead-authored-by: Enrico Minack <git...@enrico.minack.dev>
    Co-authored-by: Rok Mihevc <r...@mihevc.org>
    Signed-off-by: Rok Mihevc <r...@mihevc.org>
---
 cpp/src/arrow/dataset/file_base.cc           |  14 ++-
 cpp/src/arrow/dataset/file_base.h            |   4 +
 cpp/src/arrow/dataset/file_test.cc           | 167 +++++++++++++++++++++++++++
 python/pyarrow/_dataset.pyx                  |   2 +
 python/pyarrow/dataset.py                    |  18 ++-
 python/pyarrow/includes/libarrow_dataset.pxd |   1 +
 python/pyarrow/tests/test_dataset.py         |  16 +++
 7 files changed, 214 insertions(+), 8 deletions(-)
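As a quick illustration of the new Python-side option described above, a minimal sketch (the table contents and the `ordered_dataset` target directory are hypothetical, not taken from this commit):

```python
import pyarrow as pa
import pyarrow.dataset as ds

# Hypothetical input; any Table, RecordBatch, or iterable of batches works.
table = pa.table({"a": range(1024)})

# preserve_order=True keeps the row order even though use_threads=True.
ds.write_dataset(table, "ordered_dataset", format="parquet",
                 use_threads=True, preserve_order=True)

# Reading back single-threaded returns the rows in their original order.
result = ds.dataset("ordered_dataset").to_table(use_threads=False)
assert result["a"].to_pylist() == list(range(1024))
```

Reading back with `use_threads=False` keeps the verification itself deterministic, mirroring the unit tests added by this commit.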
diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc
index b084581b1a..61f7c5e732 100644
--- a/cpp/src/arrow/dataset/file_base.cc
+++ b/cpp/src/arrow/dataset/file_base.cc
@@ -473,9 +473,14 @@ Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_optio
   WriteNodeOptions write_node_options(write_options);
   write_node_options.custom_schema = custom_schema;
 
+  // preserve existing order across fragments by setting require_sequenced_output=true
+  bool require_sequenced_output = write_node_options.write_options.preserve_order;
+  // preserve existing order of sequenced scan output by setting implicit_ordering=true
+  bool implicit_ordering = write_node_options.write_options.preserve_order;
   acero::Declaration plan = acero::Declaration::Sequence({
-      {"scan", ScanNodeOptions{dataset, scanner->options()}},
+      {"scan", ScanNodeOptions{dataset, scanner->options(), require_sequenced_output,
+                               implicit_ordering}},
       {"filter", acero::FilterNodeOptions{scanner->options()->filter}},
       {"project", acero::ProjectNodeOptions{std::move(exprs), std::move(names)}},
       {"write", std::move(write_node_options)},
@@ -541,8 +546,13 @@ Result<acero::ExecNode*> MakeWriteNode(acero::ExecPlan* plan,
   ARROW_ASSIGN_OR_RAISE(
       auto node,
+      // to preserve order explicitly, sequence the exec batches;
+      // this requires the exec batch index to be set upstream (e.g. by SourceNode)
       acero::MakeExecNode("consuming_sink", plan, std::move(inputs),
-                          acero::ConsumingSinkNodeOptions{std::move(consumer)}));
+                          acero::ConsumingSinkNodeOptions{
+                              std::move(consumer),
+                              {},
+                              /*sequence_output=*/write_options.preserve_order}));
   return node;
 }
 
diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h
index 46fc8ebc40..e13c1312a4 100644
--- a/cpp/src/arrow/dataset/file_base.h
+++ b/cpp/src/arrow/dataset/file_base.h
@@ -399,6 +399,10 @@ struct ARROW_DS_EXPORT FileSystemDatasetWriteOptions {
   /// Partitioning used to generate fragment paths.
   std::shared_ptr<Partitioning> partitioning;
 
+  /// If true, the order of rows in the dataset is preserved when writing with
+  /// multiple threads. This may cause notable performance degradation.
+  bool preserve_order = false;
+
   /// Maximum number of partitions any batch may be written into, default is 1K.
   int max_partitions = 1024;
 
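On the C++ side, the `preserve_order` field documented above is set on `FileSystemDatasetWriteOptions` before calling `FileSystemDataset::Write`. A minimal sketch, assuming a `scanner` and `filesystem` already exist; the helper function name, output paths, and IPC format choice are illustrative assumptions:

```cpp
#include <memory>

#include "arrow/api.h"
#include "arrow/dataset/api.h"
#include "arrow/filesystem/api.h"

namespace ds = arrow::dataset;

// Hypothetical helper: writes an existing scanner's rows in their original order.
arrow::Status WriteOrdered(std::shared_ptr<ds::Scanner> scanner,
                           std::shared_ptr<arrow::fs::FileSystem> filesystem) {
  auto format = std::make_shared<ds::IpcFileFormat>();
  ds::FileSystemDatasetWriteOptions write_options;
  write_options.file_write_options = format->DefaultWriteOptions();
  write_options.filesystem = std::move(filesystem);
  write_options.base_dir = "ordered";                    // hypothetical output dir
  write_options.partitioning =
      std::make_shared<ds::HivePartitioning>(arrow::schema({}));
  write_options.basename_template = "part-{i}.feather";  // hypothetical template
  // New in this commit: keep row order even when writing with multiple threads.
  write_options.preserve_order = true;
  return ds::FileSystemDataset::Write(write_options, std::move(scanner));
}
```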
diff --git a/cpp/src/arrow/dataset/file_test.cc b/cpp/src/arrow/dataset/file_test.cc
index 5d6068557f..8904531200 100644
--- a/cpp/src/arrow/dataset/file_test.cc
+++ b/cpp/src/arrow/dataset/file_test.cc
@@ -15,15 +15,22 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <arrow/compute/function.h>
+#include <arrow/compute/registry.h>
 #include <cstdint>
 #include <memory>
 #include <string>
+#include <thread>
 #include <tuple>
 #include <vector>
 
 #include <gmock/gmock.h>
 #include <gtest/gtest.h>
 
+#include <arrow/dataset/dataset.h>
+#include <arrow/dataset/file_base.h>
+#include <arrow/record_batch.h>
+#include <arrow/util/async_generator.h>
 #include "arrow/acero/exec_plan.h"
 #include "arrow/acero/test_util_internal.h"
 #include "arrow/array/array_primitive.h"
@@ -31,6 +38,7 @@
 #include "arrow/dataset/api.h"
 #include "arrow/dataset/partition.h"
 #include "arrow/dataset/plan.h"
+#include "arrow/dataset/projector.h"
 #include "arrow/dataset/test_util_internal.h"
 #include "arrow/filesystem/path_util.h"
 #include "arrow/filesystem/test_util.h"
@@ -353,6 +361,165 @@ TEST_F(TestFileSystemDataset, WriteProjected) {
   }
 }
 
+// This kernel delays execution for some specific scalar values,
+// which guarantees the writing phase sees out-of-order exec batches
+Status delay(compute::KernelContext* ctx, const compute::ExecSpan& batch,
+             compute::ExecResult* out) {
+  const ArraySpan& input = batch[0].array;
+  const auto* input_values = input.GetValues<uint32_t>(1);
+  uint8_t* output_values = out->array_span()->buffers[1].data;
+
+  // Boolean data is stored in 1 bit per value
+  for (int64_t i = 0; i < input.length; ++i) {
+    if (input_values[i] % 16 == 0) {
+      std::this_thread::sleep_for(std::chrono::milliseconds(10));
+    }
+    bit_util::SetBitTo(output_values, i, true);
+  }
+
+  return Status::OK();
+}
+
+// A fragment with start=0 will defer ScanBatchesAsync returning a batch generator
+// This guarantees a dataset of multiple fragments could produce out-of-order batches
+class MockFragment : public Fragment {
+ public:
+  explicit MockFragment(uint32_t start, int64_t rows_per_batch, int num_batches,
+                        const std::shared_ptr<Schema>& schema)
+      : Fragment(compute::literal(true), schema),
+        start_(start),
+        rows_per_batch_(rows_per_batch),
+        num_batches_(num_batches) {}
+
+  Result<RecordBatchGenerator> ScanBatchesAsync(
+      const std::shared_ptr<ScanOptions>& options) override {
+    // Fragment with start_=0 defers returning the generator
+    if (start_ == 0) {
+      std::this_thread::sleep_for(std::chrono::duration<double>(0.1));
+    }
+
+    auto vec = gen::Gen({gen::Step(start_)})
+                   ->FailOnError()
+                   ->RecordBatches(rows_per_batch_, num_batches_);
+    auto it = MakeVectorIterator(vec);
+    return MakeBackgroundGenerator(std::move(it), io::default_io_context().executor());
+  }
+
+  std::string type_name() const override { return "mock"; }
+
+ protected:
+  Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() override {
+    return given_physical_schema_;
+  };
+
+ private:
+  uint32_t start_;
+  int64_t rows_per_batch_;
+  int num_batches_;
+};
+
+// This dataset consists of multiple fragments with incrementing values across the
+// fragments
+class MockDataset : public Dataset {
+ public:
+  explicit MockDataset(const std::shared_ptr<Schema>& schema) : Dataset(schema) {}
+
+  MockDataset(const std::shared_ptr<Schema>& schema,
+              const compute::Expression& partition_expression)
+      : Dataset(schema, partition_expression) {}
+
+  std::string type_name() const override { return "mock"; }
+  Result<std::shared_ptr<Dataset>> ReplaceSchema(
+      std::shared_ptr<Schema> schema) const override {
+    RETURN_NOT_OK(CheckProjectable(*schema_, *schema));
+    return std::make_shared<MockDataset>(std::move(schema));
+  }
+
+ protected:
+  Result<FragmentIterator> GetFragmentsImpl(compute::Expression predicate) override {
+    FragmentVector fragments;
+    fragments.push_back(std::make_shared<MockFragment>(0, 2, 1024, schema_));
+    fragments.push_back(std::make_shared<MockFragment>(2 * 1024, 2, 1024, schema_));
+    return MakeVectorIterator(std::move(fragments));
+  };
+};
+
+TEST_F(TestFileSystemDataset, MultiThreadedWritePersistsOrder) {
+  // Test for GH-26818
+  //
+  // This test uses std::this_thread::sleep_for to increase chances for batches
+  // to get written out-of-order in a multi-threaded environment.
+  // With preserve_order = false, the existence of out-of-order is asserted to
+  // verify that the test setup reliably writes out-of-order sequences, and
+  // that write_options.preserve_order = preserve_order can recreate order.
+  //
+  // Estimates for out_of_order == false and preserve_order == false to occur
+  // are 10^-62 https://github.com/apache/arrow/pull/44470#discussion_r2079049038
+  //
+  // If this test starts to reliably fail with preserve_order == false, the test setup
+  // has to be revised to again reliably produce out-of-order sequences.
+  auto format = std::make_shared<IpcFileFormat>();
+  FileSystemDatasetWriteOptions write_options;
+  write_options.file_write_options = format->DefaultWriteOptions();
+  write_options.base_dir = "root";
+  write_options.partitioning = std::make_shared<HivePartitioning>(schema({}));
+  write_options.basename_template = "{i}.feather";
+
+  // The Mock dataset delays emitting the first fragment, which tests sequenced
+  // output of the scan node
+  auto dataset = std::make_shared<MockDataset>(schema({field("f0", int32())}));
+
+  // The delay scalar function delays some batches of all fragments, which tests
+  // implicit ordering
+  auto delay_func = std::make_shared<compute::ScalarFunction>("delay", compute::Arity(1),
+                                                              compute::FunctionDoc());
+  compute::ScalarKernel delay_kernel;
+  delay_kernel.exec = delay;
+  delay_kernel.signature = compute::KernelSignature::Make({int32()}, boolean());
+  ASSERT_OK(delay_func->AddKernel(delay_kernel));
+  ASSERT_OK(compute::GetFunctionRegistry()->AddFunction(delay_func));
+
+  for (bool preserve_order : {true, false}) {
+    ASSERT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan());
+    ASSERT_OK(scanner_builder->UseThreads(true));
+    ASSERT_OK(
+        scanner_builder->Filter(compute::call("delay", {compute::field_ref("f0")})));
+    ASSERT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());
+
+    auto fs = std::make_shared<fs::internal::MockFileSystem>(fs::kNoTime);
+    write_options.filesystem = fs;
+    write_options.preserve_order = preserve_order;
+
+    ASSERT_OK(FileSystemDataset::Write(write_options, scanner));
+
+    // Read the file back out and verify the order
+    ASSERT_OK_AND_ASSIGN(auto dataset_factory, FileSystemDatasetFactory::Make(
+                                                   fs, {"root/0.feather"}, format, {}));
+    ASSERT_OK_AND_ASSIGN(auto written_dataset, dataset_factory->Finish(FinishOptions{}));
+    ASSERT_OK_AND_ASSIGN(scanner_builder, written_dataset->NewScan());
+    ASSERT_OK(scanner_builder->UseThreads(false));
+    ASSERT_OK_AND_ASSIGN(scanner, scanner_builder->Finish());
+    ASSERT_OK_AND_ASSIGN(auto actual, scanner->ToTable());
+    TableBatchReader reader(*actual);
+    std::shared_ptr<RecordBatch> batch;
+    ASSERT_OK(reader.ReadNext(&batch));
+    int32_t prev = -1;
+    auto out_of_order = false;
+    while (batch != nullptr) {
+      const auto* values = batch->column(0)->data()->GetValues<int32_t>(1);
+      for (int row = 0; row < batch->num_rows(); ++row) {
+        int32_t value = values[row];
+        if (value <= prev) {
+          out_of_order = true;
+        }
+        prev = value;
+      }
+      ASSERT_OK(reader.ReadNext(&batch));
+    }
+    ASSERT_EQ(!out_of_order, preserve_order);
+  }
+}
+
 class FileSystemWriteTest : public testing::TestWithParam<std::tuple<bool, bool>> {
   using PlanFactory = std::function<std::vector<acero::Declaration>(
       const FileSystemDatasetWriteOptions&,
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index d31b02b088..9e5edee574 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -4090,6 +4090,7 @@ def _filesystemdataset_write(
     str basename_template not None,
     FileSystem filesystem not None,
     Partitioning partitioning not None,
+    bool preserve_order,
     FileWriteOptions file_options not None,
     int max_partitions,
     object file_visitor,
@@ -4112,6 +4113,7 @@ def _filesystemdataset_write(
     c_options.filesystem = filesystem.unwrap()
     c_options.base_dir = tobytes(_stringify_path(base_dir))
     c_options.partitioning = partitioning.unwrap()
+    c_options.preserve_order = preserve_order
     c_options.max_partitions = max_partitions
     c_options.max_open_files = max_open_files
     c_options.max_rows_per_file = max_rows_per_file
diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py
index 03459a6c49..26602c1e17 100644
--- a/python/pyarrow/dataset.py
+++ b/python/pyarrow/dataset.py
@@ -844,9 +844,9 @@ def _ensure_write_partitioning(part, schema, flavor):
 
 def write_dataset(data, base_dir, *, basename_template=None, format=None,
-                  partitioning=None, partitioning_flavor=None, schema=None,
-                  filesystem=None, file_options=None, use_threads=True,
-                  max_partitions=None, max_open_files=None,
+                  partitioning=None, partitioning_flavor=None,
+                  schema=None, filesystem=None, file_options=None, use_threads=True,
+                  preserve_order=False, max_partitions=None, max_open_files=None,
                   max_rows_per_file=None, min_rows_per_group=None,
                   max_rows_per_group=None, file_visitor=None,
                   existing_data_behavior='error', create_dir=True):
@@ -889,7 +889,13 @@ Table/RecordBatch, or iterable of RecordBatch
         ``FileFormat.make_write_options()`` function.
     use_threads : bool, default True
         Write files in parallel. If enabled, then maximum parallelism will be
-        used determined by the number of available CPU cores.
+        used, determined by the number of available CPU cores. Using multiple
+        threads may change the order of rows in the written dataset if
+        preserve_order is set to False.
+    preserve_order : bool, default False
+        Preserve the order of rows. If enabled, the order of rows in the dataset
+        is guaranteed to be preserved even if use_threads is set to True. This
+        may cause notable performance degradation.
     max_partitions : int, default 1024
         Maximum number of partitions any batch may be written into.
     max_open_files : int, default 1024
@@ -1028,7 +1034,7 @@ Table/RecordBatch, or iterable of RecordBatch
 
     _filesystemdataset_write(
         scanner, base_dir, basename_template, filesystem, partitioning,
-        file_options, max_partitions, file_visitor, existing_data_behavior,
-        max_open_files, max_rows_per_file,
+        preserve_order, file_options, max_partitions, file_visitor,
+        existing_data_behavior, max_open_files, max_rows_per_file,
         min_rows_per_group, max_rows_per_group, create_dir
     )
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd
index ab7355f8d3..8e3fb9590f 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -221,6 +221,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
         shared_ptr[CFileSystem] filesystem
         c_string base_dir
         shared_ptr[CPartitioning] partitioning
+        c_bool preserve_order
         int max_partitions
         c_string basename_template
         function[cb_writer_finish_internal] writer_pre_finish
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index cd31a62f62..d95c657c99 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -4590,6 +4590,22 @@ def test_write_dataset_use_threads(tempdir):
     assert result1.to_table().equals(result2.to_table())
 
 
+@pytest.mark.parquet
+@pytest.mark.pandas
+def test_write_dataset_use_threads_preserve_order(tempdir):
+    # see GH-26818
+    table = pa.table({"a": range(1024)})
+    batches = table.to_batches(max_chunksize=2)
+    ds.write_dataset(batches, tempdir, format="parquet",
+                     use_threads=True, preserve_order=True)
+    seq = ds.dataset(tempdir).to_table(use_threads=False)['a'].to_numpy()
+    prev = -1
+    for item in seq:
+        curr = int(item)
+        assert curr > prev, f"Sequence expected to be ordered: {seq}"
+        prev = curr
+
+
 def test_write_table(tempdir):
     table = pa.table([
         pa.array(range(20)), pa.array(random.random() for _ in range(20)),