pitrou commented on code in PR #13640:
URL: https://github.com/apache/arrow/pull/13640#discussion_r980087050
##########
cpp/src/arrow/io/file.cc:
##########
@@ -378,6 +378,106 @@ Status FileOutputStream::Write(const void* data, int64_t
length) {
int FileOutputStream::file_descriptor() const { return impl_->fd(); }
+// ----------------------------------------------------------------------
+// DirectFileOutputStream, change the Open, Write and Close methods from
FileOutputStream
+// Uses DirectIO for writes. Will only write out things in disk-sector-size
byte blocks.
+// Buffers leftover bytes in an internal data structure, which will be padded
to
+// disk-sector-size bytes and written upon call to close.
+
+#if defined(__linux__)
+class DirectFileOutputStream::DirectFileOutputStreamImpl : public OSFile {
+ public:
+ Status Open(const std::string& path) {
+ return OpenWritable(path, true, false, true /* write_only */, true);
+ }
+ Status Open(int fd) { return OpenWritable(fd); }
+};
+
+DirectFileOutputStream::DirectFileOutputStream(int64_t sector_size) {
+ sector_size_ = sector_size;
+ uintptr_t mask = (uintptr_t)(sector_size - 1);
Review Comment:
Please use C++ casts rather than C-style casts, e.g. `static_cast`.
##########
cpp/src/arrow/io/file.cc:
##########
@@ -378,6 +378,106 @@ Status FileOutputStream::Write(const void* data, int64_t
length) {
int FileOutputStream::file_descriptor() const { return impl_->fd(); }
+// ----------------------------------------------------------------------
+// DirectFileOutputStream, change the Open, Write and Close methods from
FileOutputStream
+// Uses DirectIO for writes. Will only write out things in disk-sector-size
byte blocks.
+// Buffers leftover bytes in an internal data structure, which will be padded
to
+// disk-sector-size bytes and written upon call to close.
+
+#if defined(__linux__)
+class DirectFileOutputStream::DirectFileOutputStreamImpl : public OSFile {
+ public:
+ Status Open(const std::string& path) {
+ return OpenWritable(path, true, false, true /* write_only */, true);
+ }
+ Status Open(int fd) { return OpenWritable(fd); }
+};
+
+DirectFileOutputStream::DirectFileOutputStream(int64_t sector_size) {
+ sector_size_ = sector_size;
+ uintptr_t mask = (uintptr_t)(sector_size - 1);
+ cached_data_.reserve(sector_size * 2 - 1);
+ aligned_cached_data_ = reinterpret_cast<uint8_t*>(
+ reinterpret_cast<uintptr_t>(&cached_data_[0] + sector_size - 1) &
~(mask));
+
+ impl_.reset(new DirectFileOutputStreamImpl());
+}
+
+DirectFileOutputStream::~DirectFileOutputStream() {
internal::CloseFromDestructor(this); }
+
+Result<std::shared_ptr<DirectFileOutputStream>> DirectFileOutputStream::Open(
+ const std::string& path) {
+ auto stream = std::shared_ptr<DirectFileOutputStream>(
+ new DirectFileOutputStream(pathconf(path.c_str(), _PC_REC_XFER_ALIGN)));
+ RETURN_NOT_OK(stream->impl_->Open(path));
+ return stream;
+}
+
+Result<std::shared_ptr<DirectFileOutputStream>> DirectFileOutputStream::Open(
+ int fd, int64_t sector_size) {
+ auto stream =
+ std::shared_ptr<DirectFileOutputStream>(new
DirectFileOutputStream(sector_size));
+ RETURN_NOT_OK(stream->impl_->Open(fd));
+ return stream;
+}
+
+Status DirectFileOutputStream::Close() {
+ // some operations will call Close() on a file that is not open. In which
case don't do
+ // all this.
+ if (!closed()) {
+ // have to flush out the temprorary data, but then trim the file
+ if (cached_length_ > 0) {
+ std::memset(aligned_cached_data_ + cached_length_, 0,
+ sector_size_ - cached_length_);
+ RETURN_NOT_OK(impl_->Write(aligned_cached_data_, sector_size_));
+ }
+ ARROW_ASSIGN_OR_RAISE(auto file_pos, impl_->Tell());
+ auto new_length = file_pos - sector_size_ + cached_length_;
+ // fsync(impl_->fd());
+ ftruncate(impl_->fd(), new_length);
Review Comment:
The return value of `ftruncate` should be checked for errors.
##########
cpp/src/arrow/io/file.cc:
##########
@@ -378,6 +378,106 @@ Status FileOutputStream::Write(const void* data, int64_t
length) {
int FileOutputStream::file_descriptor() const { return impl_->fd(); }
+// ----------------------------------------------------------------------
+// DirectFileOutputStream, change the Open, Write and Close methods from
FileOutputStream
+// Uses DirectIO for writes. Will only write out things in disk-sector-size
byte blocks.
+// Buffers leftover bytes in an internal data structure, which will be padded
to
+// disk-sector-size bytes and written upon call to close.
+
+#if defined(__linux__)
+class DirectFileOutputStream::DirectFileOutputStreamImpl : public OSFile {
+ public:
+ Status Open(const std::string& path) {
+ return OpenWritable(path, true, false, true /* write_only */, true);
+ }
+ Status Open(int fd) { return OpenWritable(fd); }
+};
+
+DirectFileOutputStream::DirectFileOutputStream(int64_t sector_size) {
+ sector_size_ = sector_size;
+ uintptr_t mask = (uintptr_t)(sector_size - 1);
Review Comment:
Also, can we check that `sector_size` is a power of two? The alignment mask computed as `sector_size - 1` is only valid in that case.
##########
cpp/src/arrow/io/file.cc:
##########
@@ -378,6 +378,106 @@ Status FileOutputStream::Write(const void* data, int64_t
length) {
int FileOutputStream::file_descriptor() const { return impl_->fd(); }
+// ----------------------------------------------------------------------
+// DirectFileOutputStream, change the Open, Write and Close methods from
FileOutputStream
+// Uses DirectIO for writes. Will only write out things in disk-sector-size
byte blocks.
+// Buffers leftover bytes in an internal data structure, which will be padded
to
+// disk-sector-size bytes and written upon call to close.
+
+#if defined(__linux__)
+class DirectFileOutputStream::DirectFileOutputStreamImpl : public OSFile {
+ public:
+ Status Open(const std::string& path) {
+ return OpenWritable(path, true, false, true /* write_only */, true);
+ }
+ Status Open(int fd) { return OpenWritable(fd); }
+};
+
+DirectFileOutputStream::DirectFileOutputStream(int64_t sector_size) {
+ sector_size_ = sector_size;
+ uintptr_t mask = (uintptr_t)(sector_size - 1);
+ cached_data_.reserve(sector_size * 2 - 1);
+ aligned_cached_data_ = reinterpret_cast<uint8_t*>(
+ reinterpret_cast<uintptr_t>(&cached_data_[0] + sector_size - 1) &
~(mask));
+
+ impl_.reset(new DirectFileOutputStreamImpl());
+}
+
+DirectFileOutputStream::~DirectFileOutputStream() {
internal::CloseFromDestructor(this); }
+
+Result<std::shared_ptr<DirectFileOutputStream>> DirectFileOutputStream::Open(
+ const std::string& path) {
+ auto stream = std::shared_ptr<DirectFileOutputStream>(
+ new DirectFileOutputStream(pathconf(path.c_str(), _PC_REC_XFER_ALIGN)));
+ RETURN_NOT_OK(stream->impl_->Open(path));
+ return stream;
+}
+
+Result<std::shared_ptr<DirectFileOutputStream>> DirectFileOutputStream::Open(
+ int fd, int64_t sector_size) {
+ auto stream =
+ std::shared_ptr<DirectFileOutputStream>(new
DirectFileOutputStream(sector_size));
+ RETURN_NOT_OK(stream->impl_->Open(fd));
+ return stream;
+}
+
+Status DirectFileOutputStream::Close() {
+ // some operations will call Close() on a file that is not open. In which
case don't do
+ // all this.
+ if (!closed()) {
+ // have to flush out the temprorary data, but then trim the file
+ if (cached_length_ > 0) {
+ std::memset(aligned_cached_data_ + cached_length_, 0,
+ sector_size_ - cached_length_);
+ RETURN_NOT_OK(impl_->Write(aligned_cached_data_, sector_size_));
+ }
+ ARROW_ASSIGN_OR_RAISE(auto file_pos, impl_->Tell());
+ auto new_length = file_pos - sector_size_ + cached_length_;
+ // fsync(impl_->fd());
+ ftruncate(impl_->fd(), new_length);
+ }
+ cached_length_ = 0;
+ return impl_->Close();
+}
+
+bool DirectFileOutputStream::closed() const { return !impl_->is_open(); }
+
+Result<int64_t> DirectFileOutputStream::Tell() const {
+ ARROW_ASSIGN_OR_RAISE(auto tell_length, impl_->Tell());
+ return tell_length + cached_length_;
+}
+
+Status DirectFileOutputStream::Write(const void* data, int64_t length) {
+ RETURN_NOT_OK(impl_->CheckClosed());
+
+ if (cached_length_ + length < sector_size_) {
+ std::memcpy(aligned_cached_data_ + cached_length_, data, length);
+ cached_length_ += length;
+ return Status::OK();
+ }
+
+ auto bytes_to_write = (cached_length_ + length) / sector_size_ *
sector_size_;
+ auto bytes_leftover = cached_length_ + length - bytes_to_write;
+ uintptr_t mask = (uintptr_t)(sector_size_ - 1);
+ std::vector<uint8_t> mem;
+ mem.reserve(bytes_to_write + sector_size_ - 1);
+ uint8_t* new_ptr = reinterpret_cast<uint8_t*>(
+ reinterpret_cast<uintptr_t>(&mem[0] + sector_size_ - 1) & ~(mask));
+ std::memcpy(new_ptr, aligned_cached_data_, cached_length_);
+ std::memcpy(new_ptr + cached_length_, data, bytes_to_write - cached_length_);
+ std::memset(aligned_cached_data_, 0, cached_length_); // this is not
required.
Review Comment:
Why do it, then? :-)
##########
cpp/src/arrow/io/file.cc:
##########
@@ -378,6 +378,106 @@ Status FileOutputStream::Write(const void* data, int64_t
length) {
int FileOutputStream::file_descriptor() const { return impl_->fd(); }
+// ----------------------------------------------------------------------
+// DirectFileOutputStream, change the Open, Write and Close methods from
FileOutputStream
+// Uses DirectIO for writes. Will only write out things in disk-sector-size
byte blocks.
+// Buffers leftover bytes in an internal data structure, which will be padded
to
+// disk-sector-size bytes and written upon call to close.
+
+#if defined(__linux__)
+class DirectFileOutputStream::DirectFileOutputStreamImpl : public OSFile {
+ public:
+ Status Open(const std::string& path) {
+ return OpenWritable(path, true, false, true /* write_only */, true);
+ }
+ Status Open(int fd) { return OpenWritable(fd); }
+};
+
+DirectFileOutputStream::DirectFileOutputStream(int64_t sector_size) {
+ sector_size_ = sector_size;
+ uintptr_t mask = (uintptr_t)(sector_size - 1);
+ cached_data_.reserve(sector_size * 2 - 1);
+ aligned_cached_data_ = reinterpret_cast<uint8_t*>(
+ reinterpret_cast<uintptr_t>(&cached_data_[0] + sector_size - 1) &
~(mask));
+
+ impl_.reset(new DirectFileOutputStreamImpl());
+}
+
+DirectFileOutputStream::~DirectFileOutputStream() {
internal::CloseFromDestructor(this); }
+
+Result<std::shared_ptr<DirectFileOutputStream>> DirectFileOutputStream::Open(
+ const std::string& path) {
+ auto stream = std::shared_ptr<DirectFileOutputStream>(
+ new DirectFileOutputStream(pathconf(path.c_str(), _PC_REC_XFER_ALIGN)));
+ RETURN_NOT_OK(stream->impl_->Open(path));
+ return stream;
+}
+
+Result<std::shared_ptr<DirectFileOutputStream>> DirectFileOutputStream::Open(
+ int fd, int64_t sector_size) {
+ auto stream =
+ std::shared_ptr<DirectFileOutputStream>(new
DirectFileOutputStream(sector_size));
+ RETURN_NOT_OK(stream->impl_->Open(fd));
+ return stream;
+}
+
+Status DirectFileOutputStream::Close() {
+ // some operations will call Close() on a file that is not open. In which
case don't do
+ // all this.
+ if (!closed()) {
+ // have to flush out the temprorary data, but then trim the file
+ if (cached_length_ > 0) {
+ std::memset(aligned_cached_data_ + cached_length_, 0,
+ sector_size_ - cached_length_);
+ RETURN_NOT_OK(impl_->Write(aligned_cached_data_, sector_size_));
+ }
+ ARROW_ASSIGN_OR_RAISE(auto file_pos, impl_->Tell());
+ auto new_length = file_pos - sector_size_ + cached_length_;
+ // fsync(impl_->fd());
+ ftruncate(impl_->fd(), new_length);
+ }
+ cached_length_ = 0;
+ return impl_->Close();
+}
+
+bool DirectFileOutputStream::closed() const { return !impl_->is_open(); }
+
+Result<int64_t> DirectFileOutputStream::Tell() const {
+ ARROW_ASSIGN_OR_RAISE(auto tell_length, impl_->Tell());
+ return tell_length + cached_length_;
+}
+
+Status DirectFileOutputStream::Write(const void* data, int64_t length) {
+ RETURN_NOT_OK(impl_->CheckClosed());
+
+ if (cached_length_ + length < sector_size_) {
+ std::memcpy(aligned_cached_data_ + cached_length_, data, length);
+ cached_length_ += length;
+ return Status::OK();
+ }
+
+ auto bytes_to_write = (cached_length_ + length) / sector_size_ *
sector_size_;
+ auto bytes_leftover = cached_length_ + length - bytes_to_write;
+ uintptr_t mask = (uintptr_t)(sector_size_ - 1);
Review Comment:
Please use only C++ casts.
##########
cpp/src/arrow/io/file.h:
##########
@@ -80,6 +80,56 @@ class ARROW_EXPORT FileOutputStream : public OutputStream {
std::unique_ptr<FileOutputStreamImpl> impl_;
};
+#if defined(__linux__)
+/// \brief An operating system file open in write-only and O_DIRECT mode, only
works on
+/// Linux.
+class ARROW_EXPORT DirectFileOutputStream : public OutputStream {
+ public:
+ ~DirectFileOutputStream() override;
+
+ /// \brief Open a local file for writing, truncating any existing file
+ /// \param[in] path with UTF8 encoding
+ ///
+ /// When opening a new file, any existing file with the indicated path is
+ /// truncated to 0 bytes, deleting any existing data
+ static Result<std::shared_ptr<DirectFileOutputStream>> Open(const
std::string& path);
+
+ /// \brief Open a file descriptor for writing. The underlying file isn't
+ /// truncated.
+ /// \param[in] fd file descriptor
+ /// \param[in] sector_size the physical disk sector size hosting this fd
+ /// \return an open FileOutputStream
+ ///
+ /// The file descriptor becomes owned by the OutputStream, and will be closed
+ /// on Close() or destruction.
+ static Result<std::shared_ptr<DirectFileOutputStream>> Open(int fd,
+ int64_t
sector_size);
+
+ // OutputStream interface
+ Status Close() override;
+ bool closed() const override;
+ Result<int64_t> Tell() const override;
+
+ // Write bytes to the stream. Thread-safe
+ Status Write(const void* data, int64_t nbytes) override;
+ /// \cond FALSE
+ using Writable::Write;
+ /// \endcond
+
+ int file_descriptor() const;
+
+ private:
+ explicit DirectFileOutputStream(int64_t sector_size);
+
+ class ARROW_NO_EXPORT DirectFileOutputStreamImpl;
+ std::unique_ptr<DirectFileOutputStreamImpl> impl_;
+ int64_t sector_size_ = 0;
+ std::vector<uint8_t> cached_data_;
+ uint8_t* aligned_cached_data_;
+ int64_t cached_length_ = 0;
Review Comment:
Let's put this inside the private impl class instead of here?
##########
cpp/src/arrow/io/file.cc:
##########
@@ -378,6 +378,106 @@ Status FileOutputStream::Write(const void* data, int64_t
length) {
int FileOutputStream::file_descriptor() const { return impl_->fd(); }
+// ----------------------------------------------------------------------
+// DirectFileOutputStream, change the Open, Write and Close methods from
FileOutputStream
+// Uses DirectIO for writes. Will only write out things in disk-sector-size
byte blocks.
+// Buffers leftover bytes in an internal data structure, which will be padded
to
+// disk-sector-size bytes and written upon call to close.
+
+#if defined(__linux__)
+class DirectFileOutputStream::DirectFileOutputStreamImpl : public OSFile {
+ public:
+ Status Open(const std::string& path) {
+ return OpenWritable(path, true, false, true /* write_only */, true);
+ }
+ Status Open(int fd) { return OpenWritable(fd); }
+};
+
+DirectFileOutputStream::DirectFileOutputStream(int64_t sector_size) {
+ sector_size_ = sector_size;
+ uintptr_t mask = (uintptr_t)(sector_size - 1);
+ cached_data_.reserve(sector_size * 2 - 1);
+ aligned_cached_data_ = reinterpret_cast<uint8_t*>(
+ reinterpret_cast<uintptr_t>(&cached_data_[0] + sector_size - 1) &
~(mask));
+
+ impl_.reset(new DirectFileOutputStreamImpl());
+}
+
+DirectFileOutputStream::~DirectFileOutputStream() {
internal::CloseFromDestructor(this); }
+
+Result<std::shared_ptr<DirectFileOutputStream>> DirectFileOutputStream::Open(
+ const std::string& path) {
+ auto stream = std::shared_ptr<DirectFileOutputStream>(
+ new DirectFileOutputStream(pathconf(path.c_str(), _PC_REC_XFER_ALIGN)));
+ RETURN_NOT_OK(stream->impl_->Open(path));
+ return stream;
+}
+
+Result<std::shared_ptr<DirectFileOutputStream>> DirectFileOutputStream::Open(
+ int fd, int64_t sector_size) {
+ auto stream =
+ std::shared_ptr<DirectFileOutputStream>(new
DirectFileOutputStream(sector_size));
+ RETURN_NOT_OK(stream->impl_->Open(fd));
+ return stream;
+}
+
+Status DirectFileOutputStream::Close() {
+ // some operations will call Close() on a file that is not open. In which
case don't do
+ // all this.
+ if (!closed()) {
+ // have to flush out the temprorary data, but then trim the file
+ if (cached_length_ > 0) {
+ std::memset(aligned_cached_data_ + cached_length_, 0,
+ sector_size_ - cached_length_);
+ RETURN_NOT_OK(impl_->Write(aligned_cached_data_, sector_size_));
+ }
+ ARROW_ASSIGN_OR_RAISE(auto file_pos, impl_->Tell());
+ auto new_length = file_pos - sector_size_ + cached_length_;
+ // fsync(impl_->fd());
Review Comment:
Please don't leave commented-out code.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]