pitrou commented on code in PR #13640:
URL: https://github.com/apache/arrow/pull/13640#discussion_r959848140


##########
cpp/src/arrow/io/file.cc:
##########
@@ -378,6 +378,101 @@ Status FileOutputStream::Write(const void* data, int64_t 
length) {
 
 int FileOutputStream::file_descriptor() const { return impl_->fd(); }
 
+// ----------------------------------------------------------------------
+// DirectFileOutputStream, change the Open, Write and Close methods from 
FileOutputStream
+// Uses DirectIO for writes. Will only write out things in 4096 byte blocks. 
Buffers
+// leftover bytes in an internal data structure, which will be padded to 4096 
bytes and
+// flushed upon call to close.
+
+class DirectFileOutputStream::DirectFileOutputStreamImpl : public OSFile {
+ public:
+  Status Open(const std::string& path, bool append) {
+    const bool truncate = !append;
+    return OpenWritable(path, truncate, append, true /* write_only */, true);
+  }
+  Status Open(int fd) { return OpenWritable(fd); }
+};
+
+DirectFileOutputStream::DirectFileOutputStream() {
+  uintptr_t mask = (uintptr_t)(4095);
+
+  cached_data_.reserve(4096 + 4095);
+  aligned_cached_data_ = reinterpret_cast<uint8_t*>(
+      reinterpret_cast<uintptr_t>(&cached_data_[0] + 4095) & ~(mask));
+
+  impl_.reset(new DirectFileOutputStreamImpl());
+}
+
+DirectFileOutputStream::~DirectFileOutputStream() { 
internal::CloseFromDestructor(this); }
+
+Result<std::shared_ptr<DirectFileOutputStream>> DirectFileOutputStream::Open(
+    const std::string& path, bool append) {
+  auto stream = std::shared_ptr<DirectFileOutputStream>(new 
DirectFileOutputStream());
+  RETURN_NOT_OK(stream->impl_->Open(path, append));
+  return stream;
+}
+
+Result<std::shared_ptr<DirectFileOutputStream>> 
DirectFileOutputStream::Open(int fd) {
+  auto stream = std::shared_ptr<DirectFileOutputStream>(new 
DirectFileOutputStream());
+  RETURN_NOT_OK(stream->impl_->Open(fd));
+  return stream;
+}
+
+Status DirectFileOutputStream::Close() {
+  // some operations will call Close() on a file that is not open. In which 
case don't do
+  // all this.
+#if defined(__linux__)
+  if (!closed()) {
+    // have to flush out the temprorary data, but then trim the file
+    if (cached_length_ > 0) {
+      std::memset(aligned_cached_data_ + cached_length_, 0, 4096 - 
cached_length_);
+      RETURN_NOT_OK(impl_->Write(aligned_cached_data_, 4096));
+    }
+    auto new_length = impl_->Tell().ValueOrDie() - 4096 + cached_length_;
+    fsync(impl_->fd());

Review Comment:
   Hmm, I'm curious, why do you need the `fsync` call before truncating and 
closing? It doesn't sound right. You should let the OS decide when to commit 
the writes to disk (or elsewhere).
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to