This is an automated email from the ASF dual-hosted git repository.

leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 9ce823a  feat: add batch reader infrastructure (#46)
9ce823a is described below

commit 9ce823a6fcf2087e7dd7ced8db1158046e160ca7
Author: lszskye <[email protected]>
AuthorDate: Thu Jun 4 16:31:18 2026 +0800

    feat: add batch reader infrastructure (#46)
---
 include/paimon/reader/batch_reader.h               |  87 +++++++++
 include/paimon/reader/file_batch_reader.h          |  62 +++++++
 include/paimon/reader/prefetch_file_batch_reader.h |  99 ++++++++++
 src/paimon/common/reader/batch_reader.cpp          |  46 +++++
 .../reader/complete_row_kind_batch_reader.cpp      | 111 ++++++++++++
 .../common/reader/complete_row_kind_batch_reader.h |  73 ++++++++
 .../reader/complete_row_kind_batch_reader_test.cpp | 172 ++++++++++++++++++
 src/paimon/common/reader/concat_batch_reader.cpp   |  69 +++++++
 src/paimon/common/reader/concat_batch_reader.h     |  51 ++++++
 .../common/reader/concat_batch_reader_test.cpp     | 172 ++++++++++++++++++
 .../common/reader/delegating_prefetch_reader.h     |  89 +++++++++
 .../common/reader/predicate_batch_reader.cpp       | 107 +++++++++++
 src/paimon/common/reader/predicate_batch_reader.h  |  70 +++++++
 .../common/reader/predicate_batch_reader_test.cpp  | 201 +++++++++++++++++++++
 src/paimon/common/reader/reader_utils.cpp          | 113 ++++++++++++
 src/paimon/common/reader/reader_utils.h            |  61 +++++++
 src/paimon/common/reader/reader_utils_test.cpp     | 107 +++++++++++
 17 files changed, 1690 insertions(+)

diff --git a/include/paimon/reader/batch_reader.h 
b/include/paimon/reader/batch_reader.h
new file mode 100644
index 0000000..07835c4
--- /dev/null
+++ b/include/paimon/reader/batch_reader.h
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <utility>
+
+#include "paimon/metrics.h"
+#include "paimon/result.h"
+#include "paimon/utils/roaring_bitmap32.h"
+#include "paimon/visibility.h"
+
+struct ArrowArray;   // IWYU pragma: keep
+struct ArrowSchema;  // IWYU pragma: keep
+
+namespace paimon {
+/// Interface for readers that return data batch by batch as Arrow C data arrays.
+class PAIMON_EXPORT BatchReader {
+ public:
+    virtual ~BatchReader() = default;
+    using ReadBatch = std::pair<std::unique_ptr<ArrowArray>, 
std::unique_ptr<ArrowSchema>>;
+    using ReadBatchWithBitmap = std::pair<ReadBatch, RoaringBitmap32>;
+
+    /// Retrieves the next batch of data.
+    ///
+    /// If EOF is reached, returns an OK status with a nullptr array. Returns 
an error status only
+    /// for critical failures (e.g., IO errors). Once an error is returned, 
this method must not be
+    /// retried, as it will repeatedly return the same error code.
+    ///
+    /// @return A result containing a `::ReadBatch`, which consists of a 
unique pointer to
+    /// `ArrowArray` and a unique pointer to `ArrowSchema`. Returned array 
contains a `_VALUE_KIND`
+    /// field (the first field) to indicate the row kind of each row. Deleted 
or index-filtered rows
+    /// are removed.
+    virtual Result<ReadBatch> NextBatch() = 0;
+
+    /// Retrieves the next batch of data together with a bitmap of valid rows.
+    ///
+    /// If EOF is reached, returns an OK status with a nullptr array. Returns 
an error status only
+    /// for critical failures (e.g., IO errors). Once an error is returned, 
this method must not be
+    /// retried, as it will repeatedly return the same error code.
+    ///
+    /// @return A result containing a `::ReadBatch` and a valid bitmap. 
`::ReadBatch` consists of a
+    /// unique pointer to `ArrowArray` and a unique pointer to `ArrowSchema`. 
Returned array
+    /// contains a `_VALUE_KIND` field (the first field) to indicate the row 
kind of each row. Deleted
+    /// or index-filtered records may be retained in `::ReadBatch`, while 
bitmap indicates valid
+    /// row id. If deletion vector or index are enabled, this function is more 
efficient than
+    /// `NextBatch()`. The default implementation calls `NextBatch()` and adds 
all rows to valid
+    /// bitmap. Note that the returned bitmap has at least one valid row id.
+    virtual Result<ReadBatchWithBitmap> NextBatchWithBitmap();
+
+    /// Retrieves the reader's metrics.
+    /// Note that calling this method frequently may incur significant 
performance overhead.
+    /// @return A shared pointer to the `Metrics` object.
+    virtual std::shared_ptr<Metrics> GetReaderMetrics() const = 0;
+
+    /// Closes the `BatchReader`, releasing any associated resources.
+    /// After calling this method, further calls to `NextBatch()` are undefined 
and should be
+    /// avoided.
+    virtual void Close() = 0;
+
+    /// Determines whether a `::ReadBatch` or `::ReadBatchWithBitmap` is an EOF 
batch, if return true,
+    /// all the data has been returned.
+    static bool IsEofBatch(const ReadBatch& batch);
+    static bool IsEofBatch(const ReadBatchWithBitmap& batch_with_bitmap);
+
+    /// Makes an EOF batch, or an EOF batch paired with a default-constructed bitmap.
+    static ReadBatch MakeEofBatch();
+    static ReadBatchWithBitmap MakeEofBatchWithBitmap();
+};
+}  // namespace paimon
diff --git a/include/paimon/reader/file_batch_reader.h 
b/include/paimon/reader/file_batch_reader.h
new file mode 100644
index 0000000..38c89f0
--- /dev/null
+++ b/include/paimon/reader/file_batch_reader.h
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <optional>
+#include <utility>
+#include <vector>
+
+#include "paimon/predicate/predicate.h"
+#include "paimon/reader/batch_reader.h"
+#include "paimon/utils/roaring_bitmap32.h"
+namespace paimon {
+/// The batch reader for a single file; it can report the row number of 
the last batch read for
+/// deletion vector checks.
+class PAIMON_EXPORT FileBatchReader : public BatchReader {
+ public:
+    /// @return The schema of the file.
+    virtual Result<std::unique_ptr<::ArrowSchema>> GetFileSchema() const = 0;
+
+    /// Resets the read schema, predicate and selection bitmap.
+    ///
+    /// If `SetReadSchema()` is not called, `NextBatch()` will return data 
with the file schema.
+    /// After resetting the read schema, `NextBatch()` will read data starting 
from the first row.
+    ///
+    /// @param read_schema The schema to set for reading.
+    /// @param predicate The predicate to apply for filtering data.
+    /// @param selection_bitmap Optional bitmap to apply for filtering data.
+    /// @return The status of the operation.
+    virtual Status SetReadSchema(::ArrowSchema* read_schema,
+                                 const std::shared_ptr<Predicate>& predicate,
+                                 const std::optional<RoaringBitmap32>& 
selection_bitmap) = 0;
+    using BatchReader::NextBatch;
+    using BatchReader::NextBatchWithBitmap;
+
+    /// Gets the row number of the first row in the previously read batch.
+    virtual Result<uint64_t> GetPreviousBatchFirstRowNumber() const = 0;
+
+    /// Gets the number of rows in the file.
+    virtual Result<uint64_t> GetNumberOfRows() const = 0;
+
+    /// Returns whether precise reads are supported when a selection bitmap is pushed down.
+    virtual bool SupportPreciseBitmapSelection() const = 0;
+};
+
+}  // namespace paimon
diff --git a/include/paimon/reader/prefetch_file_batch_reader.h 
b/include/paimon/reader/prefetch_file_batch_reader.h
new file mode 100644
index 0000000..acc7d0b
--- /dev/null
+++ b/include/paimon/reader/prefetch_file_batch_reader.h
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "paimon/reader/file_batch_reader.h"
+
+namespace paimon {
+
+/// The prefetch file batch reader extends the basic FileBatchReader interface 
for prefetch read,
+/// if a format implementation inherits from this class, it will automatically 
support the C++
+/// Paimon prefetch capability and integrate with the Paimon prefetch 
framework.
+class PAIMON_EXPORT PrefetchFileBatchReader : public FileBatchReader {
+ public:
+    /// Seeks to a specific row in the file.
+    /// @param row_number The row number to seek to.
+    /// @return The status of the operation.
+    virtual Status SeekToRow(uint64_t row_number) = 0;
+
+    /// Retrieves the row number of the next row to be read.
+    /// This method indicates the current read position within the file.
+    /// @return The row number of the next row to read.
+    virtual uint64_t GetNextRowToRead() const = 0;
+
+    /// Generates a list of row ranges to be read in batches.
+    /// Each range specifies the start and end row numbers for a batch,
+    /// allowing for efficient batch processing.
+    ///
+    /// The underlying format layer (e.g., parquet) is responsible for 
determining
+    /// the most effective way to split the data. This could be by row groups, 
stripes,
+    /// or other internal data structures. The key principle is to split the 
data
+    /// into contiguous, seekable ranges to minimize read amplification.
+    ///
+    /// For example:
+    /// - A parquet format could split by RowGroup directly, ensuring each 
range aligns
+    /// with a single RowGroup.
+    ///
+    /// The smallest splittable unit must be seekable to its start position, 
and the
+    /// splitting strategy should aim to avoid read amplification.
+    ///
+    /// @param need_prefetch Output pointer to a boolean. The format layer sets 
this to indicate whether
+    /// prefetching is beneficial for the current scenario, to avoid 
performance regression in
+    /// certain cases.
+    /// @return A vector of pairs, where each pair represents a range with a 
start and end row
+    /// number.
+    virtual Result<std::vector<std::pair<uint64_t, uint64_t>>> GenReadRanges(
+        bool* need_prefetch) const = 0;
+
+    /// Sets specific row ranges as a hint for what to read from the format file.
+    ///
+    /// If the specific file format does not support explicit range-based 
reads, implementations may
+    /// gracefully ignore this hint and provide an empty (no-op) 
implementation.
+    ///
+    /// @param read_ranges A vector of pairs, where each pair defines a 
half-open interval
+    /// `[start_row, end_row)`. The `start_row` is inclusive, and the 
`end_row` is exclusive.
+    virtual Status SetReadRanges(const std::vector<std::pair<uint64_t, 
uint64_t>>& read_ranges) = 0;
+
+    /// Returns a list of file offset/length ranges that should be prefetched 
for the current read
+    /// scenario.
+    ///
+    /// This method should analyze the columns selected by the user and return 
the minimal set of
+    /// physical file ranges (offset, length) that need to be read, avoiding 
unnecessary IO
+    /// amplification. For example, if only a subset of columns is requested, 
the implementation
+    /// should only return the byte ranges corresponding to those columns, 
rather than the entire
+    /// row group or block.
+    ///
+    /// This enables the cache to prefetch only the required data, reducing 
disk and network load
+    /// and improving performance for columnar formats and selective queries.
+    ///
+    /// By default, returns an empty list (no prefetching). Format-specific 
implementations should
+    /// override this method to provide accurate offset/length hints for 
efficient IO.
+    /// @return A vector of pairs, where each pair contains the file offset 
and length to be
+    /// prefetched.
+    virtual Result<std::vector<std::pair<uint64_t, uint64_t>>> 
PreBufferRange() {
+        return std::vector<std::pair<uint64_t, uint64_t>>{};  // default: nothing to pre-buffer
+    }
+};
+
+}  // namespace paimon
diff --git a/src/paimon/common/reader/batch_reader.cpp 
b/src/paimon/common/reader/batch_reader.cpp
new file mode 100644
index 0000000..1a1734d
--- /dev/null
+++ b/src/paimon/common/reader/batch_reader.cpp
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/reader/batch_reader.h"
+
+#include "arrow/c/abi.h"
+#include "paimon/common/reader/reader_utils.h"
+
+namespace paimon {
+// Builds the EOF marker: a batch whose array and schema pointers are both null.
+BatchReader::ReadBatch BatchReader::MakeEofBatch() {
+    return ReadBatch{nullptr, nullptr};
+}
+// Builds the EOF marker for the bitmap variant: an EOF batch paired with a
+// default-constructed bitmap.
+BatchReader::ReadBatchWithBitmap BatchReader::MakeEofBatchWithBitmap() {
+    return ReadBatchWithBitmap(MakeEofBatch(), RoaringBitmap32());
+}
+
+// A batch marks EOF when it carries no array.
+bool BatchReader::IsEofBatch(const ReadBatch& batch) {
+    return !batch.first;
+}
+
+// The bitmap variant marks EOF when the wrapped batch carries no array.
+bool BatchReader::IsEofBatch(const ReadBatchWithBitmap& batch_with_bitmap) {
+    const ReadBatch& inner_batch = batch_with_bitmap.first;
+    return !inner_batch.first;
+}
+
+// Default implementation: reads a plain batch through NextBatch() and hands it to
+// ReaderUtils::AddAllValidBitmap to attach the bitmap.
+Result<BatchReader::ReadBatchWithBitmap> BatchReader::NextBatchWithBitmap() {
+    PAIMON_ASSIGN_OR_RAISE(ReadBatch plain_batch, NextBatch());
+    return ReaderUtils::AddAllValidBitmap(std::move(plain_batch));
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/reader/complete_row_kind_batch_reader.cpp 
b/src/paimon/common/reader/complete_row_kind_batch_reader.cpp
new file mode 100644
index 0000000..732f9b2
--- /dev/null
+++ b/src/paimon/common/reader/complete_row_kind_batch_reader.cpp
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/reader/complete_row_kind_batch_reader.h"
+
+#include <cstddef>
+
+#include "arrow/api.h"
+#include "arrow/array/array_base.h"
+#include "arrow/array/array_nested.h"
+#include "arrow/array/util.h"
+#include "arrow/c/abi.h"
+#include "arrow/c/bridge.h"
+#include "arrow/scalar.h"
+#include "paimon/common/reader/reader_utils.h"
+#include "paimon/common/table/special_fields.h"
+#include "paimon/common/types/data_field.h"
+#include "paimon/common/types/row_kind.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/status.h"
+
+namespace paimon {
+
+// Reads the bitmap variant and converts it to a plain batch through
+// ReaderUtils::ApplyBitmapToReadBatch.
+Result<BatchReader::ReadBatch> CompleteRowKindBatchReader::NextBatch() {
+    PAIMON_ASSIGN_OR_RAISE(ReadBatchWithBitmap with_bitmap, NextBatchWithBitmap());
+    return ReaderUtils::ApplyBitmapToReadBatch(std::move(with_bitmap), arrow_pool_.get());
+}
+
+Result<BatchReader::ReadBatchWithBitmap> 
CompleteRowKindBatchReader::NextBatchWithBitmap() {
+    PAIMON_ASSIGN_OR_RAISE(BatchReader::ReadBatchWithBitmap batch_with_bitmap,
+                           reader_->NextBatchWithBitmap());
+    if (BatchReader::IsEofBatch(batch_with_bitmap)) {
+        return batch_with_bitmap;  // EOF from the inner reader is forwarded as-is
+    }
+    auto& [batch, bitmap] = batch_with_bitmap;  // bitmap is forwarded unchanged
+    auto& [c_array, c_schema] = batch;
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Array> 
arrow_array,
+                                      arrow::ImportArray(c_array.get(), 
c_schema.get()));
+    auto struct_array = 
std::dynamic_pointer_cast<arrow::StructArray>(arrow_array);
+    if (!struct_array) {
+        return Status::Invalid("cannot cast array to StructArray in 
CompleteRowKindBatchReader");
+    }
+    if (struct_array->GetFieldByName(SpecialFields::ValueKind().Name())) {
+        // value kind already present: ImportArray consumed the C structs, so export them back
+        PAIMON_RETURN_NOT_OK_FROM_ARROW(
+            arrow::ExportArray(*struct_array, c_array.get(), c_schema.get()));
+        return batch_with_bitmap;
+    }
+    // build (or reuse the cached) value kind array in which every row is INSERT
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> row_kind_array,
+                           PrepareRowKindArray(struct_array->length()));
+    // prepend the value kind column to the original fields and export the result
+    UpdateFieldNamesWithRowKind(struct_array);
+    arrow::ArrayVector fields_with_row_kind = {row_kind_array};
+    fields_with_row_kind.insert(fields_with_row_kind.end(), 
struct_array->fields().begin(),
+                                struct_array->fields().end());
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(
+        std::shared_ptr<arrow::StructArray> array_with_row_kind,
+        arrow::StructArray::Make(fields_with_row_kind, 
field_names_with_row_kind_));
+    PAIMON_RETURN_NOT_OK_FROM_ARROW(
+        arrow::ExportArray(*array_with_row_kind, c_array.get(), 
c_schema.get()));
+    return batch_with_bitmap;
+}
+
+// Returns an int8 array holding `struct_array_length` INSERT row kinds.
+//
+// The last array built is kept in row_kind_array_. When it is long enough a slice of it
+// is returned; otherwise a new array of exactly the requested length is built and cached.
+Result<std::shared_ptr<arrow::Array>> CompleteRowKindBatchReader::PrepareRowKindArray(
+    int32_t struct_array_length) {
+    const bool cache_is_long_enough =
+        row_kind_array_ && row_kind_array_->length() >= struct_array_length;
+    if (cache_is_long_enough) {
+        return row_kind_array_->Slice(0, struct_array_length);
+    }
+    const auto insert_scalar =
+        std::make_shared<arrow::Int8Scalar>(RowKind::Insert()->ToByteValue());
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(
+        row_kind_array_,
+        arrow::MakeArrayFromScalar(*insert_scalar, struct_array_length, arrow_pool_.get()));
+    return row_kind_array_;
+}
+
+// Keeps field_names_with_row_kind_ equal to the value kind field name followed by the
+// names of `struct_array`'s fields, rebuilding the list only when it is out of date.
+//
+// The cached list is compared name by name, not only by size: a batch with the same
+// number of fields but different names must not be exported under stale names.
+void CompleteRowKindBatchReader::UpdateFieldNamesWithRowKind(
+    const std::shared_ptr<arrow::StructArray>& struct_array) {
+    const auto& fields = struct_array->struct_type()->fields();
+    // Slot 0 holds the value kind name, so field i is cached at slot i + 1.
+    bool cache_is_current = field_names_with_row_kind_.size() == fields.size() + 1;
+    for (size_t i = 0; cache_is_current && i < fields.size(); ++i) {
+        cache_is_current = field_names_with_row_kind_[i + 1] == fields[i]->name();
+    }
+    if (cache_is_current) {
+        return;
+    }
+    field_names_with_row_kind_.clear();
+    field_names_with_row_kind_.reserve(fields.size() + 1);
+    field_names_with_row_kind_.push_back(SpecialFields::ValueKind().Name());
+    for (const auto& field : fields) {
+        field_names_with_row_kind_.push_back(field->name());
+    }
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/reader/complete_row_kind_batch_reader.h 
b/src/paimon/common/reader/complete_row_kind_batch_reader.h
new file mode 100644
index 0000000..021dfb4
--- /dev/null
+++ b/src/paimon/common/reader/complete_row_kind_batch_reader.h
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/api.h"
+#include "arrow/array/array_base.h"
+#include "paimon/common/utils/arrow/mem_utils.h"
+#include "paimon/reader/batch_reader.h"
+#include "paimon/result.h"
+
+namespace arrow {
+class StructArray;
+}  // namespace arrow
+
+namespace paimon {
+class MemoryPool;
+class Metrics;
+
+/// Wraps another `BatchReader` and makes sure every returned batch carries the value kind
+/// field, adding an all-INSERT column when the inner reader does not provide one.
+class CompleteRowKindBatchReader : public BatchReader {
+ public:
+    /// @param reader The inner reader; ownership is moved into this object.
+    /// @param pool Memory pool passed to `GetArrowPool()` to create the Arrow pool used here.
+    CompleteRowKindBatchReader(std::unique_ptr<BatchReader>&& reader,
+                               const std::shared_ptr<MemoryPool>& pool)
+        : arrow_pool_(GetArrowPool(pool)), reader_(std::move(reader)) {}
+
+    Result<ReadBatch> NextBatch() override;
+    Result<ReadBatchWithBitmap> NextBatchWithBitmap() override;
+
+    /// Returns the metrics of the inner reader.
+    std::shared_ptr<Metrics> GetReaderMetrics() const override {
+        return reader_->GetReaderMetrics();
+    }
+
+    /// Closes the inner reader, then drops the cached row kind array and field names.
+    void Close() override {
+        reader_->Close();
+        row_kind_array_.reset();
+        field_names_with_row_kind_.clear();
+    }
+
+ private:
+    /// Returns an array of INSERT row kinds with the requested length.
+    Result<std::shared_ptr<arrow::Array>> PrepareRowKindArray(int32_t struct_array_length);
+
+    /// Refreshes `field_names_with_row_kind_` for the fields of `struct_array`.
+    void UpdateFieldNamesWithRowKind(const std::shared_ptr<arrow::StructArray>& struct_array);
+
+    std::unique_ptr<arrow::MemoryPool> arrow_pool_;
+    std::unique_ptr<BatchReader> reader_;
+    /// Cached row kind array, reused across batches.
+    std::shared_ptr<arrow::Array> row_kind_array_;
+    /// Cached output field names: the value kind name followed by the inner field names.
+    std::vector<std::string> field_names_with_row_kind_;
+};
+}  // namespace paimon
diff --git a/src/paimon/common/reader/complete_row_kind_batch_reader_test.cpp 
b/src/paimon/common/reader/complete_row_kind_batch_reader_test.cpp
new file mode 100644
index 0000000..ab0dc2a
--- /dev/null
+++ b/src/paimon/common/reader/complete_row_kind_batch_reader_test.cpp
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/reader/complete_row_kind_batch_reader.h"
+
+#include <map>
+#include <optional>
+
+#include "arrow/api.h"
+#include "arrow/array/array_base.h"
+#include "arrow/c/abi.h"
+#include "arrow/c/bridge.h"
+#include "arrow/ipc/json_simple.h"
+#include "gtest/gtest.h"
+#include "paimon/common/table/special_fields.h"
+#include "paimon/common/types/data_field.h"
+#include "paimon/format/file_format.h"
+#include "paimon/format/file_format_factory.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/fs/local/local_file_system.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/status.h"
+#include "paimon/testing/mock/mock_file_batch_reader.h"
+#include "paimon/testing/utils/read_result_collector.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+/// Fixture that builds `CompleteRowKindBatchReader` instances on top of either an ORC
+/// file reader or an in-memory mock reader.
+class CompleteRowKindBatchReaderTest : public ::testing::Test {
+ public:
+    void SetUp() override {
+        pool_ = GetDefaultPool();
+    }
+
+    void TearDown() override {
+        pool_.reset();
+    }
+
+    /// Opens `file_name` on the local file system with the "orc" format, applies
+    /// `read_schema` (no predicate, no selection bitmap) and wraps the resulting reader.
+    ///
+    /// `read_schema` is taken by const reference: the helper only reads it, so copying
+    /// the shared_ptr on every call is unnecessary.
+    std::unique_ptr<BatchReader> PrepareCompleteRowKindBatchReader(
+        const std::string& file_name, const std::shared_ptr<arrow::Schema>& read_schema,
+        int32_t batch_size, const std::map<std::string, std::string>& options = {}) const {
+        EXPECT_OK_AND_ASSIGN(auto file_format, FileFormatFactory::Get("orc", options));
+        EXPECT_OK_AND_ASSIGN(auto reader_builder, file_format->CreateReaderBuilder(batch_size));
+        std::shared_ptr<FileSystem> file_system = std::make_shared<LocalFileSystem>();
+        EXPECT_OK_AND_ASSIGN(std::shared_ptr<InputStream> input_stream,
+                             file_system->Open(file_name));
+        EXPECT_OK_AND_ASSIGN(auto orc_batch_reader, reader_builder->Build(input_stream));
+
+        std::unique_ptr<ArrowSchema> c_schema = std::make_unique<ArrowSchema>();
+        auto arrow_status = arrow::ExportSchema(*read_schema, c_schema.get());
+        EXPECT_TRUE(arrow_status.ok());
+        EXPECT_OK(orc_batch_reader->SetReadSchema(c_schema.get(), /*predicate=*/nullptr,
+                                                  /*selection_bitmap=*/std::nullopt));
+        return std::make_unique<CompleteRowKindBatchReader>(std::move(orc_batch_reader), pool_);
+    }
+
+    /// Wraps a `MockFileBatchReader` that serves `src_array` in batches of `batch_size`.
+    std::unique_ptr<BatchReader> PrepareCompleteRowKindBatchReader(
+        const std::shared_ptr<arrow::Array>& src_array, int32_t batch_size) const {
+        auto file_batch_reader =
+            std::make_unique<MockFileBatchReader>(src_array, src_array->type(), batch_size);
+        return std::make_unique<CompleteRowKindBatchReader>(std::move(file_batch_reader), pool_);
+    }
+
+ private:
+    std::shared_ptr<MemoryPool> pool_;
+};
+
+TEST_F(CompleteRowKindBatchReaderTest, TestSimple) {
+    std::string file_name = paimon::test::GetDataDir() +
+                            
"/orc/pk_table_scan_and_read_mor.db/pk_table_scan_and_read_mor/f1=20/"
+                            
"bucket-0/data-1bd5fd24-4e7d-4ea2-9436-86df0a54b14a-0.orc";
+    std::vector<DataField> read_fields = {DataField(0, arrow::field("f0", 
arrow::utf8())),
+                                          DataField(1, arrow::field("f1", 
arrow::int32())),
+                                          DataField(2, arrow::field("f2", 
arrow::int32())),
+                                          DataField(3, arrow::field("f3", 
arrow::float64()))};
+    auto read_schema = DataField::ConvertDataFieldsToArrowSchema(read_fields);  // no value kind requested
+
+    auto reader = PrepareCompleteRowKindBatchReader(file_name, read_schema, 
/*batch_size=*/1);
+    ASSERT_OK_AND_ASSIGN(auto result_array, 
ReadResultCollector::CollectResult(reader.get()));
+
+    std::shared_ptr<arrow::ChunkedArray> expected_array;
+    std::vector<DataField> result_fields = read_fields;
+    result_fields.insert(result_fields.begin(), SpecialFields::ValueKind());  // expected output gains a leading value kind field
+    auto result_schema = 
DataField::ConvertDataFieldsToArrowSchema(result_fields);
+    auto array_status = arrow::ipc::internal::json::ChunkedArrayFromJSON(
+        arrow::struct_(result_schema->fields()), {R"([
+        [0, "Lucy", 20, 1, 14.1],
+        [0, "Paul", 20, 1, 18.1]
+])"},
+        &expected_array);
+    ASSERT_TRUE(array_status.ok());
+    ASSERT_TRUE(expected_array->Equals(*result_array));
+    reader->Close();
+}
+
+TEST_F(CompleteRowKindBatchReaderTest, TestInnerReaderContainsRowKind) {
+    std::string file_name = paimon::test::GetDataDir() +
+                            
"/orc/pk_table_scan_and_read_mor.db/pk_table_scan_and_read_mor/f1=20/"
+                            
"bucket-0/data-1bd5fd24-4e7d-4ea2-9436-86df0a54b14a-0.orc";
+    std::vector<DataField> read_fields = {SpecialFields::ValueKind(),
+                                          DataField(0, arrow::field("f0", 
arrow::utf8())),
+                                          DataField(1, arrow::field("f1", 
arrow::int32())),
+                                          DataField(2, arrow::field("f2", 
arrow::int32())),
+                                          DataField(3, arrow::field("f3", 
arrow::float64()))};
+    auto read_schema = DataField::ConvertDataFieldsToArrowSchema(read_fields);  // value kind is part of the read schema
+
+    auto reader = PrepareCompleteRowKindBatchReader(file_name, read_schema, 
/*batch_size=*/1);
+    ASSERT_OK_AND_ASSIGN(auto result_array, 
ReadResultCollector::CollectResult(reader.get()));
+
+    std::shared_ptr<arrow::ChunkedArray> expected_array;
+    auto array_status = arrow::ipc::internal::json::ChunkedArrayFromJSON(
+        arrow::struct_(read_schema->fields()), {R"([
+        [0, "Lucy", 20, 1, 14.1],
+        [0, "Paul", 20, 1, 18.1]
+])"},
+        &expected_array);
+    ASSERT_TRUE(array_status.ok());
+    ASSERT_TRUE(expected_array->Equals(*result_array));  // expected type is exactly the read schema
+    reader->Close();
+}
+
+TEST_F(CompleteRowKindBatchReaderTest, TestNestedType) {
+    arrow::FieldVector fields = {
+        arrow::field("f0", arrow::list(arrow::struct_(
+                               {field("a", arrow::int64()), field("b", 
arrow::boolean())}))),
+        arrow::field("f1", arrow::map(arrow::struct_({field("a", 
arrow::int64()),
+                                                      field("b", 
arrow::boolean())}),
+                                      arrow::boolean()))};
+    auto src_array = 
arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([
+        [[null, [1, true], null], [[[1, true], true]]],
+        [[[2, false], null], null],
+        [[[2, false], [3, true], [4, null]], [[[1, true], true], [[5, false], 
null]]],
+        [null, null]
+    ])")
+                         .ValueOrDie();
+    ASSERT_TRUE(src_array);
+    auto reader = PrepareCompleteRowKindBatchReader(src_array, 
/*batch_size=*/3);
+    ASSERT_OK_AND_ASSIGN(auto result_array, 
ReadResultCollector::CollectResult(reader.get()));
+
+    std::shared_ptr<arrow::ChunkedArray> expected_array;
+    arrow::FieldVector read_fields = fields;  // expected type: source fields plus a leading value kind field
+    read_fields.insert(read_fields.begin(), arrow::field("_VALUE_KIND", 
arrow::int8()));
+    auto array_status =
+        
arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow::struct_(read_fields), 
{R"([
+        [0, [null, [1, true], null], [[[1, true], true]]],
+        [0, [[2, false], null], null],
+        [0, [[2, false], [3, true], [4, null]], [[[1, true], true], [[5, 
false], null]]],
+        [0, null, null]
+])"},
+                                                         &expected_array);
+    ASSERT_TRUE(array_status.ok());
+    ASSERT_TRUE(expected_array->Equals(*result_array));
+    reader->Close();
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/reader/concat_batch_reader.cpp 
b/src/paimon/common/reader/concat_batch_reader.cpp
new file mode 100644
index 0000000..c36886c
--- /dev/null
+++ b/src/paimon/common/reader/concat_batch_reader.cpp
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/reader/concat_batch_reader.h"
+
+#include <utility>
+
+#include "arrow/c/abi.h"
+#include "paimon/common/metrics/metrics_impl.h"
+#include "paimon/common/reader/reader_utils.h"
+#include "paimon/common/utils/arrow/mem_utils.h"
+
+namespace paimon {
+class MemoryPool;
+
+// Takes ownership of `readers` and starts at the first one. `pool` is handed to GetArrowPool()
+// to obtain the Arrow memory pool used by NextBatch().
+ConcatBatchReader::ConcatBatchReader(std::vector<std::unique_ptr<BatchReader>>&& readers,
+                                     const std::shared_ptr<MemoryPool>& pool)
+    : arrow_pool_(GetArrowPool(pool)),
+      readers_(std::move(readers)),
+      current_(0) {}
+
+// Fetches the next batch together with its selection bitmap and lets
+// ReaderUtils::ApplyBitmapToReadBatch turn the pair into a plain ReadBatch.
+Result<BatchReader::ReadBatch> ConcatBatchReader::NextBatch() {
+    PAIMON_ASSIGN_OR_RAISE(BatchReader::ReadBatchWithBitmap selected, NextBatchWithBitmap());
+    arrow::MemoryPool* const pool = arrow_pool_.get();
+    return ReaderUtils::ApplyBitmapToReadBatch(std::move(selected), pool);
+}
+
+// Closes every reader from the current position onward and leaves `current_` at
+// readers_.size(). Readers before `current_` were already closed when they hit EOF in
+// NextBatchWithBitmap().
+void ConcatBatchReader::Close() {
+    while (current_ < readers_.size()) {
+        readers_[current_]->Close();
+        ++current_;
+    }
+}
+
+// Returns whatever MetricsImpl::CollectReadMetrics produces for all wrapped readers.
+std::shared_ptr<Metrics> ConcatBatchReader::GetReaderMetrics() const {
+    std::shared_ptr<Metrics> collected = MetricsImpl::CollectReadMetrics(readers_);
+    return collected;
+}
+
+// Returns the next non-EOF batch from the readers, visiting them in order. A reader that reports
+// EOF is closed and skipped; once all readers are exhausted an EOF batch is returned.
+Result<BatchReader::ReadBatchWithBitmap> ConcatBatchReader::NextBatchWithBitmap() {
+    for (; current_ < readers_.size(); ++current_) {
+        BatchReader* const active = readers_[current_].get();
+        PAIMON_ASSIGN_OR_RAISE(BatchReader::ReadBatchWithBitmap next,
+                               active->NextBatchWithBitmap());
+        if (!BatchReader::IsEofBatch(next)) {
+            // The active reader still has data: hand it out and stay on this reader.
+            return next;
+        }
+        // The active reader is exhausted: release it before advancing to the next one.
+        active->Close();
+    }
+    // Nothing left to read.
+    return BatchReader::MakeEofBatchWithBitmap();
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/reader/concat_batch_reader.h 
b/src/paimon/common/reader/concat_batch_reader.h
new file mode 100644
index 0000000..7a1b6a1
--- /dev/null
+++ b/src/paimon/common/reader/concat_batch_reader.h
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstddef>
+#include <memory>
+#include <vector>
+
+#include "arrow/api.h"
+#include "paimon/metrics.h"
+#include "paimon/reader/batch_reader.h"
+#include "paimon/result.h"
+
+namespace paimon {
+class MemoryPool;
+
+/// Concatenates a list of BatchReaders and reads them sequentially, one after another. The input
+/// list is expected to be already sorted by key and sequence number, with key intervals that do
+/// not overlap each other.
+class ConcatBatchReader : public BatchReader {
+ public:
+    /// Takes ownership of `readers`; `pool` is used to obtain the Arrow memory pool.
+    ConcatBatchReader(std::vector<std::unique_ptr<BatchReader>>&& readers,
+                      const std::shared_ptr<MemoryPool>& pool);
+
+    /// Closes the readers that have not been closed yet.
+    void Close() override;
+
+    /// Next batch with its selection bitmap, or an EOF batch once all readers are exhausted.
+    Result<ReadBatchWithBitmap> NextBatchWithBitmap() override;
+
+    /// Same as NextBatchWithBitmap(), with the bitmap already applied to the batch.
+    Result<ReadBatch> NextBatch() override;
+
+    /// Read metrics collected over all wrapped readers.
+    std::shared_ptr<Metrics> GetReaderMetrics() const override;
+
+ private:
+    std::unique_ptr<arrow::MemoryPool> arrow_pool_;
+    std::vector<std::unique_ptr<BatchReader>> readers_;
+    /// Index of the reader currently being consumed; readers_.size() means "finished".
+    size_t current_;
+};
+}  // namespace paimon
diff --git a/src/paimon/common/reader/concat_batch_reader_test.cpp 
b/src/paimon/common/reader/concat_batch_reader_test.cpp
new file mode 100644
index 0000000..37a415d
--- /dev/null
+++ b/src/paimon/common/reader/concat_batch_reader_test.cpp
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/reader/concat_batch_reader.h"
+
+#include <algorithm>
+#include <cstdint>
+#include <string>
+#include <utility>
+
+#include "arrow/api.h"
+#include "arrow/array/array_base.h"
+#include "arrow/array/array_nested.h"
+#include "arrow/ipc/json_simple.h"
+#include "gtest/gtest.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/status.h"
+#include "paimon/testing/mock/mock_file_batch_reader.h"
+#include "paimon/testing/utils/read_result_collector.h"
+#include "paimon/testing/utils/testharness.h"
+#include "paimon/utils/roaring_bitmap32.h"
+
+namespace paimon::test {
+/// Fixture for ConcatBatchReader tests: builds one MockFileBatchReader per input batch, chains
+/// them with a ConcatBatchReader and compares everything read against an expected JSON array.
+class ConcatBatchReaderTest : public ::testing::Test {
+ public:
+    void SetUp() override {
+        pool_ = GetDefaultPool();
+    }
+
+    /// Convenience overload: every row of every batch is selected (bitmap = 0..row_count-1).
+    /// @param batches  one JSON int32 array per reader, e.g. "[1, 2, 3]" or "[]"
+    /// @param expected JSON array of the concatenated result; "" means no data is expected
+    void CheckResult(const std::vector<std::string>& batches, const std::string& expected) {
+        std::vector<std::pair<std::string, std::vector<int32_t>>> batches_with_bitmap;
+        batches_with_bitmap.reserve(batches.size());
+        for (const auto& batch_str : batches) {
+            // The row count is derived from the JSON text: number of commas + 1, except for "[]".
+            auto row_count =
+                static_cast<int32_t>(std::count(batch_str.begin(), batch_str.end(), ','));
+            if (batch_str != "[]") {
+                row_count += 1;
+            }
+            std::vector<int32_t> bitmap_data;
+            bitmap_data.reserve(row_count);
+            for (int32_t i = 0; i < row_count; i++) {
+                bitmap_data.push_back(i);
+            }
+            batches_with_bitmap.emplace_back(batch_str, std::move(bitmap_data));
+        }
+        CheckResult(batches_with_bitmap, expected);
+    }
+
+    /// Runs the scenario once per mock batch size in {1, 2, 4, 8}.
+    /// @param batches  (JSON int32 array, selected row indices) per reader
+    /// @param expected JSON array of the concatenated result; "" means no data is expected
+    void CheckResult(const std::vector<std::pair<std::string, std::vector<int32_t>>>& batches,
+                     const std::string& expected) {
+        for (const int32_t batch_size : {1, 2, 4, 8}) {
+            std::vector<std::unique_ptr<BatchReader>> readers;
+            readers.reserve(batches.size());
+            for (const auto& [batch_str, bitmap_data] : batches) {
+                auto f1 = arrow::ipc::internal::json::ArrayFromJSON(arrow::int32(), batch_str)
+                              .ValueOrDie();
+                std::shared_ptr<arrow::Array> data =
+                    arrow::StructArray::Make({f1}, {arrow::field("f1", arrow::int32())})
+                        .ValueOrDie();
+                auto reader = std::make_unique<MockFileBatchReader>(
+                    data, data->type(), RoaringBitmap32::From(bitmap_data), batch_size);
+                readers.push_back(std::move(reader));
+            }
+            auto concat_reader = std::make_unique<ConcatBatchReader>(std::move(readers), pool_);
+            ASSERT_OK_AND_ASSIGN(auto result_chunk_array,
+                                 ReadResultCollector::CollectResult(concat_reader.get()));
+            if (expected.empty()) {
+                ASSERT_FALSE(result_chunk_array);
+                // Keep going: the remaining batch sizes must be exercised for the empty case too.
+                continue;
+            }
+            auto expected_f1 =
+                arrow::ipc::internal::json::ArrayFromJSON(arrow::int32(), expected).ValueOrDie();
+            std::shared_ptr<arrow::Array> expected_array =
+                arrow::StructArray::Make({expected_f1}, {arrow::field("f1", arrow::int32())})
+                    .ValueOrDie();
+            auto expected_chunk_array = std::make_shared<arrow::ChunkedArray>(expected_array);
+            ASSERT_TRUE(expected_chunk_array->Equals(result_chunk_array))
+                << result_chunk_array->ToString();
+        }
+    }
+
+ private:
+    std::shared_ptr<MemoryPool> pool_;
+};
+// Concatenation with every row selected: one reader, several readers, and an empty reader placed
+// at the front, in the middle and at the end.
+TEST_F(ConcatBatchReaderTest, TestSimple) {
+    // All multi-reader cases hold the same rows overall; only the empty reader's position moves.
+    const std::string all_rows = "[10, 11, 12, 13, 14, 16, 17, 20, 24, 100]";
+
+    CheckResult({"[10, 11, 12, 13, 14]"}, "[10, 11, 12, 13, 14]");
+
+    CheckResult({"[10, 11, 12, 13, 14]", "[16, 17, 20]", "[24]", "[100]"}, all_rows);
+
+    CheckResult({"[]", "[10, 11, 12, 13, 14]", "[16, 17, 20]", "[24]", "[100]"}, all_rows);
+
+    CheckResult({"[10, 11, 12, 13, 14]", "[]", "[16, 17, 20]", "[24]", "[100]"}, all_rows);
+
+    CheckResult({"[10, 11, 12, 13, 14]", "[16, 17, 20]", "[24]", "[100]", "[]"}, all_rows);
+
+    // A single reader without data.
+    CheckResult({"[]"}, "");
+
+    // No reader at all.
+    CheckResult(std::vector<std::string>{}, "");
+}
+
+// Concatenation where each reader carries a selection bitmap; only selected rows may show up.
+TEST_F(ConcatBatchReaderTest, TestSimpleWithBitmap) {
+    // One entry per reader: (JSON rows, indices of the selected rows).
+    using ReaderSpecs = std::vector<std::pair<std::string, std::vector<int32_t>>>;
+    const std::string selected_rows = "[11, 12, 13, 16, 20, 100]";
+    {
+        ReaderSpecs specs = {{"[10, 11, 12, 13, 14]", {0, 1, 3, 4}}};
+        CheckResult(specs, "[10, 11, 13, 14]");
+    }
+    {
+        ReaderSpecs specs = {{"[10, 11, 12, 13, 14]", {1, 2, 3}},
+                             {"[16, 17, 20]", {0, 2}},
+                             {"[24]", {}},
+                             {"[100]", {0}}};
+        CheckResult(specs, selected_rows);
+    }
+    {
+        // Empty reader first.
+        ReaderSpecs specs = {{"[]", {}},
+                             {"[10, 11, 12, 13, 14]", {1, 2, 3}},
+                             {"[16, 17, 20]", {0, 2}},
+                             {"[24]", {}},
+                             {"[100]", {0}}};
+        CheckResult(specs, selected_rows);
+    }
+    {
+        // Empty reader in the middle.
+        ReaderSpecs specs = {{"[10, 11, 12, 13, 14]", {1, 2, 3}},
+                             {"[]", {}},
+                             {"[16, 17, 20]", {0, 2}},
+                             {"[24]", {}},
+                             {"[100]", {0}}};
+        CheckResult(specs, selected_rows);
+    }
+    {
+        // Empty reader last.
+        ReaderSpecs specs = {
+            {"[10, 11, 12, 13, 14]", {1, 2, 3}},
+            {"[16, 17, 20]", {0, 2}},
+            {"[24]", {}},
+            {"[100]", {0}},
+            {"[]", {}},
+        };
+        CheckResult(specs, selected_rows);
+    }
+    {
+        // Readers without any data.
+        ReaderSpecs specs = {{"[]", {}}, {"[]", {}}};
+        CheckResult(specs, "");
+    }
+    {
+        // No reader at all.
+        ReaderSpecs specs = {};
+        CheckResult(specs, "");
+    }
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/reader/delegating_prefetch_reader.h 
b/src/paimon/common/reader/delegating_prefetch_reader.h
new file mode 100644
index 0000000..6d2b09c
--- /dev/null
+++ b/src/paimon/common/reader/delegating_prefetch_reader.h
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "arrow/c/bridge.h"
+#include "arrow/type.h"
+#include "paimon/common/reader/prefetch_file_batch_reader_impl.h"
+#include "paimon/reader/file_batch_reader.h"
+
+namespace paimon {
+
+/// Exposes a PrefetchFileBatchReaderImpl through the FileBatchReader interface. Read-side calls
+/// go to the prefetch reader itself when NeedPrefetch() is true and to its first underlying
+/// reader otherwise; SetReadSchema() and Close() always go to the prefetch reader.
+class DelegatingPrefetchReader : public FileBatchReader {
+ public:
+    explicit DelegatingPrefetchReader(std::unique_ptr<PrefetchFileBatchReaderImpl> prefetch_reader)
+        : prefetch_reader_(std::move(prefetch_reader)) {}
+
+    /// Not supported: this inner reader must be consumed through NextBatchWithBitmap().
+    Result<ReadBatch> NextBatch() override {
+        return Status::Invalid(
+            "paimon inner reader DelegatingPrefetchReader should use NextBatchWithBitmap");
+    }
+
+    Result<ReadBatchWithBitmap> NextBatchWithBitmap() override {
+        FileBatchReader* const target = GetReader();
+        return target->NextBatchWithBitmap();
+    }
+
+    std::shared_ptr<Metrics> GetReaderMetrics() const override {
+        FileBatchReader* const target = GetReader();
+        return target->GetReaderMetrics();
+    }
+
+    Result<std::unique_ptr<::ArrowSchema>> GetFileSchema() const override {
+        FileBatchReader* const target = GetReader();
+        return target->GetFileSchema();
+    }
+
+    /// Always applied to the prefetch reader, regardless of NeedPrefetch().
+    Status SetReadSchema(::ArrowSchema* read_schema, const std::shared_ptr<Predicate>& predicate,
+                         const std::optional<RoaringBitmap32>& selection_bitmap) override {
+        return prefetch_reader_->SetReadSchema(read_schema, predicate, selection_bitmap);
+    }
+
+    Result<uint64_t> GetPreviousBatchFirstRowNumber() const override {
+        FileBatchReader* const target = GetReader();
+        return target->GetPreviousBatchFirstRowNumber();
+    }
+
+    Result<uint64_t> GetNumberOfRows() const override {
+        FileBatchReader* const target = GetReader();
+        return target->GetNumberOfRows();
+    }
+
+    /// Always applied to the prefetch reader, regardless of NeedPrefetch().
+    void Close() override {
+        prefetch_reader_->Close();
+    }
+
+    bool SupportPreciseBitmapSelection() const override {
+        FileBatchReader* const target = GetReader();
+        return target->SupportPreciseBitmapSelection();
+    }
+
+ private:
+    /// Picks the reader that serves read-side calls (see the class comment).
+    FileBatchReader* GetReader() const {
+        assert(prefetch_reader_);
+        if (!prefetch_reader_->NeedPrefetch()) {
+            return prefetch_reader_->GetFirstReader();
+        }
+        return prefetch_reader_.get();
+    }
+
+    std::unique_ptr<PrefetchFileBatchReaderImpl> prefetch_reader_;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/common/reader/predicate_batch_reader.cpp 
b/src/paimon/common/reader/predicate_batch_reader.cpp
new file mode 100644
index 0000000..3bdea51
--- /dev/null
+++ b/src/paimon/common/reader/predicate_batch_reader.cpp
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/reader/predicate_batch_reader.h"
+
+#include <cassert>
+#include <cstddef>
+#include <cstdint>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/array/array_base.h"
+#include "arrow/c/abi.h"
+#include "arrow/c/bridge.h"
+#include "arrow/memory_pool.h"
+#include "fmt/format.h"
+#include "paimon/common/predicate/predicate_filter.h"
+#include "paimon/common/reader/reader_utils.h"
+#include "paimon/common/utils/arrow/mem_utils.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/predicate/predicate.h"
+#include "paimon/status.h"
+
+namespace paimon {
+class MemoryPool;
+
+// Private constructor used by Create(): takes ownership of `reader`, shares `predicate_filter`
+// and obtains the Arrow memory pool from `pool` through GetArrowPool().
+PredicateBatchReader::PredicateBatchReader(
+    std::unique_ptr<BatchReader>&& reader,
+    const std::shared_ptr<PredicateFilter>& predicate_filter,
+    const std::shared_ptr<MemoryPool>& pool)
+    : arrow_pool_(GetArrowPool(pool)),
+      reader_(std::move(reader)),
+      predicate_filter_(predicate_filter) {}
+
+// Builds a PredicateBatchReader that filters the batches of `reader` with `predicate`.
+//
+// Returns Status::Invalid when `predicate` is null, when `reader` is null, or when `predicate`
+// is not a PredicateFilter (i.e. cannot be evaluated with Test()). On failure `reader` is left
+// untouched, since it is only moved from when the reader is actually constructed.
+Result<std::unique_ptr<PredicateBatchReader>> PredicateBatchReader::Create(
+    std::unique_ptr<BatchReader>&& reader, const std::shared_ptr<Predicate>& predicate,
+    const std::shared_ptr<MemoryPool>& pool) {
+    if (!predicate) {
+        return Status::Invalid("create predicate batch reader failed. predicate is nullptr");
+    }
+    // Every method of the created object dereferences the wrapped reader, so reject null here
+    // instead of crashing on first use.
+    if (!reader) {
+        return Status::Invalid("create predicate batch reader failed. reader is nullptr");
+    }
+    auto predicate_filter = std::dynamic_pointer_cast<PredicateFilter>(predicate);
+    if (!predicate_filter) {
+        return Status::Invalid(
+            fmt::format("predicate {} does not support Test", predicate->ToString()));
+    }
+    // The constructor is private, so std::make_unique cannot be used here.
+    return std::unique_ptr<PredicateBatchReader>(
+        new PredicateBatchReader(std::move(reader), predicate_filter, pool));
+}
+
+// Fetches the next filtered batch with its bitmap and lets
+// ReaderUtils::ApplyBitmapToReadBatch turn the pair into a plain ReadBatch.
+Result<BatchReader::ReadBatch> PredicateBatchReader::NextBatch() {
+    PAIMON_ASSIGN_OR_RAISE(BatchReader::ReadBatchWithBitmap selected, NextBatchWithBitmap());
+    arrow::MemoryPool* const pool = arrow_pool_.get();
+    return ReaderUtils::ApplyBitmapToReadBatch(std::move(selected), pool);
+}
+
+// Pulls batches from the wrapped reader until one still has selected rows after the predicate
+// has been applied, and returns it with the narrowed bitmap. The EOF batch of the wrapped reader
+// is passed through unchanged.
+Result<BatchReader::ReadBatchWithBitmap> PredicateBatchReader::NextBatchWithBitmap() {
+    for (;;) {
+        PAIMON_ASSIGN_OR_RAISE(BatchReader::ReadBatchWithBitmap candidate,
+                               reader_->NextBatchWithBitmap());
+        if (BatchReader::IsEofBatch(candidate)) {
+            return candidate;
+        }
+        auto& [batch, bitmap] = candidate;
+        auto& [c_array, c_schema] = batch;
+        assert(c_array);
+        // Import the C structs as an arrow::Array so the predicate can be evaluated on it.
+        PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Array> array,
+                                          arrow::ImportArray(c_array.get(), c_schema.get()));
+        PAIMON_ASSIGN_OR_RAISE(RoaringBitmap32 matching_rows, Filter(array));
+        // Keep only rows that were already selected and also satisfy the predicate.
+        bitmap &= matching_rows;
+        if (!bitmap.IsEmpty()) {
+            // Export the array back into the same C structs before handing the batch out.
+            PAIMON_RETURN_NOT_OK_FROM_ARROW(
+                arrow::ExportArray(*array, c_array.get(), c_schema.get()));
+            return candidate;
+        }
+        // Nothing survived in this batch: try the next one.
+    }
+}
+
+// Evaluates the predicate on `array` and returns the indices of the rows whose Test() result is
+// non-zero as a bitmap.
+Result<RoaringBitmap32> PredicateBatchReader::Filter(
+    const std::shared_ptr<arrow::Array>& array) const {
+    PAIMON_ASSIGN_OR_RAISE(std::vector<char> verdicts, predicate_filter_->Test(*array));
+    assert(verdicts.size() == static_cast<size_t>(array->length()));
+    const auto row_count = static_cast<int32_t>(verdicts.size());
+    RoaringBitmap32 passing_rows;
+    for (int32_t row = 0; row < row_count; ++row) {
+        if (!verdicts[row]) {
+            continue;
+        }
+        passing_rows.Add(row);
+    }
+    return passing_rows;
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/reader/predicate_batch_reader.h 
b/src/paimon/common/reader/predicate_batch_reader.h
new file mode 100644
index 0000000..ddfd72a
--- /dev/null
+++ b/src/paimon/common/reader/predicate_batch_reader.h
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <memory>
+
+#include "arrow/memory_pool.h"
+#include "paimon/reader/batch_reader.h"
+#include "paimon/result.h"
+#include "paimon/utils/roaring_bitmap32.h"
+
+namespace arrow {
+class Array;
+}  // namespace arrow
+
+namespace paimon {
+class MemoryPool;
+class Metrics;
+class Predicate;
+class PredicateFilter;
+
+/// BatchReader decorator that narrows the selection bitmap of each batch coming from a wrapped
+/// reader to the rows accepted by a predicate.
+class PredicateBatchReader : public BatchReader {
+ public:
+    /// Factory; the constructor is private. Fails when `predicate` is null or is not a
+    /// PredicateFilter.
+    static Result<std::unique_ptr<PredicateBatchReader>> Create(
+        std::unique_ptr<BatchReader>&& reader, const std::shared_ptr<Predicate>& predicate,
+        const std::shared_ptr<MemoryPool>& pool);
+
+    ~PredicateBatchReader() override = default;
+
+    /// Next batch with the bitmap already applied.
+    Result<ReadBatch> NextBatch() override;
+
+    /// Next batch that still has selected rows after filtering, or the EOF batch.
+    Result<ReadBatchWithBitmap> NextBatchWithBitmap() override;
+
+    /// Closes the wrapped reader.
+    void Close() override {
+        reader_->Close();
+    }
+
+    /// Metrics of the wrapped reader.
+    std::shared_ptr<Metrics> GetReaderMetrics() const override {
+        return reader_->GetReaderMetrics();
+    }
+
+ private:
+    PredicateBatchReader(std::unique_ptr<BatchReader>&& reader,
+                         const std::shared_ptr<PredicateFilter>& predicate_filter,
+                         const std::shared_ptr<MemoryPool>& pool);
+
+    /// Bitmap of the row indices of `array` that pass the predicate.
+    Result<RoaringBitmap32> Filter(const std::shared_ptr<arrow::Array>& array) const;
+
+    std::unique_ptr<arrow::MemoryPool> arrow_pool_;
+    std::unique_ptr<BatchReader> reader_;
+    std::shared_ptr<PredicateFilter> predicate_filter_;
+};
+}  // namespace paimon
diff --git a/src/paimon/common/reader/predicate_batch_reader_test.cpp 
b/src/paimon/common/reader/predicate_batch_reader_test.cpp
new file mode 100644
index 0000000..e23c257
--- /dev/null
+++ b/src/paimon/common/reader/predicate_batch_reader_test.cpp
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/reader/predicate_batch_reader.h"
+
+#include <cstdint>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/api.h"
+#include "arrow/array/array_base.h"
+#include "arrow/array/builder_binary.h"
+#include "arrow/array/builder_nested.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/ipc/json_simple.h"
+#include "gtest/gtest.h"
+#include "paimon/defs.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/predicate/literal.h"
+#include "paimon/predicate/predicate_builder.h"
+#include "paimon/status.h"
+#include "paimon/testing/mock/mock_file_batch_reader.h"
+#include "paimon/testing/utils/read_result_collector.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon {
+class Predicate;
+}  // namespace paimon
+
+namespace paimon::test {
+/// Fixture for PredicateBatchReader tests over the struct type
+/// {f0: utf8, f1: int64, f2: boolean}.
+class PredicateBatchReaderTest : public ::testing::Test {
+ public:
+    void SetUp() override {
+        fields_ = {arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::int64()),
+                   arrow::field("f2", arrow::boolean())};
+        data_type_ = arrow::struct_(fields_);
+    }
+
+    void TearDown() override {}
+
+    /// Builds `length` rows for i in [offset, offset + length):
+    /// f0 = "str_<i>", f1 = i, f2 = (i % 2 != 0).
+    std::shared_ptr<arrow::Array> PrepareArray(int32_t length, int32_t offset = 0) {
+        arrow::StructBuilder struct_builder(
+            data_type_, arrow::default_memory_pool(),
+            {std::make_shared<arrow::StringBuilder>(), std::make_shared<arrow::Int64Builder>(),
+             std::make_shared<arrow::BooleanBuilder>()});
+        auto string_builder = static_cast<arrow::StringBuilder*>(struct_builder.field_builder(0));
+        auto big_int_builder = static_cast<arrow::Int64Builder*>(struct_builder.field_builder(1));
+        auto bool_builder = static_cast<arrow::BooleanBuilder*>(struct_builder.field_builder(2));
+        for (int32_t i = offset; i < length + offset; ++i) {
+            EXPECT_TRUE(struct_builder.Append().ok());
+            EXPECT_TRUE(string_builder->Append("str_" + std::to_string(i)).ok());
+            EXPECT_TRUE(big_int_builder->Append(i).ok());
+            EXPECT_TRUE(bool_builder->Append(static_cast<bool>(i % 2)).ok());
+        }
+        std::shared_ptr<arrow::Array> array;
+        EXPECT_TRUE(struct_builder.Finish(&array).ok());
+        return array;
+    }
+
+    /// Wraps `reader` in a PredicateBatchReader for `predicate`, reads everything and compares
+    /// with `expected_array`. A null `expected_array` means no data at all is expected.
+    void CheckResult(std::unique_ptr<BatchReader>&& reader,
+                     const std::shared_ptr<Predicate>& predicate,
+                     const std::shared_ptr<arrow::ChunkedArray>& expected_array) const {
+        ASSERT_OK_AND_ASSIGN(
+            auto predicate_reader,
+            PredicateBatchReader::Create(std::move(reader), predicate, GetDefaultPool()));
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::ChunkedArray> result_array,
+                             ReadResultCollector::CollectResult(predicate_reader.get()));
+        if (!expected_array) {
+            ASSERT_FALSE(result_array);
+            return;
+        }
+        // Fail cleanly instead of dereferencing null when the reader produced nothing.
+        ASSERT_TRUE(result_array) << "reader returned no data although rows were expected";
+        ASSERT_TRUE(result_array->Equals(expected_array));
+    }
+
+ private:
+    arrow::FieldVector fields_;
+    std::shared_ptr<arrow::DataType> data_type_;
+};
+
+// Range predicates on f1 over 100 rows (f1 = 0..99) read in batches of 10: the surviving rows
+// must be exactly the matching contiguous slice of the source array.
+TEST_F(PredicateBatchReaderTest, TestSimple) {
+    auto source = PrepareArray(100);
+    // Runs `predicate` over a fresh mock reader and expects rows [offset, offset + length).
+    auto expect_slice = [&](const std::shared_ptr<Predicate>& predicate, int64_t offset,
+                            int64_t length) {
+        auto reader =
+            std::make_unique<MockFileBatchReader>(source, data_type_, /*batch_size=*/10);
+        auto expected = std::make_shared<arrow::ChunkedArray>(source->Slice(offset, length));
+        CheckResult(std::move(reader), predicate, expected);
+    };
+
+    expect_slice(PredicateBuilder::LessThan(/*field_index=*/1, /*field_name=*/"f1",
+                                            FieldType::BIGINT, Literal(24l)),
+                 0, 24);
+    expect_slice(PredicateBuilder::LessThan(/*field_index=*/1, /*field_name=*/"f1",
+                                            FieldType::BIGINT, Literal(1l)),
+                 0, 1);
+    expect_slice(PredicateBuilder::LessThan(/*field_index=*/1, /*field_name=*/"f1",
+                                            FieldType::BIGINT, Literal(99l)),
+                 0, 99);
+    expect_slice(PredicateBuilder::GreaterThan(/*field_index=*/1, /*field_name=*/"f1",
+                                               FieldType::BIGINT, Literal(0l)),
+                 1, 99);
+    expect_slice(PredicateBuilder::GreaterThan(/*field_index=*/1, /*field_name=*/"f1",
+                                               FieldType::BIGINT, Literal(98l)),
+                 99, 1);
+}
+
+// Rows with f1 == -1 are scattered between the rows 0..7; the predicate f1 > -1 must drop
+// exactly those rows for every reader batch size.
+TEST_F(PredicateBatchReaderTest, TestVariousBatchSize) {
+    auto predicate = PredicateBuilder::GreaterThan(/*field_index=*/1, /*field_name=*/"f1",
+                                                   FieldType::BIGINT, Literal(-1l));
+    auto expected_rows = std::make_shared<arrow::ChunkedArray>(PrepareArray(8));
+    auto source = arrow::ipc::internal::json::ArrayFromJSON(data_type_, R"([
+        ["str_-1", -1, false],
+        ["str_0", 0, false], ["str_1", 1, true],
+        ["str_-1", -1, false],
+        ["str_2", 2, false], ["str_3", 3, true],
+        ["str_4", 4, false], ["str_5", 5, true], ["str_6", 6, false],
+        ["str_-1", -1, false],
+        ["str_7", 7, true],
+        ["str_-1", -1, false]
+    ])")
+                      .ValueOrDie();
+    for (const int batch_size : {5, 10, 11, 20}) {
+        auto mock_reader = std::make_unique<MockFileBatchReader>(source, data_type_, batch_size);
+        CheckResult(std::move(mock_reader), predicate, expected_rows);
+    }
+}
+
+// f2 alternates false/true over the 8 rows, so the predicate f2 == true selects every other
+// row (1, 3, 5, 7) out of a single batch.
+TEST_F(PredicateBatchReaderTest, TestOneByOneCase) {
+    auto data_array = PrepareArray(8);
+    auto reader = std::make_unique<MockFileBatchReader>(data_array, data_type_, /*batch_size=*/10);
+    auto predicate = PredicateBuilder::Equal(/*field_index=*/2, /*field_name=*/"f2",
+                                             FieldType::BOOLEAN, Literal(true));
+    std::shared_ptr<arrow::ChunkedArray> expected_array;
+    auto array_status = arrow::ipc::internal::json::ChunkedArrayFromJSON(data_type_, {R"([
+        ["str_1", 1, true], ["str_3", 3, true], ["str_5", 5, true], ["str_7", 7, true]
+    ])"},
+                                                                         &expected_array);
+    // Without this check a parse failure would leave expected_array null and silently turn the
+    // test into a "no data expected" scenario.
+    ASSERT_TRUE(array_status.ok());
+    CheckResult(std::move(reader), predicate, expected_array);
+}
+
+// Two extremes over 15 rows (f1 = 0..14): a predicate every row satisfies and a predicate no
+// row satisfies.
+TEST_F(PredicateBatchReaderTest, TestFullAndEmptyCase) {
+    auto source = PrepareArray(15);
+    {
+        // f1 < 20 keeps everything.
+        auto keep_all = PredicateBuilder::LessThan(/*field_index=*/1, /*field_name=*/"f1",
+                                                   FieldType::BIGINT, Literal(20l));
+        auto mock_reader =
+            std::make_unique<MockFileBatchReader>(source, data_type_, /*batch_size=*/10);
+        auto all_rows = std::make_shared<arrow::ChunkedArray>(source);
+        CheckResult(std::move(mock_reader), keep_all, all_rows);
+    }
+    {
+        // f1 > 20 keeps nothing, so no data is expected at all.
+        auto keep_none = PredicateBuilder::GreaterThan(/*field_index=*/1, /*field_name=*/"f1",
+                                                       FieldType::BIGINT, Literal(20l));
+        auto mock_reader =
+            std::make_unique<MockFileBatchReader>(source, data_type_, /*batch_size=*/10);
+        CheckResult(std::move(mock_reader), keep_none, nullptr);
+    }
+}
+
+TEST_F(PredicateBatchReaderTest, TestInvalidInput) {
+    auto data_array = PrepareArray(8);
+    auto reader = std::make_unique<MockFileBatchReader>(data_array, 
data_type_, /*batch_size=*/10);
+    ASSERT_NOK_WITH_MSG(PredicateBatchReader::Create(std::move(reader), 
nullptr, GetDefaultPool()),
+                        "create predicate batch reader failed. predicate is 
nullptr");
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/reader/reader_utils.cpp 
b/src/paimon/common/reader/reader_utils.cpp
new file mode 100644
index 0000000..7bb28c7
--- /dev/null
+++ b/src/paimon/common/reader/reader_utils.cpp
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/reader/reader_utils.h"
+
+#include <cassert>
+#include <cstdint>
+#include <memory>
+#include <utility>
+
+#include "arrow/api.h"
+#include "arrow/array/array_base.h"
+#include "arrow/array/array_nested.h"
+#include "arrow/c/abi.h"
+#include "arrow/c/bridge.h"
+#include "arrow/c/helpers.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/status.h"
+#include "paimon/utils/roaring_bitmap32.h"
+
+namespace paimon {
+/// Split `src_array` into slices, one per run of consecutive row ids in `bitmap`.
+///
+/// @param src_array array to slice; must not be null
+/// @param bitmap    row ids to keep, iterated from Begin() to End()
+/// @return the slices in iteration order, or Status::Invalid when the bitmap is empty or none of
+///         its row ids falls inside `src_array`
+///
+/// Row ids at or beyond `src_array->length()` are ignored: slicing stops at the first such id.
+Result<arrow::ArrayVector> ReaderUtils::GenerateFilteredArrayVector(
+    const std::shared_ptr<arrow::Array>& src_array, const RoaringBitmap32& bitmap) {
+    if (bitmap.Cardinality() == 0) {
+        return Status::Invalid("selection bitmap cannot be empty in GenerateFilteredArrayVector");
+    }
+    const int64_t array_length = src_array->length();
+    arrow::ArrayVector array_vec;
+    auto valid_iter = bitmap.Begin();
+    while (valid_iter != bitmap.End()) {
+        const int64_t run_start = *valid_iter;
+        if (run_start >= array_length) {
+            // Never hand Slice() an offset past the end of the array; treat out-of-range ids the
+            // same way as ids that follow a run ending exactly at the array end: drop them.
+            break;
+        }
+        // Extend the run while the bitmap keeps yielding the next consecutive in-range row id.
+        int64_t run_end = run_start;
+        while (valid_iter != bitmap.End() && run_end < array_length && *valid_iter == run_end) {
+            ++run_end;
+            ++valid_iter;
+        }
+        array_vec.push_back(src_array->Slice(run_start, run_end - run_start));
+    }
+    if (array_vec.empty()) {
+        // Reported as an error instead of relying on an assert that is compiled out in release.
+        return Status::Invalid(
+            "selection bitmap contains no row id inside the array in GenerateFilteredArrayVector");
+    }
+    return array_vec;
+}
+
+/// Release the C array and the C schema held by `batch`. A null member is skipped.
+void ReaderUtils::ReleaseReadBatch(BatchReader::ReadBatch&& batch) {
+    ArrowArray* raw_array = batch.first.get();
+    ArrowSchema* raw_schema = batch.second.get();
+    if (raw_array != nullptr) {
+        ArrowArrayRelease(raw_array);
+    }
+    if (raw_schema != nullptr) {
+        ArrowSchemaRelease(raw_schema);
+    }
+}
+
+/// Materialize the rows selected by the bitmap into a new read batch.
+///
+/// @param batch_with_bitmap batch plus the bitmap of row ids to keep
+/// @param arrow_pool        memory pool passed to arrow::Concatenate
+/// @return the EOF batch for an EOF input; the input batch itself when every row is selected;
+///         otherwise a newly exported batch holding only the selected rows. An empty bitmap on a
+///         non-EOF batch releases the batch and yields Status::Invalid.
+Result<BatchReader::ReadBatch> ReaderUtils::ApplyBitmapToReadBatch(
+    BatchReader::ReadBatchWithBitmap&& batch_with_bitmap, arrow::MemoryPool* arrow_pool) {
+    if (BatchReader::IsEofBatch(batch_with_bitmap)) {
+        return std::move(batch_with_bitmap.first);
+    }
+    // Move the argument into a local so this function holds the batch for the rest of the call.
+    BatchReader::ReadBatchWithBitmap owned = std::move(batch_with_bitmap);
+    BatchReader::ReadBatch& input_batch = owned.first;
+    auto& selection = owned.second;
+    auto& input_c_array = input_batch.first;
+    auto& input_c_schema = input_batch.second;
+    assert(input_c_array);
+
+    if (selection.IsEmpty()) {
+        ReleaseReadBatch(std::move(input_batch));
+        return Status::Invalid(
+            "NextBatchWithBitmap should always return the result with at least one valid row "
+            "except eof");
+    }
+    // Cardinality equal to the row count means every row is selected: no filtering needed.
+    if (selection.Cardinality() == input_c_array->length) {
+        return std::move(input_batch);
+    }
+
+    // Import the C structs as an arrow::Array, slice out the selected runs and glue them back
+    // together into one contiguous array.
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(
+        std::shared_ptr<arrow::Array> imported,
+        arrow::ImportArray(input_c_array.get(), input_c_schema.get()));
+    PAIMON_ASSIGN_OR_RAISE(arrow::ArrayVector selected_slices,
+                           GenerateFilteredArrayVector(imported, selection));
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Array> filtered,
+                                      arrow::Concatenate(selected_slices, arrow_pool));
+    assert(filtered && filtered->length() > 0);
+
+    // Export the filtered array back into freshly allocated C structs.
+    auto out_c_array = std::make_unique<ArrowArray>();
+    auto out_c_schema = std::make_unique<ArrowSchema>();
+    PAIMON_RETURN_NOT_OK_FROM_ARROW(
+        arrow::ExportArray(*filtered, out_c_array.get(), out_c_schema.get()));
+    return std::make_pair(std::move(out_c_array), std::move(out_c_schema));
+}
+
+/// Pair `batch` with a bitmap covering the row ids [0, length) of the batch.
+/// An EOF batch is turned into an EOF batch-with-bitmap instead.
+BatchReader::ReadBatchWithBitmap ReaderUtils::AddAllValidBitmap(BatchReader::ReadBatch&& batch) {
+    if (!BatchReader::IsEofBatch(batch)) {
+        RoaringBitmap32 every_row;
+        every_row.AddRange(0, batch.first->length);
+        return std::make_pair(std::move(batch), std::move(every_row));
+    }
+    return BatchReader::MakeEofBatchWithBitmap();
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/reader/reader_utils.h 
b/src/paimon/common/reader/reader_utils.h
new file mode 100644
index 0000000..aa77e40
--- /dev/null
+++ b/src/paimon/common/reader/reader_utils.h
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "arrow/api.h"
+#include "paimon/reader/batch_reader.h"
+#include "paimon/result.h"
+
+namespace arrow {
+class MemoryPool;
+class Array;
+class StructArray;
+}  // namespace arrow
+
+namespace paimon {
+class RoaringBitmap32;
+
+/// Static helpers for working with read batches and their row-selection bitmaps.
+/// The class only groups functions; it cannot be instantiated.
+class ReaderUtils {
+ public:
+    ReaderUtils() = delete;
+    ~ReaderUtils() = delete;
+
+    /// Release the C array and the C schema held by the batch.
+    static void ReleaseReadBatch(BatchReader::ReadBatch&& batch);
+
+    /// Attach a bitmap that marks every row of the batch as valid.
+    /// @param batch a read batch
+    /// @return the input batch together with an all-valid bitmap
+    static BatchReader::ReadBatchWithBitmap AddAllValidBitmap(BatchReader::ReadBatch&& batch);
+
+    /// Cut the array into multiple valid sub-arrays according to the bitmap.
+    /// Precondition: the input bitmap is not empty.
+    static Result<arrow::ArrayVector> GenerateFilteredArrayVector(
+        const std::shared_ptr<arrow::Array>& src_array, const RoaringBitmap32& bitmap);
+
+    /// Keep only the rows of the batch that the bitmap marks as valid.
+    /// This function may trigger a data copy.
+    /// @param batch_with_bitmap a batch plus a bitmap recording the valid row ids in the array
+    /// @param arrow_pool a memory pool for arrow
+    /// @return a batch whose array contains all the valid rows of the input array
+    static Result<BatchReader::ReadBatch> ApplyBitmapToReadBatch(
+        BatchReader::ReadBatchWithBitmap&& batch_with_bitmap, arrow::MemoryPool* arrow_pool);
+};
+}  // namespace paimon
diff --git a/src/paimon/common/reader/reader_utils_test.cpp 
b/src/paimon/common/reader/reader_utils_test.cpp
new file mode 100644
index 0000000..96b5a4d
--- /dev/null
+++ b/src/paimon/common/reader/reader_utils_test.cpp
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/common/reader/reader_utils.h"
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/api.h"
+#include "arrow/array/array_base.h"
+#include "arrow/c/abi.h"
+#include "arrow/ipc/json_simple.h"
+#include "gtest/gtest.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/read_result_collector.h"
+#include "paimon/testing/utils/testharness.h"
+#include "paimon/utils/roaring_bitmap32.h"
+
+namespace paimon::test {
+// AddAllValidBitmap must hand back the same rows paired with a bitmap whose cardinality equals
+// the row count, and must map an EOF batch to an EOF batch with an empty bitmap.
+TEST(ReaderUtilsTest, TestAddAllValidBitmap) {
+    auto expect_all_valid = [](const std::string& json) {
+        auto input =
+            arrow::ipc::internal::json::ArrayFromJSON(arrow::int32(), json).ValueOrDie();
+        ASSERT_OK_AND_ASSIGN(auto read_batch, ReadResultCollector::GetReadBatch(input));
+        auto with_bitmap = ReaderUtils::AddAllValidBitmap(std::move(read_batch));
+        ASSERT_OK_AND_ASSIGN(auto round_trip,
+                             ReadResultCollector::GetArray(std::move(with_bitmap.first)));
+        ASSERT_EQ(with_bitmap.second.Cardinality(), round_trip->length());
+        ASSERT_TRUE(round_trip->Equals(input));
+    };
+
+    expect_all_valid("[0, 1, 2, 3, 4]");
+    expect_all_valid("[10, 20, 30]");
+
+    // eof batch
+    auto eof_with_bitmap = ReaderUtils::AddAllValidBitmap(BatchReader::MakeEofBatch());
+    ASSERT_TRUE(BatchReader::IsEofBatch(eof_with_bitmap));
+    ASSERT_TRUE(eof_with_bitmap.second.IsEmpty());
+}
+// ApplyBitmapToReadBatch must return exactly the selected rows, pass an EOF batch through, and
+// reject an empty bitmap on a non-EOF batch.
+TEST(ReaderUtilsTest, TestApplyBitmapToReadBatch) {
+    const std::string source_json = "[10, 11, 12, 13, 14]";
+    auto parse_int32 = [](const std::string& json) {
+        return arrow::ipc::internal::json::ArrayFromJSON(arrow::int32(), json).ValueOrDie();
+    };
+
+    // Filter the source rows with `selected` and compare against `expected_json`.
+    auto expect_rows = [&](const std::vector<int32_t>& selected,
+                           const std::string& expected_json) {
+        auto selection = RoaringBitmap32::From(selected);
+        auto input = parse_int32(source_json);
+        auto expected = parse_int32(expected_json);
+        ASSERT_OK_AND_ASSIGN(auto input_batch, ReadResultCollector::GetReadBatch(input));
+        auto with_bitmap = std::make_pair(std::move(input_batch), std::move(selection));
+        ASSERT_OK_AND_ASSIGN(auto filtered_batch,
+                             ReaderUtils::ApplyBitmapToReadBatch(std::move(with_bitmap),
+                                                                 arrow::default_memory_pool()));
+        ASSERT_OK_AND_ASSIGN(auto filtered,
+                             ReadResultCollector::GetArray(std::move(filtered_batch)));
+        ASSERT_TRUE(filtered->Equals(expected));
+    };
+    expect_rows({1}, "[11]");
+    expect_rows({0, 1}, "[10, 11]");
+    expect_rows({2, 4}, "[12, 14]");
+    expect_rows({2, 3}, "[12, 13]");
+    expect_rows({0, 1, 3, 4}, "[10, 11, 13, 14]");
+    expect_rows({0, 1, 2, 3, 4}, "[10, 11, 12, 13, 14]");
+
+    // eof batch
+    {
+        auto selection = RoaringBitmap32::From(std::vector<int32_t>{});
+        auto with_bitmap = std::make_pair(BatchReader::MakeEofBatch(), std::move(selection));
+        ASSERT_OK_AND_ASSIGN(auto eof_result,
+                             ReaderUtils::ApplyBitmapToReadBatch(std::move(with_bitmap),
+                                                                 arrow::default_memory_pool()));
+        ASSERT_TRUE(BatchReader::IsEofBatch(eof_result));
+    }
+
+    // bitmap is empty, invalid
+    {
+        auto selection = RoaringBitmap32::From(std::vector<int32_t>{});
+        auto input = parse_int32(source_json);
+        ASSERT_OK_AND_ASSIGN(auto input_batch, ReadResultCollector::GetReadBatch(input));
+        auto with_bitmap = std::make_pair(std::move(input_batch), std::move(selection));
+        ASSERT_NOK_WITH_MSG(
+            ReaderUtils::ApplyBitmapToReadBatch(std::move(with_bitmap),
+                                                arrow::default_memory_pool()),
+            "NextBatchWithBitmap should always return the result with at least one valid row "
+            "except eof");
+    }
+}
+
+}  // namespace paimon::test

Reply via email to