This is an automated email from the ASF dual-hosted git repository.

rok pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 021d8abea6 GH-26818: [C++][Python] Preserve order when writing dataset 
multi-threaded (#44470)
021d8abea6 is described below

commit 021d8abea6f1d449039402df4791f6dfd37be9b6
Author: Enrico Minack <git...@enrico.minack.dev>
AuthorDate: Wed May 14 09:30:54 2025 +0200

    GH-26818: [C++][Python] Preserve order when writing dataset multi-threaded 
(#44470)
    
    ### Rationale for this change
    The order of rows in a dataset might be important for users and should be 
preserved when writing to a filesystem. With multi-threaded write, the order is 
currently not guaranteed,
    
    ### What changes are included in this PR?
    Preserving the dataset order of rows requires the `SourceNode` to sequence 
the fragments output (this keeps exec batches in the order of fragments), to 
provide an `ImplicitOrdering` (this gives exec batches an index), and the 
`ConsumingSinkNode` to sequence exec batches (finally preserve order of batches 
according to their index).
    
    User-facing changes:
    - Add option `preserve_order` to `FileSystemDatasetWriteOptions` (C++) and 
`arrow.dataset.write_dataset` (Python).
    
    Default behaviour is current behaviour.
    
    ### Are these changes tested?
    Unit tests have been added,
    
    ### Are there any user-facing changes?
    Users can set `FileSystemDatasetWriteOptions.preserve_order = true` (C++) / 
`arrow.dataset.write_dataset(..., preserve_order=True)` (Python).
    * GitHub Issue: #26818
    
    Lead-authored-by: Enrico Minack <git...@enrico.minack.dev>
    Co-authored-by: Rok Mihevc <r...@mihevc.org>
    Signed-off-by: Rok Mihevc <r...@mihevc.org>
---
 cpp/src/arrow/dataset/file_base.cc           |  14 ++-
 cpp/src/arrow/dataset/file_base.h            |   4 +
 cpp/src/arrow/dataset/file_test.cc           | 167 +++++++++++++++++++++++++++
 python/pyarrow/_dataset.pyx                  |   2 +
 python/pyarrow/dataset.py                    |  18 ++-
 python/pyarrow/includes/libarrow_dataset.pxd |   1 +
 python/pyarrow/tests/test_dataset.py         |  16 +++
 7 files changed, 214 insertions(+), 8 deletions(-)

diff --git a/cpp/src/arrow/dataset/file_base.cc 
b/cpp/src/arrow/dataset/file_base.cc
index b084581b1a..61f7c5e732 100644
--- a/cpp/src/arrow/dataset/file_base.cc
+++ b/cpp/src/arrow/dataset/file_base.cc
@@ -473,9 +473,14 @@ Status FileSystemDataset::Write(const 
FileSystemDatasetWriteOptions& write_optio
 
   WriteNodeOptions write_node_options(write_options);
   write_node_options.custom_schema = custom_schema;
+  // preserve existing order across fragments by setting 
require_sequenced_output=true
+  bool require_sequenced_output = 
write_node_options.write_options.preserve_order;
+  // preserve existing order of sequenced scan output by setting 
implicit_order=true
+  bool implicit_ordering = write_node_options.write_options.preserve_order;
 
   acero::Declaration plan = acero::Declaration::Sequence({
-      {"scan", ScanNodeOptions{dataset, scanner->options()}},
+      {"scan", ScanNodeOptions{dataset, scanner->options(), 
require_sequenced_output,
+                               implicit_ordering}},
       {"filter", acero::FilterNodeOptions{scanner->options()->filter}},
       {"project", acero::ProjectNodeOptions{std::move(exprs), 
std::move(names)}},
       {"write", std::move(write_node_options)},
@@ -541,8 +546,13 @@ Result<acero::ExecNode*> MakeWriteNode(acero::ExecPlan* 
plan,
 
   ARROW_ASSIGN_OR_RAISE(
       auto node,
+      // to preserve order explicitly, sequence the exec batches
+      // this requires exec batch index to be set upstream (e.g. by SourceNode)
       acero::MakeExecNode("consuming_sink", plan, std::move(inputs),
-                          
acero::ConsumingSinkNodeOptions{std::move(consumer)}));
+                          acero::ConsumingSinkNodeOptions{
+                              std::move(consumer),
+                              {},
+                              
/*sequence_output=*/write_options.preserve_order}));
 
   return node;
 }
diff --git a/cpp/src/arrow/dataset/file_base.h 
b/cpp/src/arrow/dataset/file_base.h
index 46fc8ebc40..e13c1312a4 100644
--- a/cpp/src/arrow/dataset/file_base.h
+++ b/cpp/src/arrow/dataset/file_base.h
@@ -399,6 +399,10 @@ struct ARROW_DS_EXPORT FileSystemDatasetWriteOptions {
   /// Partitioning used to generate fragment paths.
   std::shared_ptr<Partitioning> partitioning;
 
+  /// If true the order of rows in the dataset is preserved when writing with
+  /// multiple threads. This may cause notable performance degradation.
+  bool preserve_order = false;
+
   /// Maximum number of partitions any batch may be written into, default is 
1K.
   int max_partitions = 1024;
 
diff --git a/cpp/src/arrow/dataset/file_test.cc 
b/cpp/src/arrow/dataset/file_test.cc
index 5d6068557f..8904531200 100644
--- a/cpp/src/arrow/dataset/file_test.cc
+++ b/cpp/src/arrow/dataset/file_test.cc
@@ -15,15 +15,22 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <arrow/compute/function.h>
+#include <arrow/compute/registry.h>
 #include <cstdint>
 #include <memory>
 #include <string>
+#include <thread>
 #include <tuple>
 #include <vector>
 
 #include <gmock/gmock.h>
 #include <gtest/gtest.h>
 
+#include <arrow/dataset/dataset.h>
+#include <arrow/dataset/file_base.h>
+#include <arrow/record_batch.h>
+#include <arrow/util/async_generator.h>
 #include "arrow/acero/exec_plan.h"
 #include "arrow/acero/test_util_internal.h"
 #include "arrow/array/array_primitive.h"
@@ -31,6 +38,7 @@
 #include "arrow/dataset/api.h"
 #include "arrow/dataset/partition.h"
 #include "arrow/dataset/plan.h"
+#include "arrow/dataset/projector.h"
 #include "arrow/dataset/test_util_internal.h"
 #include "arrow/filesystem/path_util.h"
 #include "arrow/filesystem/test_util.h"
@@ -353,6 +361,165 @@ TEST_F(TestFileSystemDataset, WriteProjected) {
   }
 }
 
+// This kernel delays execution for some specific scalar values,
+// which guarantees the writing phase sees out-of-order exec batches
+Status delay(compute::KernelContext* ctx, const compute::ExecSpan& batch,
+             compute::ExecResult* out) {
+  const ArraySpan& input = batch[0].array;
+  const auto* input_values = input.GetValues<uint32_t>(1);
+  uint8_t* output_values = out->array_span()->buffers[1].data;
+
+  // Boolean data is stored in 1 bit per value
+  for (int64_t i = 0; i < input.length; ++i) {
+    if (input_values[i] % 16 == 0) {
+      std::this_thread::sleep_for(std::chrono::milliseconds(10));
+    }
+    bit_util::SetBitTo(output_values, i, true);
+  }
+
+  return Status::OK();
+}
+
+// A fragment with start=0 will defer ScanBatchesAsync returning a batch 
generator
+// This guarantees a dataset of multiple fragments could produce out-of-order 
batches
+class MockFragment : public Fragment {
+ public:
+  explicit MockFragment(uint32_t start, int64_t rows_per_batch, int 
num_batches,
+                        const std::shared_ptr<Schema>& schema)
+      : Fragment(compute::literal(true), schema),
+        start_(start),
+        rows_per_batch_(rows_per_batch),
+        num_batches_(num_batches) {}
+
+  Result<RecordBatchGenerator> ScanBatchesAsync(
+      const std::shared_ptr<ScanOptions>& options) override {
+    // Fragment with start_=0 defers returning the generator
+    if (start_ == 0) {
+      std::this_thread::sleep_for(std::chrono::duration<double>(0.1));
+    }
+
+    auto vec = gen::Gen({gen::Step(start_)})
+                   ->FailOnError()
+                   ->RecordBatches(rows_per_batch_, num_batches_);
+    auto it = MakeVectorIterator(vec);
+    return MakeBackgroundGenerator(std::move(it), 
io::default_io_context().executor());
+  }
+
+  std::string type_name() const override { return "mock"; }
+
+ protected:
+  Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() override {
+    return given_physical_schema_;
+  };
+
+ private:
+  uint32_t start_;
+  int64_t rows_per_batch_;
+  int num_batches_;
+};
+
+// This dataset consists of multiple fragments with incrementing values across 
the
+// fragments
+class MockDataset : public Dataset {
+ public:
+  explicit MockDataset(const std::shared_ptr<Schema>& schema) : 
Dataset(schema) {}
+
+  MockDataset(const std::shared_ptr<Schema>& schema,
+              const compute::Expression& partition_expression)
+      : Dataset(schema, partition_expression) {}
+
+  std::string type_name() const override { return "mock"; }
+  Result<std::shared_ptr<Dataset>> ReplaceSchema(
+      std::shared_ptr<Schema> schema) const override {
+    RETURN_NOT_OK(CheckProjectable(*schema_, *schema));
+    return std::make_shared<MockDataset>(std::move(schema));
+  }
+
+ protected:
+  Result<FragmentIterator> GetFragmentsImpl(compute::Expression predicate) 
override {
+    FragmentVector fragments;
+    fragments.push_back(std::make_shared<MockFragment>(0, 2, 1024, schema_));
+    fragments.push_back(std::make_shared<MockFragment>(2 * 1024, 2, 1024, 
schema_));
+    return MakeVectorIterator(std::move(fragments));
+  };
+};
+
+TEST_F(TestFileSystemDataset, MultiThreadedWritePersistsOrder) {
+  // Test for GH-26818
+  //
+  // This test uses std::this_thread::sleep_for to increase chances for batches
+  // to get written out-of-order in multi-threaded environment.
+  // With preserve_order = false, the existence of out-of-order is asserted to
+  // verify that the test setup reliably writes out-of-order sequences, and
+  // that write_options.preserve_order = preserve_order can recreate order.
+  //
+  // Estimates for out_of_order == false and preserve_order == false to occur
+  // are 10^-62 
https://github.com/apache/arrow/pull/44470#discussion_r2079049038
+  //
+  // If this test starts to reliably fail with preserve_order == false, the 
test setup
+  // has to be revised to again reliably produce out-of-order sequences.
+  auto format = std::make_shared<IpcFileFormat>();
+  FileSystemDatasetWriteOptions write_options;
+  write_options.file_write_options = format->DefaultWriteOptions();
+  write_options.base_dir = "root";
+  write_options.partitioning = std::make_shared<HivePartitioning>(schema({}));
+  write_options.basename_template = "{i}.feather";
+
+  // The Mock dataset delays emitting the first fragment, which test sequenced 
output of
+  // scan node
+  auto dataset = std::make_shared<MockDataset>(schema({field("f0", int32())}));
+
+  // The delay scalar function delays some batches of all fragments, which 
tests implicit
+  // ordering
+  auto delay_func = std::make_shared<compute::ScalarFunction>("delay", 
compute::Arity(1),
+                                                              
compute::FunctionDoc());
+  compute::ScalarKernel delay_kernel;
+  delay_kernel.exec = delay;
+  delay_kernel.signature = compute::KernelSignature::Make({int32()}, 
boolean());
+  ASSERT_OK(delay_func->AddKernel(delay_kernel));
+  ASSERT_OK(compute::GetFunctionRegistry()->AddFunction(delay_func));
+
+  for (bool preserve_order : {true, false}) {
+    ASSERT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan());
+    ASSERT_OK(scanner_builder->UseThreads(true));
+    ASSERT_OK(
+        scanner_builder->Filter(compute::call("delay", 
{compute::field_ref("f0")})));
+    ASSERT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());
+
+    auto fs = std::make_shared<fs::internal::MockFileSystem>(fs::kNoTime);
+    write_options.filesystem = fs;
+    write_options.preserve_order = preserve_order;
+
+    ASSERT_OK(FileSystemDataset::Write(write_options, scanner));
+
+    // Read the file back out and verify the order
+    ASSERT_OK_AND_ASSIGN(auto dataset_factory, FileSystemDatasetFactory::Make(
+                                                   fs, {"root/0.feather"}, 
format, {}));
+    ASSERT_OK_AND_ASSIGN(auto written_dataset, 
dataset_factory->Finish(FinishOptions{}));
+    ASSERT_OK_AND_ASSIGN(scanner_builder, written_dataset->NewScan());
+    ASSERT_OK(scanner_builder->UseThreads(false));
+    ASSERT_OK_AND_ASSIGN(scanner, scanner_builder->Finish());
+    ASSERT_OK_AND_ASSIGN(auto actual, scanner->ToTable());
+    TableBatchReader reader(*actual);
+    std::shared_ptr<RecordBatch> batch;
+    ASSERT_OK(reader.ReadNext(&batch));
+    int32_t prev = -1;
+    auto out_of_order = false;
+    while (batch != nullptr) {
+      const auto* values = batch->column(0)->data()->GetValues<int32_t>(1);
+      for (int row = 0; row < batch->num_rows(); ++row) {
+        int32_t value = values[row];
+        if (value <= prev) {
+          out_of_order = true;
+        }
+        prev = value;
+      }
+      ASSERT_OK(reader.ReadNext(&batch));
+    }
+    ASSERT_EQ(!out_of_order, preserve_order);
+  }
+}
+
 class FileSystemWriteTest : public testing::TestWithParam<std::tuple<bool, 
bool>> {
   using PlanFactory = std::function<std::vector<acero::Declaration>(
       const FileSystemDatasetWriteOptions&,
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index d31b02b088..9e5edee574 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -4090,6 +4090,7 @@ def _filesystemdataset_write(
     str basename_template not None,
     FileSystem filesystem not None,
     Partitioning partitioning not None,
+    bool preserve_order,
     FileWriteOptions file_options not None,
     int max_partitions,
     object file_visitor,
@@ -4112,6 +4113,7 @@ def _filesystemdataset_write(
     c_options.filesystem = filesystem.unwrap()
     c_options.base_dir = tobytes(_stringify_path(base_dir))
     c_options.partitioning = partitioning.unwrap()
+    c_options.preserve_order = preserve_order
     c_options.max_partitions = max_partitions
     c_options.max_open_files = max_open_files
     c_options.max_rows_per_file = max_rows_per_file
diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py
index 03459a6c49..26602c1e17 100644
--- a/python/pyarrow/dataset.py
+++ b/python/pyarrow/dataset.py
@@ -844,9 +844,9 @@ def _ensure_write_partitioning(part, schema, flavor):
 
 
 def write_dataset(data, base_dir, *, basename_template=None, format=None,
-                  partitioning=None, partitioning_flavor=None, schema=None,
-                  filesystem=None, file_options=None, use_threads=True,
-                  max_partitions=None, max_open_files=None,
+                  partitioning=None, partitioning_flavor=None,
+                  schema=None, filesystem=None, file_options=None, 
use_threads=True,
+                  preserve_order=False, max_partitions=None, 
max_open_files=None,
                   max_rows_per_file=None, min_rows_per_group=None,
                   max_rows_per_group=None, file_visitor=None,
                   existing_data_behavior='error', create_dir=True):
@@ -889,7 +889,13 @@ Table/RecordBatch, or iterable of RecordBatch
         ``FileFormat.make_write_options()`` function.
     use_threads : bool, default True
         Write files in parallel. If enabled, then maximum parallelism will be
-        used determined by the number of available CPU cores.
+        used determined by the number of available CPU cores. Using multiple
+        threads may change the order of rows in the written dataset if
+        preserve_order is set to False.
+    preserve_order : bool, default False
+        Preserve the order of rows. If enabled, order of rows in the dataset 
are
+        guaranteed to be preserved even if use_threads is set to True. This may
+        cause notable performance degradation.
     max_partitions : int, default 1024
         Maximum number of partitions any batch may be written into.
     max_open_files : int, default 1024
@@ -1028,7 +1034,7 @@ Table/RecordBatch, or iterable of RecordBatch
 
     _filesystemdataset_write(
         scanner, base_dir, basename_template, filesystem, partitioning,
-        file_options, max_partitions, file_visitor, existing_data_behavior,
-        max_open_files, max_rows_per_file,
+        preserve_order, file_options, max_partitions, file_visitor,
+        existing_data_behavior, max_open_files, max_rows_per_file,
         min_rows_per_group, max_rows_per_group, create_dir
     )
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd 
b/python/pyarrow/includes/libarrow_dataset.pxd
index ab7355f8d3..8e3fb9590f 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -221,6 +221,7 @@ cdef extern from "arrow/dataset/api.h" namespace 
"arrow::dataset" nogil:
         shared_ptr[CFileSystem] filesystem
         c_string base_dir
         shared_ptr[CPartitioning] partitioning
+        c_bool preserve_order
         int max_partitions
         c_string basename_template
         function[cb_writer_finish_internal] writer_pre_finish
diff --git a/python/pyarrow/tests/test_dataset.py 
b/python/pyarrow/tests/test_dataset.py
index cd31a62f62..d95c657c99 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -4590,6 +4590,22 @@ def test_write_dataset_use_threads(tempdir):
     assert result1.to_table().equals(result2.to_table())
 
 
+@pytest.mark.parquet
+@pytest.mark.pandas
+def test_write_dataset_use_threads_preserve_order(tempdir):
+    # see GH-26818
+    table = pa.table({"a": range(1024)})
+    batches = table.to_batches(max_chunksize=2)
+    ds.write_dataset(batches, tempdir, format="parquet",
+                     use_threads=True, preserve_order=True)
+    seq = ds.dataset(tempdir).to_table(use_threads=False)['a'].to_numpy()
+    prev = -1
+    for item in seq:
+        curr = int(item)
+        assert curr > prev, f"Sequence expected to be ordered: {seq}"
+        prev = curr
+
+
 def test_write_table(tempdir):
     table = pa.table([
         pa.array(range(20)), pa.array(random.random() for _ in range(20)),

Reply via email to