marsupialtail commented on code in PR #13640:
URL: https://github.com/apache/arrow/pull/13640#discussion_r960971606


##########
cpp/src/arrow/io/file.cc:
##########
@@ -378,6 +378,101 @@ Status FileOutputStream::Write(const void* data, int64_t 
length) {
 
 int FileOutputStream::file_descriptor() const { return impl_->fd(); }
 
+// ----------------------------------------------------------------------
+// DirectFileOutputStream, change the Open, Write and Close methods from 
FileOutputStream
+// Uses DirectIO for writes. Will only write out things in 4096 byte blocks. 
Buffers
+// leftover bytes in an internal data structure, which will be padded to 4096 
bytes and
+// flushed upon call to close.
+
+class DirectFileOutputStream::DirectFileOutputStreamImpl : public OSFile {
+ public:
+  Status Open(const std::string& path, bool append) {
+    const bool truncate = !append;
+    return OpenWritable(path, truncate, append, true /* write_only */, true);
+  }
+  Status Open(int fd) { return OpenWritable(fd); }
+};
+
+DirectFileOutputStream::DirectFileOutputStream() {
+  uintptr_t mask = (uintptr_t)(4095);
+
+  cached_data_.reserve(4096 + 4095);
+  aligned_cached_data_ = reinterpret_cast<uint8_t*>(
+      reinterpret_cast<uintptr_t>(&cached_data_[0] + 4095) & ~(mask));
+
+  impl_.reset(new DirectFileOutputStreamImpl());
+}
+
+DirectFileOutputStream::~DirectFileOutputStream() { 
internal::CloseFromDestructor(this); }
+
+Result<std::shared_ptr<DirectFileOutputStream>> DirectFileOutputStream::Open(
+    const std::string& path, bool append) {
+  auto stream = std::shared_ptr<DirectFileOutputStream>(new 
DirectFileOutputStream());
+  RETURN_NOT_OK(stream->impl_->Open(path, append));
+  return stream;
+}
+
+Result<std::shared_ptr<DirectFileOutputStream>> 
DirectFileOutputStream::Open(int fd) {
+  auto stream = std::shared_ptr<DirectFileOutputStream>(new 
DirectFileOutputStream());
+  RETURN_NOT_OK(stream->impl_->Open(fd));
+  return stream;
+}
+
+Status DirectFileOutputStream::Close() {
+  // some operations will call Close() on a file that is not open. In which 
case don't do
+  // all this.
+#if defined(__linux__)
+  if (!closed()) {
+    // have to flush out the temprorary data, but then trim the file
+    if (cached_length_ > 0) {
+      std::memset(aligned_cached_data_ + cached_length_, 0, 4096 - 
cached_length_);
+      RETURN_NOT_OK(impl_->Write(aligned_cached_data_, 4096));
+    }
+    auto new_length = impl_->Tell().ValueOrDie() - 4096 + cached_length_;
+    fsync(impl_->fd());

Review Comment:
   Yes this is probably legacy concern. Back when I still had the file opened 
in O_DSYNC, not syncing at the end felt wrong. Now since data can be in limbo 
in SSD cache probably noone cares if it's actually synced anyways.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to