westonpace commented on a change in pull request #12099:
URL: https://github.com/apache/arrow/pull/12099#discussion_r781481400
##########
File path: cpp/src/arrow/dataset/file_csv.cc
##########
@@ -334,7 +334,10 @@ Status CsvFileWriter::Write(const std::shared_ptr<RecordBatch>& batch) {
return batch_writer_->WriteRecordBatch(*batch);
}
-Status CsvFileWriter::FinishInternal() { return batch_writer_->Close(); }
+Future<> CsvFileWriter::FinishInternal() {
+ return DeferNotOk(destination_locator_.filesystem->io_context().executor()->Submit(
+ [this]() { return batch_writer_->Close(); }));
+}
Review comment:
The CSV writer's `Close` method is a no-op so I think it would be safe
to just return a finished future here.
```suggestion
Future<> CsvFileWriter::FinishInternal() {
  // Close() is a no-op for the CSV writer; forward its Status instead of dropping it.
  return Future<>::MakeFinished(batch_writer_->Close());
}
```
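To illustrate the idea outside of Arrow, here is a minimal sketch using only the standard library. `Status`, `ToyBatchWriter`, `MakeFinished`, and `IsReady` are hypothetical stand-ins, not Arrow's classes; the sketch assumes, as the comment above does, that `Close` does no I/O, so the result can be wrapped in an already-fulfilled future without going through an executor.

```cpp
#include <cassert>
#include <chrono>
#include <future>
#include <string>
#include <utility>

// Stand-in for arrow::Status, not the real class.
struct Status {
  bool ok = true;
  std::string message;
};

// Hypothetical writer whose Close() performs no I/O.
struct ToyBatchWriter {
  bool closed = false;
  Status Close() {
    closed = true;
    return Status{};
  }
};

// Builds a future that is already fulfilled with the given status.
std::future<Status> MakeFinished(Status st) {
  std::promise<Status> promise;
  promise.set_value(std::move(st));
  return promise.get_future();
}

// Reports whether the future already holds a result, without blocking.
bool IsReady(const std::future<Status>& fut) {
  return fut.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
}

// Close() runs inline and its Status is forwarded, so no task is submitted.
std::future<Status> FinishInternal(ToyBatchWriter& writer) {
  return MakeFinished(writer.Close());
}
```

The future returned by `FinishInternal` is ready as soon as the call returns, which is the property the suggestion relies on.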
##########
File path: cpp/src/arrow/filesystem/s3fs.cc
##########
@@ -1293,14 +1335,21 @@ class ObjectOutputStream final : public io::OutputStream {
}
Status Flush() override {
+ if (closed_) {
+ return Status::Invalid("Operation on closed stream");
+ }
Review comment:
Is this check needed? If `closed_` is true then `FlushAsync` is just
going to immediately return a failed future anyway.
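A small sketch of why the extra check would be redundant, assuming `Flush` simply waits on `FlushAsync` and that `FlushAsync` already fails on a closed stream. `Status` and `ToyOutputStream` are hypothetical stand-ins built on the standard library, not the S3 stream from this PR.

```cpp
#include <cassert>
#include <future>
#include <string>

// Stand-in for arrow::Status, not the real class.
struct Status {
  bool ok;
  std::string message;
};

// Hypothetical stream used only to show the delegation.
class ToyOutputStream {
 public:
  std::future<Status> FlushAsync() {
    std::promise<Status> promise;
    if (closed_) {
      // The closed-stream error is produced here, in one place.
      promise.set_value(Status{false, "Operation on closed stream"});
    } else {
      // Pretend all pending uploads have already completed.
      promise.set_value(Status{true, ""});
    }
    return promise.get_future();
  }

  // No separate closed_ check: the failed future already carries the error.
  Status Flush() { return FlushAsync().get(); }

  void MarkClosed() { closed_ = true; }

 private:
  bool closed_ = false;
};
```

Under those assumptions the caller of `Flush` sees the same error either way, so the check in `Flush` only duplicates the one in `FlushAsync`.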
##########
File path: cpp/src/arrow/filesystem/s3fs.cc
##########
@@ -1243,6 +1243,48 @@ class ObjectOutputStream final : public io::OutputStream {
return Status::OK();
}
+ Future<> CloseAsync() override {
Review comment:
There's a lot of overlap between this method and `Close`. Can we create
some helper methods so we have something like (pseudo-code)...
```
Future<> CloseAsync() override {
  PrepareClose();
  return FlushAsync().Then(DoClose);
}
Status Close() override {
  PrepareClose();
  RETURN_NOT_OK(Flush());
  return DoClose();
}
```
Or you could just change `Close` to be `return CloseAsync().status();`.
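A rough standard-library sketch of the second option, where the blocking `Close` just waits on `CloseAsync`. Everything here is hypothetical: `Status` and `ToyOutputStream` are stand-ins, `FlushPending` takes the place of the real flush, and `std::launch::deferred` is used only to keep the example single-threaded (a real stream would run the work on an executor).

```cpp
#include <cassert>
#include <future>
#include <string>
#include <vector>

// Stand-in for arrow::Status, not the real class.
struct Status {
  bool ok = true;
  std::string message;
};

// Hypothetical stream; helper names follow the pseudo-code above.
class ToyOutputStream {
 public:
  std::future<Status> CloseAsync() {
    if (closed_) {
      // Closing twice is treated as a no-op in this sketch.
      std::promise<Status> done;
      done.set_value(Status{});
      return done.get_future();
    }
    PrepareClose();
    // Deferred: the flush and close steps run when the future is waited on.
    return std::async(std::launch::deferred, [this] {
      Status st = FlushPending();
      if (!st.ok) return st;
      return DoClose();
    });
  }

  // The blocking variant shares all of its logic with CloseAsync.
  Status Close() { return CloseAsync().get(); }

  const std::vector<std::string>& log() const { return log_; }

 private:
  void PrepareClose() { log_.push_back("prepare"); }
  Status FlushPending() {
    log_.push_back("flush");
    return Status{};
  }
  Status DoClose() {
    assert(!closed_);
    closed_ = true;
    log_.push_back("close");
    return Status{};
  }

  bool closed_ = false;
  std::vector<std::string> log_;
};
```

With this shape there is a single code path for closing, so the two methods cannot drift apart.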
##########
File path: cpp/src/arrow/filesystem/s3fs.cc
##########
@@ -1445,10 +1496,10 @@ class ObjectOutputStream final : public io::OutputStream {
// in the completion handler.
struct UploadState {
std::mutex mutex;
- std::condition_variable cv;
Aws::Vector<S3Model::CompletedPart> completed_parts;
int64_t parts_in_progress = 0;
Status status;
+ Future<> completed = Future<>::MakeFinished(Status::OK());
Review comment:
Nit: `completed` is a little bit misleading here. A user could write,
flush, and continue writing. I'm not sure what a better name would be but
maybe something like `writes_in_progress`?
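To make the naming concern concrete, here is a hypothetical sketch of such a state object built on the standard library rather than on Arrow's `Future`. `ToyUploadState`, `PartStarted`, `PartFinished`, and `Idle` are invented for illustration, and how the real code resets its future is an assumption. The point is that the future goes from finished back to unfinished whenever a new part starts, so it describes in-flight writes rather than the stream being done.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <future>
#include <mutex>

// Hypothetical stand-in for the upload state; not the struct from the PR.
struct ToyUploadState {
  std::mutex mutex;
  int64_t parts_in_progress = 0;
  std::promise<void> promise;
  // Ready whenever no part upload is in flight.
  std::shared_future<void> writes_in_progress;

  ToyUploadState() {
    // Nothing is in flight at construction, so start out finished.
    promise.set_value();
    writes_in_progress = promise.get_future().share();
  }

  void PartStarted() {
    std::lock_guard<std::mutex> lock(mutex);
    if (parts_in_progress++ == 0) {
      // First part after an idle period: swap in a fresh, unfinished future.
      promise = std::promise<void>();
      writes_in_progress = promise.get_future().share();
    }
  }

  void PartFinished() {
    std::lock_guard<std::mutex> lock(mutex);
    assert(parts_in_progress > 0);
    if (--parts_in_progress == 0) {
      promise.set_value();  // Last in-flight part is done.
    }
  }

  bool Idle() {
    std::lock_guard<std::mutex> lock(mutex);
    return writes_in_progress.wait_for(std::chrono::seconds(0)) ==
           std::future_status::ready;
  }
};
```

A write, flush, write sequence makes `Idle()` go true, false, true, false, true, which is why a name like `completed` reads as more final than the state really is.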
--
This is an automated message from the Apache Git Service.