marsupialtail commented on code in PR #13640:
URL: https://github.com/apache/arrow/pull/13640#discussion_r959834366
##########
cpp/src/arrow/io/file.cc:
##########
@@ -378,6 +378,101 @@ Status FileOutputStream::Write(const void* data, int64_t
length) {
int FileOutputStream::file_descriptor() const { return impl_->fd(); }
+// ----------------------------------------------------------------------
+// DirectFileOutputStream, change the Open, Write and Close methods from
FileOutputStream
+// Uses DirectIO for writes. Will only write out things in 4096 byte blocks.
Buffers
+// leftover bytes in an internal data structure, which will be padded to 4096
bytes and
+// flushed upon call to close.
+
+class DirectFileOutputStream::DirectFileOutputStreamImpl : public OSFile {
+ public:
+ Status Open(const std::string& path, bool append) {
+ const bool truncate = !append;
+ return OpenWritable(path, truncate, append, true /* write_only */, true);
+ }
+ Status Open(int fd) { return OpenWritable(fd); }
+};
+
+DirectFileOutputStream::DirectFileOutputStream() {
+ uintptr_t mask = (uintptr_t)(4095);
+
+ cached_data_.reserve(4096 + 4095);
+ aligned_cached_data_ = reinterpret_cast<uint8_t*>(
+ reinterpret_cast<uintptr_t>(&cached_data_[0] + 4095) & ~(mask));
+
+ impl_.reset(new DirectFileOutputStreamImpl());
+}
+
+DirectFileOutputStream::~DirectFileOutputStream() {
internal::CloseFromDestructor(this); }
+
+Result<std::shared_ptr<DirectFileOutputStream>> DirectFileOutputStream::Open(
+ const std::string& path, bool append) {
+ auto stream = std::shared_ptr<DirectFileOutputStream>(new
DirectFileOutputStream());
+ RETURN_NOT_OK(stream->impl_->Open(path, append));
+ return stream;
+}
+
+Result<std::shared_ptr<DirectFileOutputStream>>
DirectFileOutputStream::Open(int fd) {
+ auto stream = std::shared_ptr<DirectFileOutputStream>(new
DirectFileOutputStream());
+ RETURN_NOT_OK(stream->impl_->Open(fd));
+ return stream;
+}
+
+Status DirectFileOutputStream::Close() {
+ // some operations will call Close() on a file that is not open. In which
case don't do
+ // all this.
+#if defined(__linux__)
+ if (!closed()) {
+ // have to flush out the temprorary data, but then trim the file
+ if (cached_length_ > 0) {
+ std::memset(aligned_cached_data_ + cached_length_, 0, 4096 -
cached_length_);
+ RETURN_NOT_OK(impl_->Write(aligned_cached_data_, 4096));
+ }
+ auto new_length = impl_->Tell().ValueOrDie() - 4096 + cached_length_;
+ fsync(impl_->fd());
Review Comment:
This is okay because it is only done once, at the end, and you need to sync the
file when you are going to close it, particularly since I am going to ftruncate
it afterwards to the right length. I think it's good practice to flush all the
writes before you close a file anyway.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]