rok commented on code in PR #44470:
URL: https://github.com/apache/arrow/pull/44470#discussion_r2081533974


##########
cpp/src/arrow/dataset/file_test.cc:
##########
@@ -353,6 +361,153 @@ TEST_F(TestFileSystemDataset, WriteProjected) {
   }
 }
 
+// This kernel delays execution for some specific scalar values,
+// which guarantees the writing phase sees out-of-order exec batches
+Status delay(compute::KernelContext* ctx, const compute::ExecSpan& batch,
+             compute::ExecResult* out) {
+  const ArraySpan& input = batch[0].array;
+  const auto* input_values = input.GetValues<uint32_t>(1);
+  uint8_t* output_values = out->array_span()->buffers[1].data;
+
+  // Boolean data is stored in 1 bit per value
+  for (int64_t i = 0; i < input.length; ++i) {
+    if (input_values[i] % 16 == 0) {
+      std::this_thread::sleep_for(std::chrono::milliseconds(10));
+    }
+    bit_util::SetBitTo(output_values, i, true);
+  }
+
+  return Status::OK();
+}
+
+// A fragment with start=0 will defer ScanBatchesAsync returning a batch generator.
+// This guarantees a dataset of multiple fragments produces out-of-order batches.
+class MockFragment : public Fragment {
+ public:
+  explicit MockFragment(uint32_t start, int64_t rows_per_batch, int num_batches,
+                        const std::shared_ptr<Schema>& schema)
+      : Fragment(compute::literal(true), schema),
+        start_(start),
+        rows_per_batch_(rows_per_batch),
+        num_batches_(num_batches) {}
+
+  Result<RecordBatchGenerator> ScanBatchesAsync(
+      const std::shared_ptr<ScanOptions>& options) override {
+    // One fragment takes more time than the others
+    if (start_ == 0) {
+      std::this_thread::sleep_for(std::chrono::duration<double>(0.1));
+    }
+
+    auto vec = gen::Gen({gen::Step(start_)})
+                   ->FailOnError()
+                   ->RecordBatches(rows_per_batch_, num_batches_);
+    auto it = MakeVectorIterator(vec);
+    return MakeBackgroundGenerator(std::move(it),
+                                   io::default_io_context().executor());
+  }
+
+  std::string type_name() const override { return "mock"; }
+
+ protected:
+  Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() override {
+    return given_physical_schema_;
+  };
+
+ private:
+  uint32_t start_;
+  int64_t rows_per_batch_;
+  int num_batches_;
+};
+
+// This dataset consists of multiple fragments with incrementing values across the
+// fragments.
+class MockDataset : public Dataset {
+ public:
+  explicit MockDataset(const std::shared_ptr<Schema>& schema) : Dataset(schema) {}
+
+  MockDataset(const std::shared_ptr<Schema>& schema,
+              const compute::Expression& partition_expression)
+      : Dataset(schema, partition_expression) {}
+
+  std::string type_name() const override { return "mock"; }
+  Result<std::shared_ptr<Dataset>> ReplaceSchema(
+      std::shared_ptr<Schema> schema) const override {
+    RETURN_NOT_OK(CheckProjectable(*schema_, *schema));
+    return std::make_shared<MockDataset>(std::move(schema));
+  }
+
+ protected:
+  Result<FragmentIterator> GetFragmentsImpl(compute::Expression predicate) override {
+    FragmentVector fragments;
+    fragments.push_back(std::make_shared<MockFragment>(0, 2, 1024, schema_));
+    fragments.push_back(std::make_shared<MockFragment>(2 * 1024, 2, 1024, schema_));
+    return MakeVectorIterator(std::move(fragments));
+  };
+};
+
+TEST_F(TestFileSystemDataset, WritePersistOrder) {
+  // Test for GH-26818
+  auto format = std::make_shared<IpcFileFormat>();
+  FileSystemDatasetWriteOptions write_options;
+  write_options.file_write_options = format->DefaultWriteOptions();
+  write_options.base_dir = "root";
+  write_options.partitioning = std::make_shared<HivePartitioning>(schema({}));
+  write_options.basename_template = "{i}.feather";
+
+  // The mock dataset delays emitting the first fragment, which tests the sequenced
+  // output of the scan node
+  auto dataset = std::make_shared<MockDataset>(schema({field("f0", int32())}));
+
+  // The delay scalar function delays some batches of all fragments, which tests
+  // implicit ordering
+  auto delay_func = std::make_shared<compute::ScalarFunction>("delay", compute::Arity(1),
+                                                              compute::FunctionDoc());
+  compute::ScalarKernel delay_kernel;
+  delay_kernel.exec = delay;
+  delay_kernel.signature = compute::KernelSignature::Make({int32()}, boolean());
+  ASSERT_OK(delay_func->AddKernel(delay_kernel));
+  ASSERT_OK(compute::GetFunctionRegistry()->AddFunction(delay_func));
+
+  for (bool preserve_order : {true, false}) {
+    ASSERT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan());
+    ASSERT_OK(scanner_builder->UseThreads(true));
+    ASSERT_OK(
+        scanner_builder->Filter(compute::call("delay", 
{compute::field_ref("f0")})));
+    ASSERT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());
+
+    auto fs = std::make_shared<fs::internal::MockFileSystem>(fs::kNoTime);
+    write_options.filesystem = fs;
+    write_options.preserve_order = preserve_order;
+
+    ASSERT_OK(FileSystemDataset::Write(write_options, scanner));
+
+    // Read the file back out and verify the order
+    ASSERT_OK_AND_ASSIGN(auto dataset_factory, FileSystemDatasetFactory::Make(
+                                                   fs, {"root/0.feather"}, 
format, {}));
+    ASSERT_OK_AND_ASSIGN(auto written_dataset, 
dataset_factory->Finish(FinishOptions{}));
+    ASSERT_OK_AND_ASSIGN(scanner_builder, written_dataset->NewScan());
+    ASSERT_OK(scanner_builder->UseThreads(false));
+    ASSERT_OK_AND_ASSIGN(scanner, scanner_builder->Finish());
+    ASSERT_OK_AND_ASSIGN(auto actual, scanner->ToTable());
+    TableBatchReader reader(*actual);
+    std::shared_ptr<RecordBatch> batch;
+    ASSERT_OK(reader.ReadNext(&batch));
+    int32_t prev = -1;
+    auto out_of_order = false;
+    while (batch != nullptr) {
+      const auto* values = batch->column(0)->data()->GetValues<int32_t>(1);
+      for (int row = 0; row < batch->num_rows(); ++row) {
+        int32_t value = values[row];
+        if (value <= prev) {
+          out_of_order = true;
+        }
+        prev = value;
+      }
+      ASSERT_OK(reader.ReadNext(&batch));
+    }
+    ASSERT_EQ(!out_of_order, preserve_order);

Review Comment:
   If I understand @EnricoMi correctly, it doesn't hurt to actually check for out-of-order (ooo) batches when `preserve_order == false`, as that guarantees our generated test data really is out of order before it gets sorted.
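   For reference, a minimal sketch of that check, assuming the `out_of_order` flag from the loop above (this is just the current `ASSERT_EQ(!out_of_order, preserve_order)` split in two, so a failure message points at the offending mode):

   ```cpp
   if (preserve_order) {
     // Ordering preserved: the written file must be strictly increasing.
     ASSERT_FALSE(out_of_order);
   } else {
     // Ordering not preserved: the delay kernel and the slow first fragment
     // should have actually produced out-of-order batches; if they did not,
     // the preserve_order == true case would pass vacuously.
     ASSERT_TRUE(out_of_order);
   }
   ```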


