huberylee commented on code in PR #39393:
URL: https://github.com/apache/arrow/pull/39393#discussion_r1439273272


##########
cpp/src/arrow/io/buffered.cc:
##########
@@ -497,5 +497,141 @@ Future<std::shared_ptr<const KeyValueMetadata>> 
BufferedInputStream::ReadMetadat
   return impl_->raw()->ReadMetadataAsync(io_context);
 }
 
+// ----------------------------------------------------------------------
+// ChunkBufferedInputStream implementation
+
+// Constructs an open stream over the byte span [start, start + length) of
+// `raw`, positioned at `start`. Stores `read_ranges`, `buffer_size`,
+// `io_merge_threshold` and the (pre-allocated) `buffer` for later buffering,
+// and seeds `current_range_available_bytes_` from the first tracked range.
+ChunkBufferedInputStream::ChunkBufferedInputStream(
+    int64_t start, int64_t length, std::shared_ptr<RandomAccessFile> raw,
+    std::vector<ReadRange> read_ranges, int64_t buffer_size, int32_t 
io_merge_threshold,
+    std::shared_ptr<ResizableBuffer> buffer)
+    : is_open_(true),
+      raw_pos_(start),
+      raw_end_(start + length),
+      raw_(std::move(raw)),
+      read_ranges_(std::move(read_ranges)),
+      // NOTE(review): this indexes `read_ranges_` unconditionally, so an empty
+      // `read_ranges` is out of bounds. It also relies on `read_ranges_` and
+      // `read_ranges_idx_` being declared (and thus initialized) before
+      // `current_range_available_bytes_` in the class; members initialize in
+      // declaration order, not list order. Verify against the header.
+      current_range_available_bytes_(read_ranges_[read_ranges_idx_].length),
+      buffer_size_(buffer_size),
+      io_merge_threshold_(io_merge_threshold),
+      buffer_(std::move(buffer)) {}
+
+// Factory for ChunkBufferedInputStream. Allocates an empty (size 0) resizable
+// buffer from `pool`, then constructs the stream with `new` and hands it to a
+// shared_ptr. Returns the allocation error status if the buffer cannot be
+// allocated; all other arguments are forwarded unchanged to the constructor.
+Result<std::shared_ptr<ChunkBufferedInputStream>> 
ChunkBufferedInputStream::Create(
+    int64_t start, int64_t length, std::shared_ptr<RandomAccessFile> impl,
+    std::vector<ReadRange> read_ranges, int64_t buffer_size, int32_t 
io_merge_threshold,
+    MemoryPool* pool) {
+  ARROW_ASSIGN_OR_RAISE(auto buffer, ::arrow::AllocateResizableBuffer(0, 
pool));
+  return std::shared_ptr<ChunkBufferedInputStream>(
+      new ChunkBufferedInputStream(start, length, std::move(impl), 
std::move(read_ranges),
+                                   buffer_size, io_merge_threshold, 
std::move(buffer)));
+}
+
+// Skips forward without copying data: ensures bytes are buffered, then
+// consumes at most `current_range_available_bytes_` of the requested
+// `nbytes`. The number of bytes actually skipped is not reported to callers.
+Status ChunkBufferedInputStream::Advance(int64_t nbytes) {
+  RETURN_NOT_OK(BufferIfNeeded(nbytes));
+  auto bytes_read = std::min(current_range_available_bytes_, nbytes);
+  ConsumeBuffer(bytes_read);
+  return Status::OK();
+}
+
+// Returns a view of up to `nbytes` buffered bytes, bounded by
+// `current_range_available_bytes_`, without consuming them.
+// NOTE(review): the view aliases `buffer_` memory starting at `buffer_pos_`;
+// it presumably becomes invalid once a later call refills or resizes
+// `buffer_`. Confirm against BufferIfNeeded before holding it across calls.
+Result<std::string_view> ChunkBufferedInputStream::Peek(int64_t nbytes) {
+  RETURN_NOT_OK(BufferIfNeeded(nbytes));
+  auto bytes_read = std::min(current_range_available_bytes_, nbytes);
+  return std::string_view(reinterpret_cast<const char*>(buffer_->data() + 
buffer_pos_),
+                          bytes_read);
+}
+
+// Copies up to `nbytes` buffered bytes into `out` and consumes them. The
+// amount copied is bounded by `current_range_available_bytes_`, so it may be
+// less than `nbytes`. Returns the number of bytes actually copied into `out`.
+Result<int64_t> ChunkBufferedInputStream::Read(int64_t nbytes, void* out) {
+  RETURN_NOT_OK(BufferIfNeeded(nbytes));
+  auto bytes_read = std::min(current_range_available_bytes_, nbytes);
+  std::memcpy(out, buffer_->data() + buffer_pos_, bytes_read);
+  ConsumeBuffer(bytes_read);
+  // Report what was really written: returning `nbytes` on a short read would
+  // make callers treat the unwritten tail of `out` as valid data.
+  return bytes_read;
+}
+
+// Returns up to `nbytes` buffered bytes, bounded by
+// `current_range_available_bytes_`, as a slice of `buffer_`, then consumes
+// them.
+// NOTE(review): the slice shares `buffer_`'s memory rather than copying it.
+// If BufferIfNeeded later resizes or overwrites `buffer_` in place, slices
+// returned earlier could observe changed or reallocated data. Confirm, or
+// copy into a freshly allocated buffer.
+Result<std::shared_ptr<Buffer>> ChunkBufferedInputStream::Read(int64_t nbytes) 
{
+  RETURN_NOT_OK(BufferIfNeeded(nbytes));
+  auto bytes_read = std::min(current_range_available_bytes_, nbytes);
+  auto buffer = SliceBuffer(buffer_, buffer_pos_, bytes_read);
+  ConsumeBuffer(bytes_read);
+  return std::move(buffer);
+}
+
+// Marks the stream as closed by clearing `is_open_`. It does not close `raw_`
+// and does not release `buffer_`; it always returns OK.
+Status ChunkBufferedInputStream::Close() {
+  is_open_ = false;
+  return Status::OK();
+}
+
+// Returns `raw_pos_` after verifying the stream is still open.
+// NOTE(review): confirm `raw_pos_` is advanced as bytes are consumed (e.g. in
+// ConsumeBuffer). Otherwise this reports the underlying file position rather
+// than the logical read position.
+Result<int64_t> ChunkBufferedInputStream::Tell() const {
+  RETURN_NOT_OK(CheckClosed());
+  return raw_pos_;
+}
+
+// True once Close() has been called (the constructor starts the stream open).
+bool ChunkBufferedInputStream::closed() const { return !is_open_; }
+
+Status ChunkBufferedInputStream::BufferIfNeeded(int64_t nbytes) {
+  RETURN_NOT_OK(CheckClosed());
+  RETURN_NOT_OK(CheckReadRange(nbytes));
+
+  if (ARROW_PREDICT_TRUE(bytes_buffered_ > 0)) {
+    return Status::OK();
+  }
+
+  if (read_ranges_idx_ == read_ranges_.size()) {
+    ARROW_DCHECK(current_range_available_bytes_ == 0);
+    return Status::OK();
+  }
+
+  read_gaps_.clear();

Review Comment:
   It will be triggered every time before an actual I/O occurs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to