rtpsw commented on code in PR #13117:
URL: https://github.com/apache/arrow/pull/13117#discussion_r869894805


##########
cpp/src/arrow/dataset/file_base.cc:
##########
@@ -460,6 +460,23 @@ class TeeNode : public compute::MapNode {
 
   const char* kind_name() const override { return "TeeNode"; }
 
+  void Finish(Status finish_st) override {
+    dataset_writer_->Finish().AddCallback([this, finish_st](const Status& 
dw_status) {
+      // Need to wait for the task group to complete regardless of dw_status
+      task_group_.End().AddCallback(
+          [this, dw_status, finish_st](const Status& tg_status) {
+            // Prefer dw_status then finish_st and then tg_status
+            if (!dw_status.ok()) {
+              finished_.MarkFinished(dw_status);
+            }
+            if (!finish_st.ok()) {
+              finished_.MarkFinished(finish_st);
+            }
+            finished_.MarkFinished(tg_status);

Review Comment:
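   It would be nice if Arrow supported succinct code like 
`finished_.MarkFinished(dw_status && finish_st && tg_status);` here, where 
`status1 && status2` evaluates to `status1.ok() ? status2 : status1`, so the 
first non-OK status wins (matching the "prefer dw_status, then finish_st, then 
tg_status" order). It would also prevent a subtle bug: as written, the `if` 
branches above do not return, so `finished_.MarkFinished` can be called more 
than once when a status is not OK. There are probably other places in Arrow 
where this pattern occurs. If you agree, there is no need to make it part of 
this PR; it can go in a separate issue.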



##########
cpp/src/arrow/dataset/file_test.cc:
##########
@@ -342,5 +347,118 @@ TEST_F(TestFileSystemDataset, WriteProjected) {
     }
   }
 }
+
+class FileSystemWriteTest : public testing::TestWithParam<std::tuple<bool, 
bool>> {
+ protected:
+  bool IsParallel() { return std::get<0>(GetParam()); }
+  bool IsSlow() { return std::get<1>(GetParam()); }
+
+  FileSystemWriteTest() { dataset::internal::Initialize(); }
+
+  void TestDatasetWriteRoundTrip(
+      std::function<Result<std::shared_ptr<cp::ExecPlan>>(
+          const cp::BatchesWithSchema& source_data, const 
FileSystemDatasetWriteOptions&,
+          std::function<Future<util::optional<cp::ExecBatch>>()>*)>
+          plan_factory,
+      bool has_output) {
+    // Runs in-memory data through the plan and then scans out the written
+    // data to ensure it matches the source data
+    auto format = std::make_shared<IpcFileFormat>();
+    auto fs = std::make_shared<fs::internal::MockFileSystem>(fs::kNoTime);
+    FileSystemDatasetWriteOptions write_options;
+    write_options.file_write_options = format->DefaultWriteOptions();
+    write_options.filesystem = fs;
+    write_options.base_dir = "root";
+    write_options.partitioning = 
std::make_shared<HivePartitioning>(schema({}));
+    write_options.basename_template = "{i}.feather";
+
+    cp::BatchesWithSchema source_data;
+    source_data.batches = {
+        cp::ExecBatchFromJSON({int32(), boolean()}, "[[null, true], [4, 
false]]"),
+        cp::ExecBatchFromJSON({int32(), boolean()},
+                              "[[5, null], [6, false], [7, false]]")};
+    source_data.schema = schema({field("i32", int32()), field("bool", 
boolean())});
+
+    AsyncGenerator<util::optional<cp::ExecBatch>> sink_gen;
+    ASSERT_OK_AND_ASSIGN(auto plan, plan_factory(source_data, write_options, 
&sink_gen));
+
+    if (has_output) {
+      ASSERT_FINISHES_OK_AND_ASSIGN(auto out_batches,
+                                    cp::StartAndCollect(plan.get(), sink_gen));
+      cp::AssertExecBatchesEqual(source_data.schema, source_data.batches, 
out_batches);
+    } else {
+      ASSERT_FINISHES_OK(cp::StartAndFinish(plan.get()));
+    }
+
+    // Read written dataset and make sure it matches
+    ASSERT_OK_AND_ASSIGN(auto dataset_factory, FileSystemDatasetFactory::Make(
+                                                   fs, {"root/0.feather"}, 
format, {}));

Review Comment:
   ```suggestion
                                                      fs, {out_filename}, 
format, {}));
   ```



##########
cpp/src/arrow/dataset/file_test.cc:
##########
@@ -342,5 +347,118 @@ TEST_F(TestFileSystemDataset, WriteProjected) {
     }
   }
 }
+
+class FileSystemWriteTest : public testing::TestWithParam<std::tuple<bool, 
bool>> {
+ protected:
+  bool IsParallel() { return std::get<0>(GetParam()); }
+  bool IsSlow() { return std::get<1>(GetParam()); }
+
+  FileSystemWriteTest() { dataset::internal::Initialize(); }
+
+  void TestDatasetWriteRoundTrip(
+      std::function<Result<std::shared_ptr<cp::ExecPlan>>(
+          const cp::BatchesWithSchema& source_data, const 
FileSystemDatasetWriteOptions&,
+          std::function<Future<util::optional<cp::ExecBatch>>()>*)>
+          plan_factory,

Review Comment:
   Consider taking a `cp::Declaration` factory here (instead of a plan 
factory). It would accept the `write_options` and `sink_gen` arguments and 
create the nodes that follow the `source_data` node. This function would then 
build the plan (including the `source_data` node) instead of each test case 
doing so. This should simplify the tests and avoid repeated code.



##########
cpp/src/arrow/dataset/file_test.cc:
##########
@@ -342,5 +347,118 @@ TEST_F(TestFileSystemDataset, WriteProjected) {
     }
   }
 }
+
+class FileSystemWriteTest : public testing::TestWithParam<std::tuple<bool, 
bool>> {
+ protected:
+  bool IsParallel() { return std::get<0>(GetParam()); }
+  bool IsSlow() { return std::get<1>(GetParam()); }
+
+  FileSystemWriteTest() { dataset::internal::Initialize(); }
+
+  void TestDatasetWriteRoundTrip(
+      std::function<Result<std::shared_ptr<cp::ExecPlan>>(
+          const cp::BatchesWithSchema& source_data, const 
FileSystemDatasetWriteOptions&,
+          std::function<Future<util::optional<cp::ExecBatch>>()>*)>
+          plan_factory,
+      bool has_output) {
+    // Runs in-memory data through the plan and then scans out the written
+    // data to ensure it matches the source data
+    auto format = std::make_shared<IpcFileFormat>();
+    auto fs = std::make_shared<fs::internal::MockFileSystem>(fs::kNoTime);
+    FileSystemDatasetWriteOptions write_options;
+    write_options.file_write_options = format->DefaultWriteOptions();
+    write_options.filesystem = fs;
+    write_options.base_dir = "root";
+    write_options.partitioning = 
std::make_shared<HivePartitioning>(schema({}));
+    write_options.basename_template = "{i}.feather";
+

Review Comment:
   ```suggestion
       const std::string out_filename = "root/0.feather";
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to