pitrou commented on code in PR #13640:
URL: https://github.com/apache/arrow/pull/13640#discussion_r959660237
##########
cpp/src/arrow/filesystem/localfs.cc:
##########
@@ -448,17 +448,40 @@ Result<std::shared_ptr<io::OutputStream>> OpenOutputStreamGeneric(const std::str
return maybe_stream;
}
+Result<std::shared_ptr<io::OutputStream>> OpenDirectIOOutputStreamGeneric(
+ const std::string& path) {
+ RETURN_NOT_OK(ValidatePath(path));
+ ARROW_ASSIGN_OR_RAISE(auto fn, PlatformFilename::FromString(path));
+ const bool write_only = true;
+ ARROW_ASSIGN_OR_RAISE(
+ auto fd, ::arrow::internal::FileOpenWritable(fn, write_only, true, false, true));
+ int raw_fd = fd.Detach();
+ auto maybe_stream = io::DirectFileOutputStream::Open(raw_fd);
+ if (!maybe_stream.ok()) {
+ ARROW_UNUSED(::arrow::internal::FileClose(raw_fd));
+ }
+ return maybe_stream;
+}
+
} // namespace
Result<std::shared_ptr<io::OutputStream>> LocalFileSystem::OpenOutputStream(
const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) {
bool truncate = true;
bool append = false;
- return OpenOutputStreamGeneric(path, truncate, append);
+ if (options_.use_directio) {
+ return OpenDirectIOOutputStreamGeneric(path);
+ } else {
+ return OpenOutputStreamGeneric(path, truncate, append);
+ }
}
Result<std::shared_ptr<io::OutputStream>> LocalFileSystem::OpenAppendStream(
const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) {
+ if (options_.use_directio) {
+ return Status::NotImplemented("DirectIO append stream not implemented yet.");
+ }
Review Comment:
Since direct I/O is only an optimization, I think we should simply ignore it
here.
##########
cpp/src/arrow/filesystem/test_util.cc:
##########
@@ -883,6 +883,8 @@ void GenericFileSystemTest::TestOpenOutputStream(FileSystem* fs) {
ASSERT_OK_AND_ASSIGN(stream, fs->OpenOutputStream("CD/ghi"));
ASSERT_OK(stream->Write("some "));
ASSERT_OK(stream->Write(Buffer::FromString("data")));
+ // this following line will cause Direct IO file system to fail
+ // since they will buffer up the nine bytes and not write them!
Review Comment:
Can you remove this comment?
##########
cpp/src/arrow/io/file.cc:
##########
@@ -378,6 +378,101 @@ Status FileOutputStream::Write(const void* data, int64_t length) {
int FileOutputStream::file_descriptor() const { return impl_->fd(); }
+// ----------------------------------------------------------------------
+// DirectFileOutputStream, change the Open, Write and Close methods from FileOutputStream
+// Uses DirectIO for writes. Will only write out things in 4096 byte blocks. Buffers
+// leftover bytes in an internal data structure, which will be padded to 4096 bytes and
+// flushed upon call to close.
+
+class DirectFileOutputStream::DirectFileOutputStreamImpl : public OSFile {
+ public:
+ Status Open(const std::string& path, bool append) {
+ const bool truncate = !append;
+ return OpenWritable(path, truncate, append, true /* write_only */, true);
+ }
+ Status Open(int fd) { return OpenWritable(fd); }
+};
+
+DirectFileOutputStream::DirectFileOutputStream() {
+ uintptr_t mask = (uintptr_t)(4095);
+
+ cached_data_.reserve(4096 + 4095);
+ aligned_cached_data_ = reinterpret_cast<uint8_t*>(
+ reinterpret_cast<uintptr_t>(&cached_data_[0] + 4095) & ~(mask));
+
+ impl_.reset(new DirectFileOutputStreamImpl());
+}
+
+DirectFileOutputStream::~DirectFileOutputStream() { internal::CloseFromDestructor(this); }
+
+Result<std::shared_ptr<DirectFileOutputStream>> DirectFileOutputStream::Open(
+ const std::string& path, bool append) {
+ auto stream = std::shared_ptr<DirectFileOutputStream>(new DirectFileOutputStream());
+ RETURN_NOT_OK(stream->impl_->Open(path, append));
+ return stream;
+}
+
+Result<std::shared_ptr<DirectFileOutputStream>> DirectFileOutputStream::Open(int fd) {
+ auto stream = std::shared_ptr<DirectFileOutputStream>(new DirectFileOutputStream());
+ RETURN_NOT_OK(stream->impl_->Open(fd));
+ return stream;
+}
+
+Status DirectFileOutputStream::Close() {
+ // some operations will call Close() on a file that is not open. In which case don't do
+ // all this.
+#if defined(__linux__)
Review Comment:
Why only Linux? Other OSes won't get a proper close?
##########
cpp/src/arrow/util/io_util.cc:
##########
@@ -1129,6 +1130,14 @@ Result<FileDescriptor> FileOpenWritable(const PlatformFilename& file_name,
oflag |= O_RDWR;
}
+ if (direct) {
+#if defined(__linux__)
+ oflag |= O_DIRECT; // will cause issues on Apple on Windows
+#else
+ return Status::IOError("Direct IO is only supported on Linux.");
Review Comment:
Please make this `Status::NotImplemented`.
##########
python/pyarrow/_fs.pyx:
##########
@@ -782,6 +782,11 @@ cdef class LocalFileSystem(FileSystem):
use_mmap : bool, default False
Whether open_input_stream and open_input_file should return
a mmap'ed file or a regular file.
+ use_directio : bool, default False
Review Comment:
Let's call this `use_direct_io`?
##########
cpp/src/arrow/io/file.h:
##########
@@ -80,6 +80,52 @@ class ARROW_EXPORT FileOutputStream : public OutputStream {
std::unique_ptr<FileOutputStreamImpl> impl_;
};
+class ARROW_EXPORT DirectFileOutputStream : public OutputStream {
Review Comment:
Please add a docstring here. The user should not have to wonder what this is.
##########
cpp/src/arrow/filesystem/test_util.h:
##########
@@ -238,9 +238,39 @@ class ARROW_TESTING_EXPORT GenericFileSystemTest {
GENERIC_FS_TEST_FUNCTION(TEST_MACRO, TEST_CLASS, OpenInputFileAsync) \
GENERIC_FS_TEST_FUNCTION(TEST_MACRO, TEST_CLASS, SpecialChars)
+#define GENERIC_FS_TEST_FUNCTIONS_WITHOUT_APPEND_MACROS(TEST_MACRO, TEST_CLASS) \
Review Comment:
Can you remove this after you've made `OpenAppendStream` fall back
gracefully?
##########
cpp/src/arrow/io/file.cc:
##########
@@ -378,6 +378,101 @@ Status FileOutputStream::Write(const void* data, int64_t length) {
int FileOutputStream::file_descriptor() const { return impl_->fd(); }
+// ----------------------------------------------------------------------
+// DirectFileOutputStream, change the Open, Write and Close methods from FileOutputStream
+// Uses DirectIO for writes. Will only write out things in 4096 byte blocks. Buffers
+// leftover bytes in an internal data structure, which will be padded to 4096 bytes and
+// flushed upon call to close.
+
+class DirectFileOutputStream::DirectFileOutputStreamImpl : public OSFile {
+ public:
+ Status Open(const std::string& path, bool append) {
+ const bool truncate = !append;
+ return OpenWritable(path, truncate, append, true /* write_only */, true);
+ }
+ Status Open(int fd) { return OpenWritable(fd); }
+};
+
+DirectFileOutputStream::DirectFileOutputStream() {
+ uintptr_t mask = (uintptr_t)(4095);
+
+ cached_data_.reserve(4096 + 4095);
+ aligned_cached_data_ = reinterpret_cast<uint8_t*>(
+ reinterpret_cast<uintptr_t>(&cached_data_[0] + 4095) & ~(mask));
+
+ impl_.reset(new DirectFileOutputStreamImpl());
+}
+
+DirectFileOutputStream::~DirectFileOutputStream() { internal::CloseFromDestructor(this); }
+
+Result<std::shared_ptr<DirectFileOutputStream>> DirectFileOutputStream::Open(
+ const std::string& path, bool append) {
+ auto stream = std::shared_ptr<DirectFileOutputStream>(new DirectFileOutputStream());
+ RETURN_NOT_OK(stream->impl_->Open(path, append));
+ return stream;
+}
+
+Result<std::shared_ptr<DirectFileOutputStream>> DirectFileOutputStream::Open(int fd) {
+ auto stream = std::shared_ptr<DirectFileOutputStream>(new DirectFileOutputStream());
+ RETURN_NOT_OK(stream->impl_->Open(fd));
+ return stream;
+}
+
+Status DirectFileOutputStream::Close() {
+ // some operations will call Close() on a file that is not open. In which case don't do
+ // all this.
+#if defined(__linux__)
+ if (!closed()) {
+ // have to flush out the temprorary data, but then trim the file
+ if (cached_length_ > 0) {
+ std::memset(aligned_cached_data_ + cached_length_, 0, 4096 - cached_length_);
+ RETURN_NOT_OK(impl_->Write(aligned_cached_data_, 4096));
+ }
+ auto new_length = impl_->Tell().ValueOrDie() - 4096 + cached_length_;
+ fsync(impl_->fd());
Review Comment:
Why `fsync` here? This will make IO more expensive, no?
##########
cpp/src/arrow/filesystem/localfs.cc:
##########
@@ -448,17 +448,40 @@ Result<std::shared_ptr<io::OutputStream>> OpenOutputStreamGeneric(const std::str
return maybe_stream;
}
+Result<std::shared_ptr<io::OutputStream>> OpenDirectIOOutputStreamGeneric(
+ const std::string& path) {
+ RETURN_NOT_OK(ValidatePath(path));
+ ARROW_ASSIGN_OR_RAISE(auto fn, PlatformFilename::FromString(path));
+ const bool write_only = true;
+ ARROW_ASSIGN_OR_RAISE(
+ auto fd, ::arrow::internal::FileOpenWritable(fn, write_only, true, false, true));
+ int raw_fd = fd.Detach();
+ auto maybe_stream = io::DirectFileOutputStream::Open(raw_fd);
+ if (!maybe_stream.ok()) {
+ ARROW_UNUSED(::arrow::internal::FileClose(raw_fd));
+ }
+ return maybe_stream;
+}
Review Comment:
Instead of copy-pasting, perhaps reconcile with `OpenOutputStreamGeneric`,
one way or the other?
##########
cpp/src/arrow/io/file.cc:
##########
@@ -378,6 +378,101 @@ Status FileOutputStream::Write(const void* data, int64_t length) {
int FileOutputStream::file_descriptor() const { return impl_->fd(); }
+// ----------------------------------------------------------------------
+// DirectFileOutputStream, change the Open, Write and Close methods from FileOutputStream
+// Uses DirectIO for writes. Will only write out things in 4096 byte blocks. Buffers
+// leftover bytes in an internal data structure, which will be padded to 4096 bytes and
+// flushed upon call to close.
+
+class DirectFileOutputStream::DirectFileOutputStreamImpl : public OSFile {
+ public:
+ Status Open(const std::string& path, bool append) {
+ const bool truncate = !append;
+ return OpenWritable(path, truncate, append, true /* write_only */, true);
+ }
+ Status Open(int fd) { return OpenWritable(fd); }
+};
+
+DirectFileOutputStream::DirectFileOutputStream() {
+ uintptr_t mask = (uintptr_t)(4095);
+
+ cached_data_.reserve(4096 + 4095);
+ aligned_cached_data_ = reinterpret_cast<uint8_t*>(
+ reinterpret_cast<uintptr_t>(&cached_data_[0] + 4095) & ~(mask));
+
+ impl_.reset(new DirectFileOutputStreamImpl());
+}
+
+DirectFileOutputStream::~DirectFileOutputStream() { internal::CloseFromDestructor(this); }
+
+Result<std::shared_ptr<DirectFileOutputStream>> DirectFileOutputStream::Open(
+ const std::string& path, bool append) {
+ auto stream = std::shared_ptr<DirectFileOutputStream>(new DirectFileOutputStream());
+ RETURN_NOT_OK(stream->impl_->Open(path, append));
+ return stream;
+}
+
+Result<std::shared_ptr<DirectFileOutputStream>> DirectFileOutputStream::Open(int fd) {
+ auto stream = std::shared_ptr<DirectFileOutputStream>(new DirectFileOutputStream());
+ RETURN_NOT_OK(stream->impl_->Open(fd));
+ return stream;
+}
+
+Status DirectFileOutputStream::Close() {
+ // some operations will call Close() on a file that is not open. In which case don't do
+ // all this.
+#if defined(__linux__)
+ if (!closed()) {
+ // have to flush out the temprorary data, but then trim the file
+ if (cached_length_ > 0) {
+ std::memset(aligned_cached_data_ + cached_length_, 0, 4096 - cached_length_);
+ RETURN_NOT_OK(impl_->Write(aligned_cached_data_, 4096));
+ }
+ auto new_length = impl_->Tell().ValueOrDie() - 4096 + cached_length_;
Review Comment:
Please don't use `ValueOrDie` and instead correctly propagate the error, for
example:
```suggestion
ARROW_ASSIGN_OR_RAISE(const auto file_pos, impl_->Tell());
const auto new_length = ...;
```
##########
cpp/src/arrow/util/io_util.cc:
##########
@@ -1129,6 +1130,14 @@ Result<FileDescriptor> FileOpenWritable(const PlatformFilename& file_name,
oflag |= O_RDWR;
}
+ if (direct) {
+#if defined(__linux__)
+ oflag |= O_DIRECT; // will cause issues on Apple on Windows
Review Comment:
Did you mean "Apple and Windows" perhaps?
##########
cpp/src/arrow/io/file.h:
##########
@@ -80,6 +80,52 @@ class ARROW_EXPORT FileOutputStream : public OutputStream {
std::unique_ptr<FileOutputStreamImpl> impl_;
};
+class ARROW_EXPORT DirectFileOutputStream : public OutputStream {
+ public:
+ ~DirectFileOutputStream() override;
+
+ /// \brief Open a local file for writing, truncating any existing file
+ /// \param[in] path with UTF8 encoding
+ /// \param[in] append append to existing file, otherwise truncate to 0 bytes
+ /// \return an open FileOutputStream
+ ///
+ /// When opening a new file, any existing file with the indicated path is
+ /// truncated to 0 bytes, deleting any existing data
+ static Result<std::shared_ptr<DirectFileOutputStream>> Open(const std::string& path,
Review Comment:
I think the two factory functions `Open` should return a
`Status::NotImplemented` error if the OS is not supported.
Also, an `append` flag should probably not be accepted if it is not
supported.
##########
cpp/src/arrow/filesystem/localfs.h:
##########
@@ -37,6 +37,7 @@ struct ARROW_EXPORT LocalFileSystemOptions {
/// Whether OpenInputStream and OpenInputFile return a mmap'ed file,
/// or a regular one.
bool use_mmap = false;
+ bool use_directio = false;
Review Comment:
```suggestion
/// Whether OpenOutputStream opens the file with O_DIRECT,
/// if the operating system supports it.
/// This flag can have non-trivial performance implications.
bool use_direct_io = false;
```
##########
python/pyarrow/_fs.pyx:
##########
@@ -782,6 +782,11 @@ cdef class LocalFileSystem(FileSystem):
use_mmap : bool, default False
Whether open_input_stream and open_input_file should return
a mmap'ed file or a regular file.
+ use_directio : bool, default False
+ Whether open_output_stream will return an output stream with direct io.
+ Does not support open_append_stream.
+ Will only work on Linux. Slightly different write semantics.
Review Comment:
Why are the semantics slightly different?
##########
cpp/src/arrow/io/file.cc:
##########
@@ -378,6 +378,101 @@ Status FileOutputStream::Write(const void* data, int64_t length) {
int FileOutputStream::file_descriptor() const { return impl_->fd(); }
+// ----------------------------------------------------------------------
+// DirectFileOutputStream, change the Open, Write and Close methods from FileOutputStream
+// Uses DirectIO for writes. Will only write out things in 4096 byte blocks. Buffers
+// leftover bytes in an internal data structure, which will be padded to 4096 bytes and
+// flushed upon call to close.
+
+class DirectFileOutputStream::DirectFileOutputStreamImpl : public OSFile {
+ public:
+ Status Open(const std::string& path, bool append) {
+ const bool truncate = !append;
+ return OpenWritable(path, truncate, append, true /* write_only */, true);
+ }
+ Status Open(int fd) { return OpenWritable(fd); }
+};
+
+DirectFileOutputStream::DirectFileOutputStream() {
+ uintptr_t mask = (uintptr_t)(4095);
Review Comment:
Let's not hard-code 4096; please use `GetPageSize()` instead.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]