westonpace commented on code in PR #13117:
URL: https://github.com/apache/arrow/pull/13117#discussion_r870745521
##########
cpp/src/arrow/dataset/file_base.cc:
##########
@@ -460,6 +460,23 @@ class TeeNode : public compute::MapNode {
const char* kind_name() const override { return "TeeNode"; }
+ void Finish(Status finish_st) override {
+ dataset_writer_->Finish().AddCallback([this, finish_st](const Status&
dw_status) {
+ // Need to wait for the task group to complete regardless of dw_status
+ task_group_.End().AddCallback(
+ [this, dw_status, finish_st](const Status& tg_status) {
+ // Prefer dw_status then finish_st and then tg_status
+ if (!dw_status.ok()) {
+ finished_.MarkFinished(dw_status);
+ }
+ if (!finish_st.ok()) {
+ finished_.MarkFinished(finish_st);
+ }
+ finished_.MarkFinished(tg_status);
Review Comment:
Good idea. We do support `dw_status & finish_st & tg_status`, so I changed it
to that.
##########
cpp/src/arrow/dataset/file_test.cc:
##########
@@ -342,5 +347,118 @@ TEST_F(TestFileSystemDataset, WriteProjected) {
}
}
}
+
+class FileSystemWriteTest : public testing::TestWithParam<std::tuple<bool,
bool>> {
+ protected:
+ bool IsParallel() { return std::get<0>(GetParam()); }
+ bool IsSlow() { return std::get<1>(GetParam()); }
+
+ FileSystemWriteTest() { dataset::internal::Initialize(); }
+
+ void TestDatasetWriteRoundTrip(
+ std::function<Result<std::shared_ptr<cp::ExecPlan>>(
+ const cp::BatchesWithSchema& source_data, const
FileSystemDatasetWriteOptions&,
+ std::function<Future<util::optional<cp::ExecBatch>>()>*)>
+ plan_factory,
+ bool has_output) {
+ // Runs in-memory data through the plan and then scans out the written
+ // data to ensure it matches the source data
+ auto format = std::make_shared<IpcFileFormat>();
+ auto fs = std::make_shared<fs::internal::MockFileSystem>(fs::kNoTime);
+ FileSystemDatasetWriteOptions write_options;
+ write_options.file_write_options = format->DefaultWriteOptions();
+ write_options.filesystem = fs;
+ write_options.base_dir = "root";
+ write_options.partitioning =
std::make_shared<HivePartitioning>(schema({}));
+ write_options.basename_template = "{i}.feather";
+
Review Comment:
Done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]