mapleFU commented on code in PR #39393:
URL: https://github.com/apache/arrow/pull/39393#discussion_r1439252150
##########
cpp/src/arrow/io/buffered.cc:
##########
@@ -497,5 +497,141 @@ Future<std::shared_ptr<const KeyValueMetadata>>
BufferedInputStream::ReadMetadat
return impl_->raw()->ReadMetadataAsync(io_context);
}
+// ----------------------------------------------------------------------
+// ChunkBufferedInputStream implementation
+
+ChunkBufferedInputStream::ChunkBufferedInputStream(
+ int64_t start, int64_t length, std::shared_ptr<RandomAccessFile> raw,
+ std::vector<ReadRange> read_ranges, int64_t buffer_size, int32_t
io_merge_threshold,
+ std::shared_ptr<ResizableBuffer> buffer)
+ : is_open_(true),
+ raw_pos_(start),
+ raw_end_(start + length),
+ raw_(std::move(raw)),
+ read_ranges_(std::move(read_ranges)),
+ current_range_available_bytes_(read_ranges_[read_ranges_idx_].length),
+ buffer_size_(buffer_size),
+ io_merge_threshold_(io_merge_threshold),
+ buffer_(std::move(buffer)) {}
+
+Result<std::shared_ptr<ChunkBufferedInputStream>>
ChunkBufferedInputStream::Create(
+ int64_t start, int64_t length, std::shared_ptr<RandomAccessFile> impl,
+ std::vector<ReadRange> read_ranges, int64_t buffer_size, int32_t
io_merge_threshold,
+ MemoryPool* pool) {
+ ARROW_ASSIGN_OR_RAISE(auto buffer, ::arrow::AllocateResizableBuffer(0,
pool));
+ return std::shared_ptr<ChunkBufferedInputStream>(
+ new ChunkBufferedInputStream(start, length, std::move(impl),
std::move(read_ranges),
+ buffer_size, io_merge_threshold,
std::move(buffer)));
+}
+
+Status ChunkBufferedInputStream::Advance(int64_t nbytes) {
+ RETURN_NOT_OK(BufferIfNeeded(nbytes));
+ auto bytes_read = std::min(current_range_available_bytes_, nbytes);
+ ConsumeBuffer(bytes_read);
+ return Status::OK();
+}
+
+Result<std::string_view> ChunkBufferedInputStream::Peek(int64_t nbytes) {
+ RETURN_NOT_OK(BufferIfNeeded(nbytes));
+ auto bytes_read = std::min(current_range_available_bytes_, nbytes);
+ return std::string_view(reinterpret_cast<const char*>(buffer_->data() +
buffer_pos_),
+ bytes_read);
+}
+
+Result<int64_t> ChunkBufferedInputStream::Read(int64_t nbytes, void* out) {
+ RETURN_NOT_OK(BufferIfNeeded(nbytes));
+ auto bytes_read = std::min(current_range_available_bytes_, nbytes);
+ std::memcpy(out, buffer_->data() + buffer_pos_, bytes_read);
+ ConsumeBuffer(bytes_read);
+ return nbytes;
+}
+
+Result<std::shared_ptr<Buffer>> ChunkBufferedInputStream::Read(int64_t nbytes)
{
+ RETURN_NOT_OK(BufferIfNeeded(nbytes));
+ auto bytes_read = std::min(current_range_available_bytes_, nbytes);
+ auto buffer = SliceBuffer(buffer_, buffer_pos_, bytes_read);
+ ConsumeBuffer(bytes_read);
+ return std::move(buffer);
+}
+
+Status ChunkBufferedInputStream::Close() {
+ is_open_ = false;
+ return Status::OK();
+}
+
+Result<int64_t> ChunkBufferedInputStream::Tell() const {
+ RETURN_NOT_OK(CheckClosed());
+ return raw_pos_;
+}
+
+bool ChunkBufferedInputStream::closed() const { return !is_open_; }
+
+Status ChunkBufferedInputStream::BufferIfNeeded(int64_t nbytes) {
+ RETURN_NOT_OK(CheckClosed());
+ RETURN_NOT_OK(CheckReadRange(nbytes));
+
+ if (ARROW_PREDICT_TRUE(bytes_buffered_ > 0)) {
+ return Status::OK();
+ }
+
+ if (read_ranges_idx_ == read_ranges_.size()) {
+ ARROW_DCHECK(current_range_available_bytes_ == 0);
+ return Status::OK();
+ }
+
+ read_gaps_.clear();
Review Comment:
So this is triggered once?
##########
cpp/src/parquet/arrow/test_util.h:
##########
@@ -129,11 +138,16 @@ template <typename ArrowType>
::arrow::enable_if_fixed_size_binary<ArrowType, Status> NonNullArray(
size_t size, std::shared_ptr<Array>* out) {
using BuilderType = typename ::arrow::TypeTraits<ArrowType>::BuilderType;
- // set byte_width to the length of "fixed": 5
+ // set byte_width to the length of "fixed": 10
// todo: find a way to generate test data with more diversity.
- BuilderType builder(::arrow::fixed_size_binary(5));
+ const int byte_width = 10;
Review Comment:
Hmm whats the purpose here?
##########
cpp/src/arrow/io/buffered.h:
##########
@@ -163,5 +164,156 @@ class ARROW_EXPORT BufferedInputStream
std::unique_ptr<Impl> impl_;
};
+/// \class ChunkBufferedInputStream
+/// \brief An ChunkBufferedInputStream that performs buffered reads from an
+/// unbuffered InputStream, which can mitigate the overhead of many small
+/// reads in some cases.
+///
+/// When an actual io request occurs, read ranges will be coalesced if the
+/// distance between them is less than io_merge_threshold, and the actual size
+/// in one io request will be limited by the buffer_size.
+///
+/// \attention It is important to note that since the data returned by the Read
+/// interface is a reference to the Buffer, the caller must ensure that the
data
+/// returned by the Read interface is processed completely before calling the
+/// Read interface again. Otherwise, fatal errors may occur due to data in the
+/// Buffer being overwritten.
+class ARROW_EXPORT ChunkBufferedInputStream : public InputStream {
Review Comment:
What's the exactly difference between this and CachedFileInputStream? Can we
just implement this on it?
##########
cpp/src/parquet/column_reader.cc:
##########
@@ -980,6 +1006,12 @@ class ColumnReaderImplBase {
// plain-encoded data.
std::unordered_map<int, std::unique_ptr<DecoderType>> decoders_;
+ // Skip info for current page
+ PageSkipInfo* skip_info_{nullptr};
Review Comment:
If the page is v1 without page-index, what would this being?
##########
cpp/src/parquet/column_reader.h:
##########
@@ -118,6 +120,110 @@ struct CryptoContext {
std::shared_ptr<Decryptor> data_decryptor;
};
+struct PageSkipInfo {
+ PageSkipInfo() = default;
+
+ const RowRanges::Range& Range() { return ranges_[index_]; }
+
+ int64_t SkipRowNum() const { return skip_row_nums_[index_]; }
+
+ int64_t LastRowIndex() const { return last_row_indices_[index_]; }
+
+ bool HasNext() const {
+ if (index_ != ranges_.size() - 1) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ ::arrow::Status Next() {
+ ARROW_RETURN_IF(!HasNext(), ::arrow::Status::Invalid("No more range to
read"));
+ index_++;
+ return ::arrow::Status::OK();
+ }
+
+ int64_t EndRowIndex() const { return last_row_indices_.back(); }
+
+ bool operator==(const PageSkipInfo& other) const {
+ if (ranges_ != other.ranges_) {
+ return false;
+ }
+ if (last_row_indices_ != other.last_row_indices_) {
+ return false;
+ }
+ if (skip_row_nums_ != other.skip_row_nums_) {
+ return false;
+ }
+
+ return true;
+ }
+
+ bool operator!=(const PageSkipInfo& other) const { return !(*this == other);
}
+
+ //
+ // | <--------------------------- column chunk
-------------------------------> |
+ // | <-------------------- page N -----------------------> |
+ // first_row_idx last_row_idx
+ // |-- ... --|-------------------------------------------------------|---
... ---|
+ // |---- range0 ----| |---- range1 ----|
+ // |--skip0--| |--skip1--|
+ // |------last_row_index0-----|
+ // |-------------------last_row_index1-------------------|
+ //
+
+ // Row ranges for this page, start counting from within column chunk
+ std::vector<RowRanges::Range> ranges_;
+
+ // The num of rows to skip before reading each row range
+ std::vector<int64_t> skip_row_nums_;
+
+ // The last row index for echo row range, start counting from within the page
Review Comment:
What does "echo row range" mean, is it "each row range"?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]