huberylee commented on code in PR #39393:
URL: https://github.com/apache/arrow/pull/39393#discussion_r1446277356


##########
cpp/src/arrow/io/buffered.h:
##########
@@ -163,5 +164,156 @@ class ARROW_EXPORT BufferedInputStream
   std::unique_ptr<Impl> impl_;
 };
 
+/// \class ChunkBufferedInputStream
+/// \brief A ChunkBufferedInputStream that performs buffered reads from an
+/// unbuffered InputStream, which can mitigate the overhead of many small
+/// reads in some cases.
+///
+/// When an actual io request occurs, read ranges will be coalesced if the
+/// distance between them is less than io_merge_threshold, and the actual size
+/// in one io request will be limited by the buffer_size.
+///
+/// \attention It is important to note that since the data returned by the Read
+/// interface is a reference to the Buffer, the caller must ensure that the data
+/// returned by the Read interface is processed completely before calling the
+/// Read interface again. Otherwise, fatal errors may occur due to data in the
+/// Buffer being overwritten.
+class ARROW_EXPORT ChunkBufferedInputStream : public InputStream {
+ public:
+  static Result<std::shared_ptr<ChunkBufferedInputStream>> Create(
+      int64_t start, int64_t length, std::shared_ptr<RandomAccessFile> impl,
+      std::vector<ReadRange> read_ranges, int64_t buffer_size, int32_t 
io_merge_threshold,
+      MemoryPool* pool_ = ::arrow::default_memory_pool());
+
+  // InputStream interface
+  Status Advance(int64_t nbytes) override;
+
+  /// \brief Peek some data without advancing the read position.
+  Result<std::string_view> Peek(int64_t nbytes) override;
+
+  // Readable interface
+  Result<int64_t> Read(int64_t nbytes, void* out) override;
+
+  Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override;
+
+  // FileInterface
+  Status Close() override;
+
+  Result<int64_t> Tell() const override;
+
+  bool closed() const override;
+
+ private:
+  explicit ChunkBufferedInputStream(int64_t start, int64_t length,
+                                    std::shared_ptr<RandomAccessFile> impl,
+                                    std::vector<ReadRange> read_ranges,
+                                    int64_t buffer_size, int32_t 
io_merge_threshold,
+                                    std::shared_ptr<ResizableBuffer> buffer);
+
+  inline Status CheckClosed() const {
+    if (ARROW_PREDICT_TRUE(is_open_)) {
+      return Status::OK();
+    }
+
+    return Status::IOError("Operation forbidden on closed file input stream");
+  }
+
+  inline Status CheckReadRange(int64_t nbytes) {
+    // Upon reaching the end of the current read range, if there is a next read
+    // range, current_range_available_bytes_ will be set to the length of the
+    // next read range, and this will be ensured by ConsumeBuffer;
+    if (ARROW_PREDICT_TRUE(current_range_available_bytes_ >= nbytes)) {
+      return Status::OK();
+    }
+
+    if (current_range_available_bytes_ > 0 && current_range_available_bytes_ < 
nbytes) {
+      if (read_ranges_[read_ranges_idx_].length > nbytes) {
+        return Status::IOError("Read length is illegal");
+      }
+
+      // At the beginning of a new read range and required bytes is more than
+      // read range length, we think it's ok because there are some tentative
+      // read requests when getting next page data.
+      return Status::OK();
+    }
+
+    if (read_ranges_idx_ != read_ranges_.size()) {

Review Comment:
   > Why not checking this before line 230?
   
   We cannot check this before line 230 because there may still be more ranges to read.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to