pitrou commented on code in PR #39807:
URL: https://github.com/apache/arrow/pull/39807#discussion_r1469934973
##########
cpp/src/arrow/io/compressed.cc:
##########
@@ -397,10 +415,11 @@ class CompressedInputStream::Impl {
std::shared_ptr<InputStream> raw_;
bool is_open_;
std::shared_ptr<Decompressor> decompressor_;
- std::shared_ptr<Buffer> compressed_;
+ std::shared_ptr<ResizableBuffer> compressed_;
// Position in compressed buffer
int64_t compressed_pos_;
std::shared_ptr<ResizableBuffer> decompressed_;
+ std::shared_ptr<ResizableBuffer> decompressed_impl_;
Review Comment:
Why are there two buffers suddenly? What is the difference between the two?
##########
cpp/src/arrow/io/compressed.cc:
##########
@@ -261,21 +262,39 @@ class CompressedInputStream::Impl {
}
}
- bool closed() { return !is_open_; }
+ bool closed() const { return !is_open_; }
Result<int64_t> Tell() const { return total_pos_; }
// Read compressed data if necessary
Status EnsureCompressedData() {
int64_t compressed_avail = compressed_ ? compressed_->size() -
compressed_pos_ : 0;
if (compressed_avail == 0) {
+ // Ensure compressed_ buffer is allocated with kChunkSize.
+ if (compressed_ == nullptr) {
+ ARROW_ASSIGN_OR_RAISE(compressed_, AllocateResizableBuffer(kChunkSize,
pool_));
+ } else {
+ RETURN_NOT_OK(compressed_->Resize(kChunkSize,
/*shrink_to_fit=*/false));
+ }
// No compressed data available, read a full chunk
- ARROW_ASSIGN_OR_RAISE(compressed_, raw_->Read(kChunkSize));
+ ARROW_ASSIGN_OR_RAISE(int64_t read_size,
+ raw_->Read(kChunkSize,
compressed_->mutable_data_as<void>()));
Review Comment:
This will do a spurious copy if the raw stream is able to zero-copy the data
(for example a mmap'ed file or a BufferReader).
##########
cpp/src/arrow/io/compressed.cc:
##########
@@ -261,21 +262,39 @@ class CompressedInputStream::Impl {
}
}
- bool closed() { return !is_open_; }
+ bool closed() const { return !is_open_; }
Result<int64_t> Tell() const { return total_pos_; }
// Read compressed data if necessary
Status EnsureCompressedData() {
int64_t compressed_avail = compressed_ ? compressed_->size() -
compressed_pos_ : 0;
if (compressed_avail == 0) {
+ // Ensure compressed_ buffer is allocated with kChunkSize.
+ if (compressed_ == nullptr) {
+ ARROW_ASSIGN_OR_RAISE(compressed_, AllocateResizableBuffer(kChunkSize,
pool_));
+ } else {
+ RETURN_NOT_OK(compressed_->Resize(kChunkSize,
/*shrink_to_fit=*/false));
+ }
// No compressed data available, read a full chunk
- ARROW_ASSIGN_OR_RAISE(compressed_, raw_->Read(kChunkSize));
+ ARROW_ASSIGN_OR_RAISE(int64_t read_size,
+ raw_->Read(kChunkSize,
compressed_->mutable_data_as<void>()));
Review Comment:
It's true that with a 64 KiB chunk size, this might not be a problem, though.
##########
cpp/src/arrow/io/compressed.h:
##########
@@ -44,6 +44,9 @@ class ARROW_EXPORT CompressedOutputStream : public
OutputStream {
~CompressedOutputStream() override;
/// \brief Create a compressed output stream wrapping the given output
stream.
+ ///
+ /// The codec must be able to streaming compress the data. Some codecs,
+ /// like snapppy, is not able to do so.
Review Comment:
```suggestion
/// The codec must be capable of streaming compression. Some codecs,
/// like Snappy, are not able to do so.
```
##########
cpp/src/arrow/io/compressed.h:
##########
@@ -82,6 +85,9 @@ class ARROW_EXPORT CompressedInputStream
~CompressedInputStream() override;
/// \brief Create a compressed input stream wrapping the given input stream.
+ ///
+ /// The codec must be able to streaming decompress the data. Some codecs,
Review Comment:
Same suggestion here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]