westonpace commented on code in PR #13640:
URL: https://github.com/apache/arrow/pull/13640#discussion_r957865732
##########
cpp/src/arrow/util/io_util.cc:
##########
@@ -1129,6 +1130,14 @@ Result<FileDescriptor> FileOpenWritable(const PlatformFilename& file_name,
oflag |= O_RDWR;
}
+ if (direct) {
+#if defined(__linux__)
+ oflag |= O_DIRECT; // will cause issues on Apple on Windows
+#else
+ return Status::IOError("Direct IO does not support Mac OS.");
Review Comment:
```suggestion
return Status::IOError("Direct IO is only supported on Linux.");
```
##########
cpp/src/arrow/io/file.cc:
##########
@@ -378,6 +379,99 @@ Status FileOutputStream::Write(const void* data, int64_t length) {
int FileOutputStream::file_descriptor() const { return impl_->fd(); }
+// ----------------------------------------------------------------------
+// DirectFileOutputStream, change the Open, Write and Close methods from
FileOutputStream
+// Uses DirectIO for writes. Will only write out things in 4096 byte blocks.
Buffers
+// leftover bytes in an internal data structure, which will be padded to 4096
bytes and
+// flushed upon call to close.
+
+class DirectFileOutputStream::DirectFileOutputStreamImpl : public OSFile {
+ public:
+ Status Open(const std::string& path, bool append) {
+ const bool truncate = !append;
+ return OpenWritable(path, truncate, append, true /* write_only */, true);
+ }
+ Status Open(int fd) { return OpenWritable(fd); }
+};
+
+DirectFileOutputStream::DirectFileOutputStream() {
+ uintptr_t mask = (uintptr_t)(4095);
+
+ cached_data_.reserve(4096 + 4095);
+ aligned_cached_data_ = reinterpret_cast<uint8_t*>(
+ reinterpret_cast<uintptr_t>(&cached_data_[0] + 4095) & ~(mask));
+
+ impl_.reset(new DirectFileOutputStreamImpl());
+}
+
+DirectFileOutputStream::~DirectFileOutputStream() {
internal::CloseFromDestructor(this); }
+
+Result<std::shared_ptr<DirectFileOutputStream>> DirectFileOutputStream::Open(
+ const std::string& path, bool append) {
+ auto stream = std::shared_ptr<DirectFileOutputStream>(new
DirectFileOutputStream());
+ RETURN_NOT_OK(stream->impl_->Open(path, append));
+ return stream;
+}
+
+Result<std::shared_ptr<DirectFileOutputStream>>
DirectFileOutputStream::Open(int fd) {
+ auto stream = std::shared_ptr<DirectFileOutputStream>(new
DirectFileOutputStream());
+ RETURN_NOT_OK(stream->impl_->Open(fd));
+ return stream;
+}
+
+Status DirectFileOutputStream::Close() {
+ // some operations will call Close() on a file that is not open. In which
case don't do
+ // all this.
+#if defined(__linux__)
+ if (!closed()) {
+ // have to flush out the temprorary data, but then trim the file
+ if (cached_length > 0) {
+ std::memset(aligned_cached_data_ + cached_length, 0, 4096 -
cached_length);
+ RETURN_NOT_OK(impl_->Write(aligned_cached_data_, 4096));
+ }
+ auto new_length = Tell().ValueOrDie() - 4096 + cached_length;
+ // this is just to help compile. If you are not on Linux you would have
failed out way
+ // before this point.
Review Comment:
I'm not sure I follow this comment
##########
cpp/src/arrow/filesystem/test_util.cc:
##########
@@ -883,7 +883,9 @@ void GenericFileSystemTest::TestOpenOutputStream(FileSystem* fs) {
ASSERT_OK_AND_ASSIGN(stream, fs->OpenOutputStream("CD/ghi"));
ASSERT_OK(stream->Write("some "));
ASSERT_OK(stream->Write(Buffer::FromString("data")));
- ASSERT_OK_AND_EQ(9, stream->Tell());
+ // this following line will cause Direct IO file system to fail
+ // since they will buffer up the nine bytes and not write them!
+ // ASSERT_OK_AND_EQ(9, stream->Tell());
Review Comment:
You could presumably override the implementation of `Tell` to include the
buffered bytes, right? Though I'm not sure if that is important.
##########
cpp/src/arrow/io/file.h:
##########
@@ -80,6 +80,52 @@ class ARROW_EXPORT FileOutputStream : public OutputStream {
std::unique_ptr<FileOutputStreamImpl> impl_;
};
+class ARROW_EXPORT DirectFileOutputStream : public OutputStream {
+ public:
+ ~DirectFileOutputStream() override;
+
+ /// \brief Open a local file for writing, truncating any existing file
+ /// \param[in] path with UTF8 encoding
+ /// \param[in] append append to existing file, otherwise truncate to 0 bytes
+ /// \return an open FileOutputStream
+ ///
+ /// When opening a new file, any existing file with the indicated path is
+ /// truncated to 0 bytes, deleting any existing data
+ static Result<std::shared_ptr<DirectFileOutputStream>> Open(const
std::string& path,
+ bool append =
false);
+
+ /// \brief Open a file descriptor for writing. The underlying file isn't
+ /// truncated.
+ /// \param[in] fd file descriptor
+ /// \return an open FileOutputStream
+ ///
+ /// The file descriptor becomes owned by the OutputStream, and will be closed
+ /// on Close() or destruction.
+ static Result<std::shared_ptr<DirectFileOutputStream>> Open(int fd);
+
+ // OutputStream interface
+ Status Close() override;
+ bool closed() const override;
+ Result<int64_t> Tell() const override;
+
+ // Write bytes to the stream. Thread-safe
+ Status Write(const void* data, int64_t nbytes) override;
+ /// \cond FALSE
+ using Writable::Write;
+ /// \endcond
+
+ int file_descriptor() const;
+
+ private:
+ DirectFileOutputStream();
+
+ class ARROW_NO_EXPORT DirectFileOutputStreamImpl;
Review Comment:
I had no idea we had `ARROW_NO_EXPORT`. I wonder if we could have used it
to avoid our other PIMPL problems, like the one we ran into with CUDA recently.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]