huberylee commented on code in PR #39393:
URL: https://github.com/apache/arrow/pull/39393#discussion_r1446260232
##########
cpp/src/arrow/io/buffered.h:
##########
@@ -163,5 +164,156 @@ class ARROW_EXPORT BufferedInputStream
std::unique_ptr<Impl> impl_;
};
+/// \class ChunkBufferedInputStream
+/// \brief A ChunkBufferedInputStream that performs buffered reads from an
+/// unbuffered InputStream, which can mitigate the overhead of many small
+/// reads in some cases.
+///
+/// When an actual io request occurs, read ranges will be coalesced if the
+/// distance between them is less than io_merge_threshold, and the actual size
+/// in one io request will be limited by the buffer_size.
+///
+/// \attention It is important to note that since the data returned by the Read
+/// interface is a reference to the Buffer, the caller must ensure that the data
+/// returned by the Read interface is processed completely before calling the
+/// Read interface again. Otherwise, fatal errors may occur due to data in the
+/// Buffer being overwritten.
+class ARROW_EXPORT ChunkBufferedInputStream : public InputStream {
+ public:
+ static Result<std::shared_ptr<ChunkBufferedInputStream>> Create(
+ int64_t start, int64_t length, std::shared_ptr<RandomAccessFile> impl,
+ std::vector<ReadRange> read_ranges, int64_t buffer_size, int32_t
io_merge_threshold,
+ MemoryPool* pool_ = ::arrow::default_memory_pool());
+
+ // InputStream interface
+ Status Advance(int64_t nbytes) override;
+
+ /// \brief Peek some data without advancing the read position.
+ Result<std::string_view> Peek(int64_t nbytes) override;
+
+ // Readable interface
+ Result<int64_t> Read(int64_t nbytes, void* out) override;
+
+ Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override;
+
+ // FileInterface
+ Status Close() override;
+
+ Result<int64_t> Tell() const override;
+
+ bool closed() const override;
+
+ private:
+ explicit ChunkBufferedInputStream(int64_t start, int64_t length,
+ std::shared_ptr<RandomAccessFile> impl,
+ std::vector<ReadRange> read_ranges,
+ int64_t buffer_size, int32_t
io_merge_threshold,
+ std::shared_ptr<ResizableBuffer> buffer);
+
+ inline Status CheckClosed() const {
+ if (ARROW_PREDICT_TRUE(is_open_)) {
+ return Status::OK();
+ }
+
+ return Status::IOError("Operation forbidden on closed file input stream");
+ }
+
+ inline Status CheckReadRange(int64_t nbytes) {
+ // Upon reaching the end of the current read range, if there is a next read
+ // range, current_range_available_bytes_ will be set to the length of the
+ // next read range, and this will be ensured by ConsumeBuffer;
+ if (ARROW_PREDICT_TRUE(current_range_available_bytes_ >= nbytes)) {
+ return Status::OK();
+ }
+
+ if (current_range_available_bytes_ > 0 && current_range_available_bytes_ <
nbytes) {
+ if (read_ranges_[read_ranges_idx_].length > nbytes) {
+ return Status::IOError("Read length is illegal");
+ }
+
+ // At the beginning of a new read range and required bytes is more than
+ // read range length, we think it's ok because there are some tentative
+ // read requests when getting next page data.
Review Comment:
> Here is for checking the record boundary of a repeated column?
No. In ``SerializedPageReader::NextPage``, it tries to read 16KB, 32KB,
64KB, ..., until the data read contains the whole page header. In an actual
scenario, the 16KB read in the first attempt may already exceed the size of
the entire page.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]