westonpace commented on code in PR #13117:
URL: https://github.com/apache/arrow/pull/13117#discussion_r870745687
##########
cpp/src/arrow/dataset/file_test.cc:
##########
@@ -342,5 +347,118 @@ TEST_F(TestFileSystemDataset, WriteProjected) {
}
}
}
+
+class FileSystemWriteTest : public testing::TestWithParam<std::tuple<bool, bool>> {
+ protected:
+ bool IsParallel() { return std::get<0>(GetParam()); }
+ bool IsSlow() { return std::get<1>(GetParam()); }
+
+ FileSystemWriteTest() { dataset::internal::Initialize(); }
+
+ void TestDatasetWriteRoundTrip(
+ std::function<Result<std::shared_ptr<cp::ExecPlan>>(
+ const cp::BatchesWithSchema& source_data, const FileSystemDatasetWriteOptions&,
+ std::function<Future<util::optional<cp::ExecBatch>>()>*)>
+ plan_factory,
+ bool has_output) {
+ // Runs in-memory data through the plan and then scans out the written
+ // data to ensure it matches the source data
+ auto format = std::make_shared<IpcFileFormat>();
+ auto fs = std::make_shared<fs::internal::MockFileSystem>(fs::kNoTime);
+ FileSystemDatasetWriteOptions write_options;
+ write_options.file_write_options = format->DefaultWriteOptions();
+ write_options.filesystem = fs;
+ write_options.base_dir = "root";
+ write_options.partitioning = std::make_shared<HivePartitioning>(schema({}));
+ write_options.basename_template = "{i}.feather";
+
+ cp::BatchesWithSchema source_data;
+ source_data.batches = {
+ cp::ExecBatchFromJSON({int32(), boolean()}, "[[null, true], [4, false]]"),
+ cp::ExecBatchFromJSON({int32(), boolean()},
+ "[[5, null], [6, false], [7, false]]")};
+ source_data.schema = schema({field("i32", int32()), field("bool", boolean())});
+
+ AsyncGenerator<util::optional<cp::ExecBatch>> sink_gen;
+ ASSERT_OK_AND_ASSIGN(auto plan, plan_factory(source_data, write_options, &sink_gen));
+
+ if (has_output) {
+ ASSERT_FINISHES_OK_AND_ASSIGN(auto out_batches,
+ cp::StartAndCollect(plan.get(), sink_gen));
+ cp::AssertExecBatchesEqual(source_data.schema, source_data.batches, out_batches);
+ } else {
+ ASSERT_FINISHES_OK(cp::StartAndFinish(plan.get()));
+ }
+
+ // Read written dataset and make sure it matches
+ ASSERT_OK_AND_ASSIGN(auto dataset_factory, FileSystemDatasetFactory::Make(
+ fs, {"root/0.feather"}, format, {}));
Review Comment:
Done.
##########
cpp/src/arrow/dataset/file_test.cc:
##########
@@ -342,5 +347,118 @@ TEST_F(TestFileSystemDataset, WriteProjected) {
}
}
}
+
+class FileSystemWriteTest : public testing::TestWithParam<std::tuple<bool, bool>> {
+ protected:
+ bool IsParallel() { return std::get<0>(GetParam()); }
+ bool IsSlow() { return std::get<1>(GetParam()); }
+
+ FileSystemWriteTest() { dataset::internal::Initialize(); }
+
+ void TestDatasetWriteRoundTrip(
+ std::function<Result<std::shared_ptr<cp::ExecPlan>>(
+ const cp::BatchesWithSchema& source_data, const FileSystemDatasetWriteOptions&,
+ std::function<Future<util::optional<cp::ExecBatch>>()>*)>
+ plan_factory,
Review Comment:
Thanks, this is much cleaner.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]