rok commented on code in PR #44470:
URL: https://github.com/apache/arrow/pull/44470#discussion_r2078209954


##########
cpp/src/arrow/dataset/file_test.cc:
##########
@@ -353,6 +361,153 @@ TEST_F(TestFileSystemDataset, WriteProjected) {
   }
 }
 
+// This kernel delays execution for some specific scalar values,
+// which guarantees the writing phase sees out-of-order exec batches
+Status delay(compute::KernelContext* ctx, const compute::ExecSpan& batch,
+             compute::ExecResult* out) {
+  const ArraySpan& input = batch[0].array;
+  const auto* input_values = input.GetValues<uint32_t>(1);
+  uint8_t* output_values = out->array_span()->buffers[1].data;
+
+  // Boolean data is stored in 1 bit per value
+  for (int64_t i = 0; i < input.length; ++i) {
+    if (input_values[i] % 16 == 0) {
+      std::this_thread::sleep_for(std::chrono::milliseconds(10));
+    }
+    bit_util::SetBitTo(output_values, i, true);
+  }
+
+  return Status::OK();
+}
+
+// A fragment with start=0 will defer ScanBatchesAsync returning a batch 
generator
+// This guarantees a dataset of multiple fragments produces out-of-order 
batches
+class MockFragment : public Fragment {
+ public:
+  explicit MockFragment(uint32_t start, int64_t rows_per_batch, int 
num_batches,
+                        const std::shared_ptr<Schema>& schema)
+      : Fragment(compute::literal(true), schema),
+        start_(start),
+        rows_per_batch_(rows_per_batch),
+        num_batches_(num_batches) {}
+
+  Result<RecordBatchGenerator> ScanBatchesAsync(
+      const std::shared_ptr<ScanOptions>& options) override {
+    // One fragment requires some more time than others

Review Comment:
   Do you mean:
   ```suggestion
       // First fragment requires some more time than others
   ```



##########
cpp/src/arrow/dataset/file_test.cc:
##########
@@ -353,6 +361,153 @@ TEST_F(TestFileSystemDataset, WriteProjected) {
   }
 }
 
+// This kernel delays execution for some specific scalar values,
+// which guarantees the writing phase sees out-of-order exec batches
+Status delay(compute::KernelContext* ctx, const compute::ExecSpan& batch,
+             compute::ExecResult* out) {
+  const ArraySpan& input = batch[0].array;
+  const auto* input_values = input.GetValues<uint32_t>(1);
+  uint8_t* output_values = out->array_span()->buffers[1].data;
+
+  // Boolean data is stored in 1 bit per value
+  for (int64_t i = 0; i < input.length; ++i) {
+    if (input_values[i] % 16 == 0) {
+      std::this_thread::sleep_for(std::chrono::milliseconds(10));
+    }
+    bit_util::SetBitTo(output_values, i, true);
+  }
+
+  return Status::OK();
+}
+
+// A fragment with start=0 will defer ScanBatchesAsync returning a batch 
generator
+// This guarantees a dataset of multiple fragments produces out-of-order 
batches

Review Comment:
   ```suggestion
   // This guarantees a dataset of multiple fragments produced out-of-order 
batches
   ```



##########
cpp/src/arrow/dataset/write_node_test.cc:
##########
@@ -170,5 +176,81 @@ TEST_F(SimpleWriteNodeTest, CustomMetadata) {
   ASSERT_TRUE(custom_metadata->Equals(*file_schema->metadata()));
 }
 
+TEST_F(SimpleWriteNodeTest, SequenceOutput) {
+  // Test for GH-26818
+  auto format = std::make_shared<IpcFileFormat>();
+  constexpr int kRowsPerBatch = 16;
+  constexpr int kNumBatches = 32;
+  constexpr random::SeedType kSeed = 42;
+  constexpr int kJitterMod = 4;
+  acero::RegisterTestNodes();
+
+  // Create an input table
+  std::shared_ptr<Table> table =
+      gen::Gen({gen::Step()})->FailOnError()->Table(kRowsPerBatch, 
kNumBatches);
+  auto dataset = std::make_shared<InMemoryDataset>(table);
+  auto scan_options = std::make_shared<ScanOptions>();
+  scan_options->use_threads = true;
+
+  for (bool preserve_order : {true, false}) {
+    auto scanner_builder = std::make_shared<ScannerBuilder>(dataset, 
scan_options);
+    EXPECT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());
+    auto exprs = scan_options->projection.call()->arguments;
+    auto names = checked_cast<const compute::MakeStructOptions*>(
+                     scan_options->projection.call()->options.get())
+                     ->field_names;
+
+    auto fs = std::make_shared<fs::internal::MockFileSystem>(fs::kNoTime);
+    dataset::WriteNodeOptions write_options(fs_write_options_);
+    write_options.write_options.file_write_options = 
format->DefaultWriteOptions();
+    write_options.write_options.base_dir = "root";
+    write_options.write_options.partitioning =
+        std::make_shared<HivePartitioning>(schema({}));
+    write_options.write_options.basename_template = "{i}.feather";
+    write_options.write_options.filesystem = fs;
+    write_options.write_options.preserve_order = preserve_order;
+
+    // test plan of FileSystemDataset::Write with a jitter node that 
guarantees exec
+    // batches are out of order
+    acero::Declaration plan = acero::Declaration::Sequence({
+        {"scan",
+         ScanNodeOptions{dataset, scanner->options(), 
/*require_sequenced_output=*/false,
+                         /*implicit_ordering=*/true}},
+        {"filter", acero::FilterNodeOptions{scanner->options()->filter}},
+        {"project", acero::ProjectNodeOptions{std::move(exprs), 
std::move(names)}},
+        {"jitter", acero::JitterNodeOptions(kSeed, kJitterMod)},
+        {"write", write_options},
+    });
+
+    ASSERT_OK(DeclarationToStatus(plan));
+
+    // Read the file back out and verify the order

Review Comment:
   This block seems to repeat in `file_test.cc` and `write_node_test.cc`, 
perhaps we should abstract it into `test_util_internal.h`?



##########
cpp/src/arrow/dataset/file_test.cc:
##########
@@ -353,6 +356,89 @@ TEST_F(TestFileSystemDataset, WriteProjected) {
   }
 }
 
+// this kernel delays execution for some specific scalar values
+Status delay(compute::KernelContext* ctx, const compute::ExecSpan& batch,
+             compute::ExecResult* out) {
+  const ArraySpan& input = batch[0].array;
+  const uint32_t* input_values = input.GetValues<uint32_t>(1);
+  uint8_t* output_values = out->array_span()->buffers[1].data;
+
+  // Boolean data is stored in 1 bit per value
+  for (int64_t i = 0; i < input.length; ++i) {
+    if (input_values[i] % 16 == 0) {
+      std::this_thread::sleep_for(std::chrono::milliseconds(10));
+    }

Review Comment:
   If probabilistic, what's the likelihood of all tasks not being out of order? 
Something like `0.001^(input.length / 16)`?



##########
cpp/src/arrow/dataset/write_node_test.cc:
##########
@@ -170,5 +176,81 @@ TEST_F(SimpleWriteNodeTest, CustomMetadata) {
   ASSERT_TRUE(custom_metadata->Equals(*file_schema->metadata()));
 }
 
+TEST_F(SimpleWriteNodeTest, SequenceOutput) {

Review Comment:
   Maybe
   ```suggestion
   TEST_F(SimpleWriteNodeTest, MultiThreadedSequenceOutputOrdering) {
   ```
   ?



##########
python/pyarrow/dataset.py:
##########
@@ -893,7 +893,12 @@ def write_dataset(data, base_dir, *, 
basename_template=None, format=None,
         ``FileFormat.make_write_options()`` function.
     use_threads : bool, default True
         Write files in parallel. If enabled, then maximum parallelism will be
-        used determined by the number of available CPU cores.
+        used determined by the number of available CPU cores.  Using multiple
+        threads may change the order of rows in the written dataset.

Review Comment:
   Perhaps redundant:
   ```suggestion
           used determined by the number of available CPU cores. Using multiple
           threads may change the order of rows in the written dataset if
           preserve_order is set to False.
   ```



##########
cpp/src/arrow/dataset/file_test.cc:
##########
@@ -353,6 +361,153 @@ TEST_F(TestFileSystemDataset, WriteProjected) {
   }
 }
 
+// This kernel delays execution for some specific scalar values,
+// which guarantees the writing phase sees out-of-order exec batches
+Status delay(compute::KernelContext* ctx, const compute::ExecSpan& batch,
+             compute::ExecResult* out) {
+  const ArraySpan& input = batch[0].array;
+  const auto* input_values = input.GetValues<uint32_t>(1);
+  uint8_t* output_values = out->array_span()->buffers[1].data;
+
+  // Boolean data is stored in 1 bit per value
+  for (int64_t i = 0; i < input.length; ++i) {
+    if (input_values[i] % 16 == 0) {
+      std::this_thread::sleep_for(std::chrono::milliseconds(10));
+    }
+    bit_util::SetBitTo(output_values, i, true);
+  }
+
+  return Status::OK();
+}
+
+// A fragment with start=0 will defer ScanBatchesAsync returning a batch 
generator
+// This guarantees a dataset of multiple fragments produces out-of-order 
batches
+class MockFragment : public Fragment {
+ public:
+  explicit MockFragment(uint32_t start, int64_t rows_per_batch, int 
num_batches,
+                        const std::shared_ptr<Schema>& schema)
+      : Fragment(compute::literal(true), schema),
+        start_(start),
+        rows_per_batch_(rows_per_batch),
+        num_batches_(num_batches) {}
+
+  Result<RecordBatchGenerator> ScanBatchesAsync(
+      const std::shared_ptr<ScanOptions>& options) override {
+    // One fragment requires some more time than others
+    if (start_ == 0) {
+      std::this_thread::sleep_for(std::chrono::duration<double>(0.1));
+    }
+
+    auto vec = gen::Gen({gen::Step(start_)})
+                   ->FailOnError()
+                   ->RecordBatches(rows_per_batch_, num_batches_);
+    auto it = MakeVectorIterator(vec);
+    return MakeBackgroundGenerator(std::move(it), 
io::default_io_context().executor());
+  }
+
+  std::string type_name() const override { return "mock"; }
+
+ protected:
+  Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() override {
+    return given_physical_schema_;
+  };
+
+ private:
+  uint32_t start_;
+  int64_t rows_per_batch_;
+  int num_batches_;
+};
+
+// This dataset consists of multiple fragments with incrementing values across 
the
+// fragments
+class MockDataset : public Dataset {
+ public:
+  explicit MockDataset(const std::shared_ptr<Schema>& schema) : 
Dataset(schema) {}
+
+  MockDataset(const std::shared_ptr<Schema>& schema,
+              const compute::Expression& partition_expression)
+      : Dataset(schema, partition_expression) {}
+
+  std::string type_name() const override { return "mock"; }
+  Result<std::shared_ptr<Dataset>> ReplaceSchema(
+      std::shared_ptr<Schema> schema) const override {
+    RETURN_NOT_OK(CheckProjectable(*schema_, *schema));
+    return std::make_shared<MockDataset>(std::move(schema));
+  }
+
+ protected:
+  Result<FragmentIterator> GetFragmentsImpl(compute::Expression predicate) 
override {
+    FragmentVector fragments;
+    fragments.push_back(std::make_shared<MockFragment>(0, 2, 1024, schema_));
+    fragments.push_back(std::make_shared<MockFragment>(2 * 1024, 2, 1024, 
schema_));
+    return MakeVectorIterator(std::move(fragments));
+  };
+};
+
+TEST_F(TestFileSystemDataset, WritePersistOrder) {

Review Comment:
   Perhaps:
   ```suggestion
   TEST_F(TestFileSystemDataset, MultiThreadedWritePersistsOrder) {
   ```
   ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to