This is an automated email from the ASF dual-hosted git repository.

leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new ba2a108  feat(format): introduce parquet reader and writer (#51)
ba2a108 is described below

commit ba2a108a8af68b07398da0180056c76f3ce02cd3
Author: Zhang Jiawei <[email protected]>
AuthorDate: Fri Jun 5 18:29:34 2026 +0800

    feat(format): introduce parquet reader and writer (#51)
---
 src/paimon/format/parquet/file_reader_wrapper.cpp  | 209 +++++++
 src/paimon/format/parquet/file_reader_wrapper.h    | 129 ++++
 .../format/parquet/file_reader_wrapper_test.cpp    | 331 +++++++++++
 .../format/parquet/parquet_file_batch_reader.cpp   | 322 ++++++++++
 .../format/parquet/parquet_file_batch_reader.h     | 187 ++++++
 .../parquet/parquet_file_batch_reader_test.cpp     | 646 +++++++++++++++++++++
 .../format/parquet/parquet_format_writer.cpp       | 109 ++++
 src/paimon/format/parquet/parquet_format_writer.h  |  84 +++
 .../format/parquet/parquet_format_writer_test.cpp  | 470 +++++++++++++++
 9 files changed, 2487 insertions(+)

diff --git a/src/paimon/format/parquet/file_reader_wrapper.cpp 
b/src/paimon/format/parquet/file_reader_wrapper.cpp
new file mode 100644
index 0000000..674e3c6
--- /dev/null
+++ b/src/paimon/format/parquet/file_reader_wrapper.cpp
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/format/parquet/file_reader_wrapper.h"
+
+#include <cassert>
+#include <cstddef>
+
+#include "arrow/record_batch.h"
+#include "arrow/util/range.h"
+#include "fmt/format.h"
+#include "paimon/macros.h"
+#include "parquet/arrow/reader.h"
+#include "parquet/file_reader.h"
+#include "parquet/metadata.h"
+
+namespace paimon::parquet {
+
+Result<std::unique_ptr<FileReaderWrapper>> FileReaderWrapper::Create(
+    std::unique_ptr<::parquet::arrow::FileReader>&& file_reader) {
+    if (file_reader == nullptr) {
+        return Status::Invalid("file reader wrapper create failed. file reader 
is nullptr");
+    }
+    std::vector<std::pair<uint64_t, uint64_t>> all_row_group_ranges;
+    auto meta_data = file_reader->parquet_reader()->metadata();
+    // prepare [start_row_idx, end_row_idx) for all row groups
+    uint64_t start_row_idx = 0;
+    for (int32_t i = 0; i < meta_data->num_row_groups(); i++) {
+        uint64_t end_row_idx = start_row_idx + 
meta_data->RowGroup(i)->num_rows();
+        all_row_group_ranges.emplace_back(start_row_idx, end_row_idx);
+        start_row_idx = end_row_idx;
+    }
+    uint64_t num_rows = file_reader->parquet_reader()->metadata()->num_rows();
+    if (start_row_idx != num_rows) {
+        assert(false);
+        return Status::Invalid(
+            fmt::format("unexpected error. row group ranges not match with num 
rows {}", num_rows));
+    }
+    std::vector<int32_t> row_groups_indices = 
arrow::internal::Iota(file_reader->num_row_groups());
+    std::vector<int32_t> columns_indices =
+        
arrow::internal::Iota(file_reader->parquet_reader()->metadata()->num_columns());
+    auto file_reader_wrapper = std::unique_ptr<FileReaderWrapper>(
+        new FileReaderWrapper(std::move(file_reader), all_row_group_ranges, 
num_rows));
+    PAIMON_RETURN_NOT_OK(file_reader_wrapper->PrepareForReadingLazy(
+        std::set<int32_t>(row_groups_indices.begin(), 
row_groups_indices.end()), columns_indices));
+    return file_reader_wrapper;
+}
+
+FileReaderWrapper::FileReaderWrapper(
+    std::unique_ptr<::parquet::arrow::FileReader>&& file_reader,
+    const std::vector<std::pair<uint64_t, uint64_t>>& all_row_group_ranges, 
uint64_t num_rows)
+    : file_reader_(std::move(file_reader)),
+      all_row_group_ranges_(all_row_group_ranges),
+      num_rows_(num_rows) {}
+
+Status FileReaderWrapper::SeekToRow(uint64_t row_number) {
+    for (uint64_t i = 0; i < target_row_groups_.size(); i++) {
+        if (row_number > target_row_groups_[i].first && row_number < 
target_row_groups_[i].second) {
+            return Status::Invalid(fmt::format(
+                "seek to row failed. row number {} should not be in the middle 
of readable range",
+                row_number));
+        }
+        if (target_row_groups_[i].first >= row_number) {
+            current_row_group_idx_ = i;
+            next_row_to_read_ = target_row_groups_[i].first;
+            std::vector<int32_t> target_row_group_indices;
+            for (uint64_t j = i; j < target_row_groups_.size(); j++) {
+                PAIMON_ASSIGN_OR_RAISE(int32_t row_group_id, 
GetRowGroupId(target_row_groups_[j]));
+                target_row_group_indices.push_back(row_group_id);
+            }
+            PAIMON_RETURN_NOT_OK_FROM_ARROW(file_reader_->GetRecordBatchReader(
+                target_row_group_indices, target_column_indices_, 
&batch_reader_));
+            return Status::OK();
+        }
+    }
+    next_row_to_read_ = num_rows_;
+    current_row_group_idx_ = target_row_groups_.size();
+    return Status::OK();
+}
+
+Result<std::shared_ptr<arrow::RecordBatch>> FileReaderWrapper::Next() {
+    if (PAIMON_UNLIKELY(!reader_initialized_)) {
+        PAIMON_RETURN_NOT_OK(PrepareForReading(target_row_group_indices_, 
target_column_indices_));
+    }
+    std::shared_ptr<arrow::RecordBatch> record_batch;
+    if (current_row_group_idx_ < target_row_groups_.size()) {
+        PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(record_batch, batch_reader_->Next());
+    }
+    if (record_batch) {
+        int64_t num_rows = record_batch->num_rows();
+        previous_first_row_ = next_row_to_read_;
+        if (next_row_to_read_ + num_rows < 
target_row_groups_[current_row_group_idx_].second) {
+            next_row_to_read_ += num_rows;
+        } else if (next_row_to_read_ + num_rows ==
+                   target_row_groups_[current_row_group_idx_].second) {
+            if (current_row_group_idx_ == target_row_groups_.size() - 1) {
+                // current row group is the last.
+                next_row_to_read_ = num_rows_;
+            } else {
+                current_row_group_idx_++;
+                next_row_to_read_ = 
target_row_groups_[current_row_group_idx_].first;
+            }
+        } else {
+            return Status::Invalid(fmt::format(
+                "Next failed. Unexpected error, next row to read {} + num rows 
just read {} "
+                "should always be within current row group range or exactly 
equals to current "
+                "row group end {}",
+                next_row_to_read_, num_rows, 
target_row_groups_[current_row_group_idx_].second));
+        }
+    } else {
+        previous_first_row_ = next_row_to_read_;
+    }
+    return record_batch;
+}
+
+Result<std::vector<std::pair<uint64_t, uint64_t>>> 
FileReaderWrapper::GetRowGroupRanges(
+    const std::set<int32_t>& row_group_indices) const {
+    std::vector<std::pair<uint64_t, uint64_t>> row_group_ranges;
+    for (auto row_group_index : row_group_indices) {
+        if (static_cast<size_t>(row_group_index) >= 
all_row_group_ranges_.size()) {
+            return Status::Invalid(fmt::format("row group index {} is out of 
bound {}",
+                                               row_group_index, 
all_row_group_ranges_.size()));
+        }
+        row_group_ranges.push_back(all_row_group_ranges_[row_group_index]);
+    }
+    return row_group_ranges;
+}
+
+Status FileReaderWrapper::PrepareForReadingLazy(const std::set<int32_t>& 
target_row_group_indices,
+                                                const std::vector<int32_t>& 
column_indices) {
+    target_row_group_indices_ = target_row_group_indices;
+    target_column_indices_ = column_indices;
+    reader_initialized_ = false;
+    return Status::OK();
+}
+
+Status FileReaderWrapper::PrepareForReading(const std::set<int32_t>& 
target_row_group_indices,
+                                            const std::vector<int32_t>& 
column_indices) {
+    std::vector<std::pair<uint64_t, uint64_t>> target_row_groups;
+    PAIMON_ASSIGN_OR_RAISE(target_row_groups, 
GetRowGroupRanges(target_row_group_indices));
+    std::unique_ptr<arrow::RecordBatchReader> batch_reader;
+    PAIMON_RETURN_NOT_OK_FROM_ARROW(file_reader_->GetRecordBatchReader(
+        std::vector<int32_t>(target_row_group_indices.begin(), 
target_row_group_indices.end()),
+        column_indices, &batch_reader));
+    target_row_groups_ = target_row_groups;
+    target_column_indices_ = column_indices;
+    batch_reader_ = std::move(batch_reader);
+    if (target_row_groups_.empty()) {
+        next_row_to_read_ = num_rows_;
+    } else {
+        next_row_to_read_ = target_row_groups_[0].first;
+    }
+    previous_first_row_ = std::numeric_limits<uint64_t>::max();
+    current_row_group_idx_ = 0;
+    reader_initialized_ = true;
+    return Status::OK();
+}
+
+Result<std::set<int32_t>> FileReaderWrapper::FilterRowGroupsByReadRanges(
+    const std::vector<std::pair<uint64_t, uint64_t>>& read_ranges,
+    const std::vector<int32_t>& src_row_groups) const {
+    std::set<int32_t> target_row_groups;
+    PAIMON_ASSIGN_OR_RAISE(std::set<int32_t> row_groups_to_read,
+                           ReadRangesToRowGroupIds(read_ranges));
+    for (const auto& row_group_id : src_row_groups) {
+        if (row_groups_to_read.find(row_group_id) != row_groups_to_read.end()) 
{
+            target_row_groups.emplace(row_group_id);
+        }
+    }
+    return target_row_groups;
+}
+
+Result<std::set<int32_t>> FileReaderWrapper::ReadRangesToRowGroupIds(
+    const std::vector<std::pair<uint64_t, uint64_t>>& read_ranges) const {
+    std::set<int32_t> selected_row_group_ids;
+    for (const auto& read_range : read_ranges) {
+        PAIMON_ASSIGN_OR_RAISE(int32_t row_group_id, 
GetRowGroupId(read_range));
+        selected_row_group_ids.emplace(row_group_id);
+    }
+    return selected_row_group_ids;
+}
+
+Result<int32_t> FileReaderWrapper::GetRowGroupId(std::pair<uint64_t, uint64_t> 
target_range) const {
+    for (size_t i = 0; i < all_row_group_ranges_.size(); i++) {
+        if (all_row_group_ranges_[i] == target_range) {
+            return i;
+        }
+    }
+    return Status::Invalid(fmt::format(
+        "not expected failure. target range bound '{},{}' not match with row 
group range bound",
+        target_range.first, target_range.second));
+}
+
+}  // namespace paimon::parquet
diff --git a/src/paimon/format/parquet/file_reader_wrapper.h 
b/src/paimon/format/parquet/file_reader_wrapper.h
new file mode 100644
index 0000000..f20e94e
--- /dev/null
+++ b/src/paimon/format/parquet/file_reader_wrapper.h
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <limits>
+#include <memory>
+#include <set>
+#include <utility>
+#include <vector>
+
+#include "arrow/array.h"
+#include "arrow/compute/api.h"
+#include "arrow/dataset/file_parquet.h"
+#include "arrow/record_batch.h"
+#include "arrow/type.h"
+#include "arrow/type_fwd.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+#include "parquet/arrow/reader.h"
+
+namespace arrow {
+class Schema;
+}  // namespace arrow
+
+namespace paimon::parquet {
+
+// The FileReaderWrapper is a decorator class designed to support seek 
functionality, as well as the
+// methods GetPreviousBatchFirstRowNumber and GetNextRowToRead.
+class FileReaderWrapper {
+ public:
+    static Result<std::unique_ptr<FileReaderWrapper>> Create(
+        std::unique_ptr<::parquet::arrow::FileReader>&& reader);
+
+    Status SeekToRow(uint64_t row_number);
+
+    Result<std::shared_ptr<arrow::RecordBatch>> Next();
+
+    Result<uint64_t> GetPreviousBatchFirstRowNumber() const {
+        return previous_first_row_;
+    }
+
+    uint64_t GetNextRowToRead() const {
+        return next_row_to_read_;
+    }
+
+    uint64_t GetNumberOfRows() const {
+        return num_rows_;
+    }
+
+    int32_t GetNumberOfRowGroups() const {
+        return file_reader_->num_row_groups();
+    }
+
+    ::parquet::arrow::FileReader* GetFileReader() const {
+        return file_reader_.get();
+    }
+
+    const std::vector<std::pair<uint64_t, uint64_t>>& GetAllRowGroupRanges() 
const {
+        return all_row_group_ranges_;
+    }
+
+    Result<std::shared_ptr<arrow::Schema>> GetSchema() const {
+        std::shared_ptr<arrow::Schema> file_schema;
+        PAIMON_RETURN_NOT_OK_FROM_ARROW(file_reader_->GetSchema(&file_schema));
+        return file_schema;
+    }
+
+    Status Close() {
+        if (batch_reader_) {
+            PAIMON_RETURN_NOT_OK_FROM_ARROW(batch_reader_->Close());
+        }
+        return Status::OK();
+    }
+
+    Result<std::vector<std::pair<uint64_t, uint64_t>>> GetRowGroupRanges(
+        const std::set<int32_t>& row_group_indices) const;
+
+    Status PrepareForReadingLazy(const std::set<int32_t>& row_group_indices,
+                                 const std::vector<int32_t>& column_indices);
+    Status PrepareForReading(const std::set<int32_t>& row_group_indices,
+                             const std::vector<int32_t>& column_indices);
+
+    Result<std::set<int32_t>> FilterRowGroupsByReadRanges(
+        const std::vector<std::pair<uint64_t, uint64_t>>& read_ranges,
+        const std::vector<int32_t>& src_row_groups) const;
+
+ private:
+    FileReaderWrapper(std::unique_ptr<::parquet::arrow::FileReader>&& 
file_reader,
+                      const std::vector<std::pair<uint64_t, uint64_t>>& 
all_row_group_ranges,
+                      uint64_t num_rows);
+
+    Result<std::set<int32_t>> ReadRangesToRowGroupIds(
+        const std::vector<std::pair<uint64_t, uint64_t>>& read_ranges) const;
+    Result<int32_t> GetRowGroupId(std::pair<uint64_t, uint64_t> target_range) 
const;
+
+    std::unique_ptr<::parquet::arrow::FileReader> file_reader_;
+    std::unique_ptr<arrow::RecordBatchReader> batch_reader_;
+
+    std::vector<std::pair<uint64_t, uint64_t>> all_row_group_ranges_;
+    std::set<int32_t> target_row_group_indices_;
+    std::vector<std::pair<uint64_t, uint64_t>> target_row_groups_;
+    std::vector<int32_t> target_column_indices_;
+
+    const uint64_t num_rows_;
+    uint64_t next_row_to_read_ = std::numeric_limits<uint64_t>::max();
+    uint64_t previous_first_row_ = std::numeric_limits<uint64_t>::max();
+    uint64_t current_row_group_idx_ = 0;
+    bool reader_initialized_ = false;
+};
+
+}  // namespace paimon::parquet
diff --git a/src/paimon/format/parquet/file_reader_wrapper_test.cpp 
b/src/paimon/format/parquet/file_reader_wrapper_test.cpp
new file mode 100644
index 0000000..12f90a1
--- /dev/null
+++ b/src/paimon/format/parquet/file_reader_wrapper_test.cpp
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/format/parquet/file_reader_wrapper.h"
+
+#include <map>
+#include <string>
+
+#include "arrow/api.h"
+#include "arrow/array/builder_binary.h"
+#include "arrow/array/builder_nested.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/c/abi.h"
+#include "arrow/c/bridge.h"
+#include "arrow/io/caching.h"
+#include "arrow/memory_pool.h"
+#include "gtest/gtest.h"
+#include "paimon/common/utils/arrow/arrow_input_stream_adapter.h"
+#include "paimon/common/utils/arrow/mem_utils.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/common/utils/path_util.h"
+#include "paimon/format/parquet/parquet_field_id_converter.h"
+#include "paimon/format/parquet/parquet_format_defs.h"
+#include "paimon/format/parquet/parquet_format_writer.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/fs/local/local_file_system.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/record_batch.h"
+#include "paimon/testing/utils/testharness.h"
+#include "parquet/arrow/reader.h"
+#include "parquet/properties.h"
+
+namespace arrow {
+class Array;
+}  // namespace arrow
+
+namespace paimon::parquet::test {
+
+class FileReaderWrapperTest : public ::testing::Test {
+ public:
+    void SetUp() override {
+        dir_ = paimon::test::UniqueTestDirectory::Create();
+        ASSERT_TRUE(dir_);
+        fs_ = std::make_shared<LocalFileSystem>();
+        pool_ = GetDefaultPool();
+        arrow_pool_ = GetArrowPool(pool_);
+        batch_size_ = 512;
+    }
+    void TearDown() override {}
+
+    std::pair<std::shared_ptr<arrow::Schema>, 
std::shared_ptr<arrow::DataType>> PrepareArrowSchema()
+        const {
+        auto string_field = arrow::field(
+            "col1", arrow::utf8(),
+            
arrow::KeyValueMetadata::Make({ParquetFieldIdConverter::PARQUET_FIELD_ID}, 
{"0"}));
+        auto int_field = arrow::field(
+            "col2", arrow::int32(),
+            
arrow::KeyValueMetadata::Make({ParquetFieldIdConverter::PARQUET_FIELD_ID}, 
{"1"}));
+        auto bool_field = arrow::field(
+            "col3", arrow::boolean(),
+            
arrow::KeyValueMetadata::Make({ParquetFieldIdConverter::PARQUET_FIELD_ID}, 
{"2"}));
+        auto struct_type = arrow::struct_({string_field, int_field, 
bool_field});
+        return std::make_pair(
+            arrow::schema(arrow::FieldVector({string_field, int_field, 
bool_field})), struct_type);
+    }
+
+    std::shared_ptr<arrow::Array> PrepareArray(const 
std::shared_ptr<arrow::DataType>& data_type,
+                                               int32_t record_batch_size,
+                                               int32_t offset = 0) const {
+        arrow::StructBuilder struct_builder(
+            data_type, arrow::default_memory_pool(),
+            {std::make_shared<arrow::StringBuilder>(), 
std::make_shared<arrow::Int32Builder>(),
+             std::make_shared<arrow::BooleanBuilder>()});
+        auto string_builder = 
static_cast<arrow::StringBuilder*>(struct_builder.field_builder(0));
+        auto int_builder = 
static_cast<arrow::Int32Builder*>(struct_builder.field_builder(1));
+        auto bool_builder = 
static_cast<arrow::BooleanBuilder*>(struct_builder.field_builder(2));
+        for (int32_t i = 0 + offset; i < record_batch_size + offset; ++i) {
+            EXPECT_TRUE(struct_builder.Append().ok());
+            EXPECT_TRUE(string_builder->Append("str_" + 
std::to_string(i)).ok());
+            if (i % 3 == 0) {
+                // test null
+                EXPECT_TRUE(int_builder->AppendNull().ok());
+            } else {
+                EXPECT_TRUE(int_builder->Append(i).ok());
+            }
+            EXPECT_TRUE(bool_builder->Append(static_cast<bool>(i % 2)).ok());
+        }
+        std::shared_ptr<arrow::Array> array;
+        EXPECT_TRUE(struct_builder.Finish(&array).ok());
+        return array;
+    }
+
+    void AddRecordBatchOnce(const std::shared_ptr<ParquetFormatWriter>& 
format_writer,
+                            const std::shared_ptr<arrow::DataType>& 
struct_type,
+                            int32_t record_batch_size, int32_t offset) const {
+        auto array = PrepareArray(struct_type, record_batch_size, offset);
+        auto arrow_array = std::make_unique<ArrowArray>();
+        ASSERT_TRUE(arrow::ExportArray(*array, arrow_array.get()).ok());
+        auto batch = std::make_shared<RecordBatch>(
+            /*partition=*/std::map<std::string, std::string>(), /*bucket=*/-1,
+            /*row_kinds=*/std::vector<RecordBatch::RowKind>(), 
arrow_array.get());
+        ASSERT_OK(format_writer->AddBatch(batch->GetData()));
+    }
+
+    Result<std::unique_ptr<FileReaderWrapper>> PrepareReaderWrapper(const 
std::string& file_path) {
+        PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<InputStream> in, 
fs_->Open(file_path));
+        PAIMON_ASSIGN_OR_RAISE(uint64_t file_length, in->Length());
+        auto input_stream = std::make_unique<ArrowInputStreamAdapter>(in, 
arrow_pool_, file_length);
+        ::parquet::arrow::FileReaderBuilder file_reader_builder;
+        ::parquet::ReaderProperties reader_properties;
+        reader_properties.enable_buffered_stream();
+        PAIMON_RETURN_NOT_OK_FROM_ARROW(
+            file_reader_builder.Open(std::move(input_stream), 
reader_properties));
+
+        ::parquet::ArrowReaderProperties arrow_reader_props;
+        arrow_reader_props.set_pre_buffer(true);
+        arrow_reader_props.set_batch_size(static_cast<int64_t>(batch_size_));
+        arrow_reader_props.set_use_threads(true);
+        
arrow_reader_props.set_cache_options(arrow::io::CacheOptions::Defaults());
+        std::unique_ptr<::parquet::arrow::FileReader> file_reader;
+        
PAIMON_RETURN_NOT_OK_FROM_ARROW(file_reader_builder.memory_pool(arrow_pool_.get())
+                                            ->properties(arrow_reader_props)
+                                            ->Build(&file_reader));
+        return FileReaderWrapper::Create(std::move(file_reader));
+    }
+
+    void PrepareParquetFile(const std::string& file_path, int32_t row_count) {
+        auto schema_pair = PrepareArrowSchema();
+        const auto& arrow_schema = schema_pair.first;
+        const auto& struct_type = schema_pair.second;
+
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<OutputStream> out,
+                             fs_->Create(file_path, /*overwrite=*/false));
+        ::parquet::WriterProperties::Builder builder;
+        builder.write_batch_size(10);
+        builder.max_row_group_length(1000);
+        builder.enable_store_decimal_as_integer();
+        auto writer_properties = builder.build();
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<ParquetFormatWriter> format_writer,
+            ParquetFormatWriter::Create(out, arrow_schema, writer_properties,
+                                        DEFAULT_PARQUET_WRITER_MAX_MEMORY_USE, 
arrow_pool_));
+
+        AddRecordBatchOnce(format_writer, struct_type, 
/*record_batch_size=*/row_count,
+                           /*offset=*/0);
+        ASSERT_OK(format_writer->Flush());
+        ASSERT_OK(format_writer->Finish());
+        ASSERT_OK(out->Flush());
+        ASSERT_OK(out->Close());
+    }
+
+ private:
+    std::unique_ptr<paimon::test::UniqueTestDirectory> dir_;
+    std::shared_ptr<FileSystem> fs_;
+    std::shared_ptr<MemoryPool> pool_;
+    std::shared_ptr<arrow::MemoryPool> arrow_pool_;
+    int32_t batch_size_;
+};
+
+TEST_F(FileReaderWrapperTest, EmptyFile) {
+    std::string file_path = PathUtil::JoinPath(dir_->Str(), "test.parquet");
+    PrepareParquetFile(file_path, /*row_count=*/0);
+    ASSERT_OK_AND_ASSIGN(auto reader_wrapper, PrepareReaderWrapper(file_path));
+    ASSERT_EQ(0, reader_wrapper->GetNumberOfRows());
+    ASSERT_EQ(0, reader_wrapper->GetNumberOfRowGroups());
+    ASSERT_EQ(std::numeric_limits<uint64_t>::max(), 
reader_wrapper->GetNextRowToRead());
+    ASSERT_EQ(std::numeric_limits<uint64_t>::max(),
+              reader_wrapper->GetPreviousBatchFirstRowNumber().value());
+    ASSERT_OK_AND_ASSIGN(auto batch, reader_wrapper->Next());
+    ASSERT_EQ(0, reader_wrapper->GetPreviousBatchFirstRowNumber().value());
+    ASSERT_EQ(0, reader_wrapper->GetNextRowToRead());
+    ASSERT_TRUE(reader_wrapper->GetAllRowGroupRanges().empty());
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::RecordBatch> record_batch, 
reader_wrapper->Next());
+    ASSERT_FALSE(record_batch);
+}
+
+TEST_F(FileReaderWrapperTest, NullFileReader) {
+    ASSERT_NOK_WITH_MSG(FileReaderWrapper::Create(nullptr),
+                        "file reader wrapper create failed. file reader is 
nullptr");
+}
+
+TEST_F(FileReaderWrapperTest, Simple) {
+    std::string file_path = PathUtil::JoinPath(dir_->Str(), "test.parquet");
+    PrepareParquetFile(file_path, /*row_count=*/5500);
+    ASSERT_OK_AND_ASSIGN(auto reader_wrapper, PrepareReaderWrapper(file_path));
+    ASSERT_EQ(5500, reader_wrapper->GetNumberOfRows());
+    ASSERT_EQ(6, reader_wrapper->GetNumberOfRowGroups());
+    ASSERT_EQ(std::numeric_limits<uint64_t>::max(), 
reader_wrapper->GetNextRowToRead());
+    ASSERT_EQ(std::numeric_limits<uint64_t>::max(),
+              reader_wrapper->GetPreviousBatchFirstRowNumber().value());
+    std::vector<std::pair<uint64_t, uint64_t>> expected_all_row_group_ranges = 
{
+        {0, 1000}, {1000, 2000}, {2000, 3000}, {3000, 4000}, {4000, 5000}, 
{5000, 5500}};
+    ASSERT_EQ(expected_all_row_group_ranges, 
reader_wrapper->GetAllRowGroupRanges());
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::RecordBatch> record_batch, 
reader_wrapper->Next());
+    ASSERT_TRUE(record_batch);
+    ASSERT_EQ(512, record_batch->num_rows());
+    ASSERT_EQ(512, reader_wrapper->GetNextRowToRead());
+    ASSERT_EQ(0, reader_wrapper->GetPreviousBatchFirstRowNumber().value());
+    ASSERT_OK_AND_ASSIGN(record_batch, reader_wrapper->Next());
+    ASSERT_TRUE(record_batch);
+    ASSERT_EQ(488, record_batch->num_rows());
+    ASSERT_EQ(1000, reader_wrapper->GetNextRowToRead());
+    ASSERT_EQ(512, reader_wrapper->GetPreviousBatchFirstRowNumber().value());
+    ASSERT_OK_AND_ASSIGN(record_batch, reader_wrapper->Next());
+    ASSERT_TRUE(record_batch);
+    ASSERT_EQ(512, record_batch->num_rows());
+    ASSERT_EQ(1512, reader_wrapper->GetNextRowToRead());
+    ASSERT_EQ(1000, reader_wrapper->GetPreviousBatchFirstRowNumber().value());
+    ASSERT_NOK_WITH_MSG(reader_wrapper->SeekToRow(1001),
+                        "should not be in the middle of readable range");
+    ASSERT_OK(reader_wrapper->SeekToRow(1000));
+    ASSERT_OK_AND_ASSIGN(record_batch, reader_wrapper->Next());
+    ASSERT_TRUE(record_batch);
+    ASSERT_EQ(512, record_batch->num_rows());
+    ASSERT_EQ(1512, reader_wrapper->GetNextRowToRead());
+    ASSERT_EQ(1000, reader_wrapper->GetPreviousBatchFirstRowNumber().value());
+
+    ASSERT_OK(reader_wrapper->SeekToRow(5600));
+    ASSERT_EQ(5500, reader_wrapper->GetNextRowToRead());
+    ASSERT_EQ(6, reader_wrapper->current_row_group_idx_);
+    ASSERT_EQ(1000, reader_wrapper->GetPreviousBatchFirstRowNumber().value());
+    ASSERT_OK_AND_ASSIGN(record_batch, reader_wrapper->Next());
+    ASSERT_FALSE(record_batch);
+    ASSERT_EQ(5500, reader_wrapper->GetNextRowToRead());
+    ASSERT_EQ(5500, reader_wrapper->GetPreviousBatchFirstRowNumber().value());
+}
+
+TEST_F(FileReaderWrapperTest, GetRowGroupRanges) {
+    std::string file_path = PathUtil::JoinPath(dir_->Str(), "test.parquet");
+    PrepareParquetFile(file_path, /*row_count=*/5500);
+    ASSERT_OK_AND_ASSIGN(auto reader_wrapper, PrepareReaderWrapper(file_path));
+    ASSERT_OK_AND_ASSIGN(auto ranges, reader_wrapper->GetRowGroupRanges({0, 3, 
5}));
+    std::vector<std::pair<uint64_t, uint64_t>> expected_read_ranges = {
+        {0, 1000}, {3000, 4000}, {5000, 5500}};
+    ASSERT_EQ(expected_read_ranges, ranges);
+    ASSERT_NOK_WITH_MSG(reader_wrapper->GetRowGroupRanges({0, 3, 6}), "out of 
bound");
+    ASSERT_OK_AND_ASSIGN(ranges, reader_wrapper->GetRowGroupRanges({}));
+    ASSERT_TRUE(ranges.empty());
+}
+
+TEST_F(FileReaderWrapperTest, ReadRangesToRowGroupIds) {
+    std::string file_path = PathUtil::JoinPath(dir_->Str(), "test.parquet");
+    PrepareParquetFile(file_path, /*row_count=*/5500);
+    ASSERT_OK_AND_ASSIGN(auto reader_wrapper, PrepareReaderWrapper(file_path));
+    std::set<int32_t> expected_row_group_ids = {0, 3, 5};
+    std::vector<std::pair<uint64_t, uint64_t>> read_ranges = {
+        {0, 1000}, {3000, 4000}, {5000, 5500}};
+    ASSERT_OK_AND_ASSIGN(auto row_group_ids, 
reader_wrapper->ReadRangesToRowGroupIds(read_ranges));
+    ASSERT_EQ(expected_row_group_ids, row_group_ids);
+    std::vector<std::pair<uint64_t, uint64_t>> invalid_ranges = {
+        {0, 1000}, {3000, 4000}, {5000, 5600}};
+    
ASSERT_NOK_WITH_MSG(reader_wrapper->ReadRangesToRowGroupIds(invalid_ranges),
+                        "not match with row group range bound");
+    ASSERT_OK_AND_ASSIGN(row_group_ids, 
reader_wrapper->ReadRangesToRowGroupIds({}));
+    ASSERT_TRUE(row_group_ids.empty());
+}
+
+TEST_F(FileReaderWrapperTest, FilterRowGroupsByReadRanges) {
+    std::string file_path = PathUtil::JoinPath(dir_->Str(), "test.parquet");
+    PrepareParquetFile(file_path, /*row_count=*/5500);
+    ASSERT_OK_AND_ASSIGN(auto reader_wrapper, PrepareReaderWrapper(file_path));
+    std::set<int32_t> expected_row_group_ids = {0, 5};
+    std::vector<std::pair<uint64_t, uint64_t>> read_ranges = {
+        {0, 1000}, {3000, 4000}, {5000, 5500}};
+    ASSERT_OK_AND_ASSIGN(auto row_group_ids,
+                         
reader_wrapper->FilterRowGroupsByReadRanges(read_ranges, {0, 1, 2, 4, 5}));
+    ASSERT_EQ(expected_row_group_ids, row_group_ids);
+
+    ASSERT_OK_AND_ASSIGN(row_group_ids,
+                         
reader_wrapper->FilterRowGroupsByReadRanges(read_ranges, {}));
+    ASSERT_TRUE(row_group_ids.empty());
+}
+
+TEST_F(FileReaderWrapperTest, PrepareForReading) {
+    std::string file_path = PathUtil::JoinPath(dir_->Str(), "test.parquet");
+    PrepareParquetFile(file_path, /*row_count=*/5500);
+    ASSERT_OK_AND_ASSIGN(auto reader_wrapper, PrepareReaderWrapper(file_path));
+    ASSERT_OK(reader_wrapper->PrepareForReading(/*row_group_indices=*/{1},
+                                                /*column_indices=*/{0}));
+    // seek before actual read range
+    ASSERT_OK(reader_wrapper->SeekToRow(0));
+    ASSERT_EQ(1000, reader_wrapper->GetNextRowToRead());
+    ASSERT_EQ(std::numeric_limits<uint64_t>::max(),
+              reader_wrapper->GetPreviousBatchFirstRowNumber().value());
+    ASSERT_OK_AND_ASSIGN(auto record_batch, reader_wrapper->Next());
+    ASSERT_EQ(512, record_batch->num_rows());
+    ASSERT_EQ(1, record_batch->num_columns());
+    ASSERT_EQ(1512, reader_wrapper->GetNextRowToRead());
+    ASSERT_EQ(1000, reader_wrapper->GetPreviousBatchFirstRowNumber().value());
+    ASSERT_OK_AND_ASSIGN(record_batch, reader_wrapper->Next());
+    ASSERT_TRUE(record_batch);
+    ASSERT_EQ(488, record_batch->num_rows());
+    ASSERT_EQ(5500, reader_wrapper->GetNextRowToRead());
+    ASSERT_EQ(1512, reader_wrapper->GetPreviousBatchFirstRowNumber().value());
+    ASSERT_OK_AND_ASSIGN(record_batch, reader_wrapper->Next());
+    ASSERT_FALSE(record_batch);
+
+    // empty column indices
+    ASSERT_OK(reader_wrapper->PrepareForReading(/*row_group_indices=*/{0, 1},
+                                                /*column_indices=*/{}));
+    ASSERT_EQ(0, reader_wrapper->GetNextRowToRead());
+    ASSERT_EQ(std::numeric_limits<uint64_t>::max(),
+              reader_wrapper->GetPreviousBatchFirstRowNumber().value());
+    ASSERT_OK_AND_ASSIGN(record_batch, reader_wrapper->Next());
+    ASSERT_EQ(512, record_batch->num_rows());
+    ASSERT_EQ(0, record_batch->num_columns());
+
+    // empty row group indices
+    ASSERT_OK(reader_wrapper->PrepareForReading(/*row_group_indices=*/{},
+                                                /*column_indices=*/{0}));
+    ASSERT_EQ(5500, reader_wrapper->GetNextRowToRead());
+    ASSERT_EQ(std::numeric_limits<uint64_t>::max(),
+              reader_wrapper->GetPreviousBatchFirstRowNumber().value());
+}
+
+}  // namespace paimon::parquet::test
diff --git a/src/paimon/format/parquet/parquet_file_batch_reader.cpp 
b/src/paimon/format/parquet/parquet_file_batch_reader.cpp
new file mode 100644
index 0000000..d7a1cde
--- /dev/null
+++ b/src/paimon/format/parquet/parquet_file_batch_reader.cpp
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/format/parquet/parquet_file_batch_reader.h"
+
+#include <cstddef>
+#include <unordered_map>
+
+#include "arrow/acero/options.h"
+#include "arrow/array/array_nested.h"
+#include "arrow/c/abi.h"
+#include "arrow/c/bridge.h"
+#include "arrow/compute/api.h"
+#include "arrow/dataset/dataset.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/file_parquet.h"
+#include "arrow/dataset/type_fwd.h"
+#include "arrow/io/caching.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/record_batch.h"
+#include "arrow/type.h"
+#include "arrow/util/range.h"
+#include "arrow/util/thread_pool.h"
+#include "fmt/format.h"
+#include "paimon/common/metrics/metrics_impl.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/common/utils/options_utils.h"
+#include "paimon/format/parquet/parquet_field_id_converter.h"
+#include "paimon/format/parquet/parquet_format_defs.h"
+#include "paimon/format/parquet/parquet_timestamp_converter.h"
+#include "paimon/format/parquet/predicate_converter.h"
+#include "paimon/reader/batch_reader.h"
+#include "paimon/utils/roaring_bitmap32.h"
+#include "parquet/arrow/reader.h"
+#include "parquet/properties.h"
+
+namespace arrow {
+class MemoryPool;
+}  // namespace arrow
+namespace paimon {
+class Predicate;
+}  // namespace paimon
+
+namespace paimon::parquet {
+
+ParquetFileBatchReader::ParquetFileBatchReader(
+    std::shared_ptr<arrow::io::RandomAccessFile>&& input_stream,
+    std::unique_ptr<FileReaderWrapper>&& reader, const std::map<std::string, 
std::string>& options,
+    const std::shared_ptr<arrow::MemoryPool>& arrow_pool)
+    : options_(options),
+      arrow_pool_(arrow_pool),
+      input_stream_(std::move(input_stream)),
+      reader_(std::move(reader)),
+      read_ranges_(reader_->GetAllRowGroupRanges()),
+      metrics_(std::make_shared<MetricsImpl>()) {}
+
+Result<std::unique_ptr<ParquetFileBatchReader>> ParquetFileBatchReader::Create(
+    std::shared_ptr<arrow::io::RandomAccessFile>&& input_stream,
+    const std::shared_ptr<arrow::MemoryPool>& pool,
+    const std::map<std::string, std::string>& options, int32_t batch_size) {
+    assert(input_stream);
+    PAIMON_ASSIGN_OR_RAISE(::parquet::ReaderProperties reader_properties,
+                           CreateReaderProperties(pool, options));
+    PAIMON_ASSIGN_OR_RAISE(::parquet::ArrowReaderProperties 
arrow_reader_properties,
+                           CreateArrowReaderProperties(pool, options, 
batch_size));
+
+    ::parquet::arrow::FileReaderBuilder file_reader_builder;
+    PAIMON_RETURN_NOT_OK_FROM_ARROW(file_reader_builder.Open(input_stream, 
reader_properties));
+
+    std::unique_ptr<::parquet::arrow::FileReader> file_reader;
+    PAIMON_RETURN_NOT_OK_FROM_ARROW(file_reader_builder.memory_pool(pool.get())
+                                        ->properties(arrow_reader_properties)
+                                        ->Build(&file_reader));
+
+    PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<FileReaderWrapper> reader,
+                           FileReaderWrapper::Create(std::move(file_reader)));
+    auto parquet_file_batch_reader = std::unique_ptr<ParquetFileBatchReader>(
+        new ParquetFileBatchReader(std::move(input_stream), std::move(reader), 
options, pool));
+    PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<::ArrowSchema> file_schema,
+                           parquet_file_batch_reader->GetFileSchema());
+    PAIMON_RETURN_NOT_OK(parquet_file_batch_reader->SetReadSchema(
+        file_schema.get(), /*predicate=*/nullptr, 
/*selection_bitmap=*/std::nullopt));
+    return parquet_file_batch_reader;
+}
+
+Result<std::unique_ptr<::ArrowSchema>> ParquetFileBatchReader::GetFileSchema() 
const {
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Schema> file_schema, 
reader_->GetSchema());
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Schema> new_schema,
+                           
ParquetFieldIdConverter::GetPaimonIdsFromParquetIds(file_schema));
+    PAIMON_ASSIGN_OR_RAISE(
+        std::shared_ptr<arrow::DataType> new_type,
+        
ParquetTimestampConverter::AdjustTimezone(arrow::struct_(new_schema->fields())));
+
+    auto c_schema = std::make_unique<::ArrowSchema>();
+    PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportType(*new_type, 
c_schema.get()));
+    return c_schema;
+}
+
+Status ParquetFileBatchReader::SetReadSchema(
+    ::ArrowSchema* schema, const std::shared_ptr<Predicate>& predicate,
+    const std::optional<RoaringBitmap32>& selection_bitmap) {
+    if (!schema) {
+        return Status::Invalid("SetReadSchema failed: read schema cannot be 
nullptr");
+    }
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Schema> 
read_schema,
+                                      arrow::ImportSchema(schema));
+
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Schema> file_schema, 
reader_->GetSchema());
+    std::unordered_map<std::string, std::vector<int32_t>> field_index_map;
+    int32_t i = 0;
+    for (const auto& field : file_schema->fields()) {
+        std::vector<int32_t> v;
+        FlattenSchema(field->type(), &i, &v);
+        field_index_map[field->name()] = v;
+    }
+
+    std::vector<int32_t> column_indices;
+    for (const auto& field : read_schema->field_names()) {
+        if (field_index_map.find(field) != field_index_map.end()) {
+            for (int32_t index : field_index_map[field]) {
+                column_indices.push_back(index);
+            }
+        } else {
+            return Status::Invalid(fmt::format("Field {} is not found in 
schema.", field));
+        }
+    }
+
+    std::vector<int32_t> row_groups = 
arrow::internal::Iota(reader_->GetNumberOfRowGroups());
+    if (predicate) {
+        PAIMON_ASSIGN_OR_RAISE(row_groups,
+                               FilterRowGroupsByPredicate(predicate, 
file_schema, row_groups));
+    }
+    if (selection_bitmap) {
+        PAIMON_ASSIGN_OR_RAISE(row_groups,
+                               
FilterRowGroupsByBitmap(selection_bitmap.value(), row_groups));
+    }
+
+    read_data_type_ = arrow::struct_(read_schema->fields());
+    read_row_groups_ = row_groups;
+    read_column_indices_ = column_indices;
+
+    metrics_->SetCounter(ParquetMetrics::READ_ROW_GROUPS_TOTAL, 
reader_->GetNumberOfRowGroups());
+    metrics_->SetCounter(ParquetMetrics::READ_ROW_GROUPS_AFTER_FILTER, 
row_groups.size());
+
+    PAIMON_ASSIGN_OR_RAISE(std::set<int32_t> ordered_row_groups,
+                           reader_->FilterRowGroupsByReadRanges(read_ranges_, 
read_row_groups_));
+    return reader_->PrepareForReadingLazy(ordered_row_groups, 
read_column_indices_);
+}
+
+Result<std::vector<int32_t>> 
ParquetFileBatchReader::FilterRowGroupsByPredicate(
+    const std::shared_ptr<Predicate>& predicate, const 
std::shared_ptr<arrow::Schema> file_schema,
+    const std::vector<int32_t>& src_row_groups) const {
+    if (!predicate) {
+        return Status::Invalid("cannot pushdown an empty predicate");
+    }
+    // convert paimon predicate to arrow expression
+    PAIMON_ASSIGN_OR_RAISE(
+        uint32_t predicate_node_count_limit,
+        OptionsUtils::GetValueFromMap<uint32_t>(options_, 
PARQUET_READ_PREDICATE_NODE_COUNT_LIMIT,
+                                                
DEFAULT_PARQUET_READ_PREDICATE_NODE_COUNT_LIMIT));
+    PAIMON_ASSIGN_OR_RAISE(arrow::compute::Expression expr,
+                           PredicateConverter::Convert(predicate, 
predicate_node_count_limit));
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(arrow::Expression bind_expr, 
expr.Bind(*file_schema));
+
+    // prepare file source
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(int64_t file_length, 
input_stream_->GetSize());
+    auto file_source = arrow::dataset::FileSource(input_stream_, 
/*size=*/file_length);
+
+    // filter row group by arrow expression and row group meta
+    auto parquet_file_format = 
std::make_shared<arrow::dataset::ParquetFileFormat>();
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(
+        std::shared_ptr<arrow::dataset::ParquetFileFragment> file_fragment,
+        parquet_file_format->MakeFragment(
+            file_source, 
/*partition_expression=*/PredicateConverter::AlwaysTrue(),
+            /*physical_schema=*/nullptr, /*row_groups=*/src_row_groups));
+    PAIMON_RETURN_NOT_OK_FROM_ARROW(
+        file_fragment->EnsureCompleteMetadata(reader_->GetFileReader()));
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(arrow::dataset::FragmentVector 
target_fragments,
+                                      
file_fragment->SplitByRowGroup(bind_expr));
+    std::vector<int32_t> target_row_groups;
+    target_row_groups.reserve(src_row_groups.size());
+    for (const auto& fragment : target_fragments) {
+        auto parquet_fragment = 
dynamic_cast<arrow::dataset::ParquetFileFragment*>(fragment.get());
+        if (!parquet_fragment) {
+            return Status::Invalid("cannot cast to ParquetFileFragment in 
ParquetFileBatchReader");
+        }
+        target_row_groups.insert(target_row_groups.end(), 
parquet_fragment->row_groups().begin(),
+                                 parquet_fragment->row_groups().end());
+    }
+    return target_row_groups;
+}
+
+Result<std::vector<int32_t>> ParquetFileBatchReader::FilterRowGroupsByBitmap(
+    const RoaringBitmap32& bitmap, const std::vector<int32_t>& src_row_groups) 
const {
+    if (bitmap.IsEmpty()) {
+        return Status::Invalid("cannot push down an empty bitmap to 
ParquetFileBatchReader");
+    }
+    const auto& all_row_group_ranges = reader_->GetAllRowGroupRanges();
+    // filter row groups by row range
+    std::vector<int32_t> target_row_groups;
+    for (const auto& row_group_idx : src_row_groups) {
+        if (static_cast<size_t>(row_group_idx) >= all_row_group_ranges.size()) 
{
+            return Status::Invalid(
+                fmt::format("src row group {} not in row group meta", 
row_group_idx));
+        }
+        const auto& [start_row_idx, end_row_idx] = 
all_row_group_ranges[row_group_idx];
+        if (bitmap.ContainsAny(start_row_idx, end_row_idx)) {
+            target_row_groups.push_back(row_group_idx);
+        }
+    }
+    return target_row_groups;
+}
+
+Result<BatchReader::ReadBatch> ParquetFileBatchReader::NextBatch() {
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<arrow::RecordBatch> batch, 
reader_->Next());
+    if (batch == nullptr) {
+        return BatchReader::MakeEofBatch();
+    }
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Array> array, 
batch->ToStructArray());
+    PAIMON_ASSIGN_OR_RAISE(bool need_cast, 
ParquetTimestampConverter::NeedCastArrayForTimestamp(
+                                               array->type(), 
read_data_type_));
+    if (need_cast) {
+        PAIMON_ASSIGN_OR_RAISE(array, 
ParquetTimestampConverter::CastArrayForTimestamp(
+                                          array, read_data_type_, 
arrow_pool_));
+    }
+    PAIMON_ASSIGN_OR_RAISE(need_cast, 
ParquetTimestampConverter::NeedCastArrayForTimestamp(
+                                          array->type(), read_data_type_));
+    if (need_cast) {
+        return Status::Invalid(
+            fmt::format("unexpected: in parquet, after CastArrayForTimestamp, 
output type {} not "
+                        "equal with read schema {}",
+                        array->type()->ToString(), 
read_data_type_->ToString()));
+    }
+    std::unique_ptr<ArrowArray> c_array = std::make_unique<ArrowArray>();
+    std::unique_ptr<ArrowSchema> c_schema = std::make_unique<ArrowSchema>();
+    PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportArray(*array, c_array.get(), 
c_schema.get()));
+
+    read_rows_ += array->length();
+    read_batch_count_++;
+    metrics_->SetCounter(ParquetMetrics::READ_ROWS, read_rows_);
+    metrics_->SetCounter(ParquetMetrics::READ_BATCH_COUNT, read_batch_count_);
+
+    return make_pair(std::move(c_array), std::move(c_schema));
+}
+
+Result<std::vector<std::pair<uint64_t, uint64_t>>> 
ParquetFileBatchReader::GenReadRanges(
+    bool* need_prefetch) const {
+    *need_prefetch = true;
+    return reader_->GetAllRowGroupRanges();
+}
+
+Result<::parquet::ReaderProperties> 
ParquetFileBatchReader::CreateReaderProperties(
+    const std::shared_ptr<arrow::MemoryPool>& pool,
+    const std::map<std::string, std::string>& options) {
+    ::parquet::ReaderProperties reader_properties;
+    // TODO(jinli.zjw): set more ReaderProperties (compare with java)
+    PAIMON_ASSIGN_OR_RAISE(
+        bool enable_pre_buffer,
+        OptionsUtils::GetValueFromMap<bool>(options, 
PARQUET_READ_ENABLE_PRE_BUFFER, true));
+    if (enable_pre_buffer) {
+        reader_properties.enable_buffered_stream();
+    } else {
+        reader_properties.disable_buffered_stream();
+    }
+    return reader_properties;
+}
+
+Result<::parquet::ArrowReaderProperties> 
ParquetFileBatchReader::CreateArrowReaderProperties(
+    const std::shared_ptr<arrow::MemoryPool>& pool,
+    const std::map<std::string, std::string>& options, int32_t batch_size) {
+    PAIMON_ASSIGN_OR_RAISE(
+        uint32_t executor_thread_count,
+        OptionsUtils::GetValueFromMap<uint32_t>(options, 
PARQUET_READ_EXECUTOR_THREAD_COUNT,
+                                                
DEFAULT_PARQUET_READ_EXECUTOR_THREAD_COUNT));
+
+    ::parquet::ArrowReaderProperties arrow_reader_props;
+    // TODO(jinli.zjw): set more ArrowReaderProperties (compare with java)
+    PAIMON_ASSIGN_OR_RAISE(
+        bool enable_pre_buffer,
+        OptionsUtils::GetValueFromMap<bool>(options, 
PARQUET_READ_ENABLE_PRE_BUFFER, true));
+    arrow_reader_props.set_pre_buffer(enable_pre_buffer);
+    arrow_reader_props.set_batch_size(static_cast<int64_t>(batch_size));
+    if (executor_thread_count != 0) {
+        
PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::SetCpuThreadPoolCapacity(executor_thread_count));
+        arrow_reader_props.set_use_threads(true);
+    } else {
+        arrow_reader_props.set_use_threads(false);
+    }
+    PAIMON_ASSIGN_OR_RAISE(bool cache_lazy, 
OptionsUtils::GetValueFromMap<bool>(
+                                                options, 
PARQUET_READ_CACHE_OPTION_LAZY, false));
+    PAIMON_ASSIGN_OR_RAISE(
+        int64_t cache_prefetch_limit,
+        OptionsUtils::GetValueFromMap<int64_t>(options, 
PARQUET_READ_CACHE_OPTION_PREFETCH_LIMIT,
+                                               
DEFAULT_PARQUET_READ_CACHE_OPTION_PREFETCH_LIMIT));
+    PAIMON_ASSIGN_OR_RAISE(
+        int64_t cache_range_size_limit,
+        OptionsUtils::GetValueFromMap<int64_t>(options, 
PARQUET_READ_CACHE_OPTION_RANGE_SIZE_LIMIT,
+                                               
DEFAULT_PARQUET_READ_CACHE_OPTION_RANGE_SIZE_LIMIT));
+    auto cache_option = arrow::io::CacheOptions::Defaults();
+    cache_option.lazy = cache_lazy;
+    cache_option.prefetch_limit = cache_prefetch_limit;
+    cache_option.range_size_limit = cache_range_size_limit;
+    arrow_reader_props.set_cache_options(cache_option);
+    return arrow_reader_props;
+}
+
+}  // namespace paimon::parquet
diff --git a/src/paimon/format/parquet/parquet_file_batch_reader.h 
b/src/paimon/format/parquet/parquet_file_batch_reader.h
new file mode 100644
index 0000000..806afbe
--- /dev/null
+++ b/src/paimon/format/parquet/parquet_file_batch_reader.h
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cassert>
+#include <cstdint>
+#include <map>
+#include <memory>
+#include <optional>
+#include <set>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/c/abi.h"
+#include "arrow/c/bridge.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/memory_pool.h"
+#include "arrow/record_batch.h"
+#include "arrow/type.h"
+#include "arrow/type_fwd.h"
+#include "paimon/common/metrics/metrics_impl.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/format/parquet/file_reader_wrapper.h"
+#include "paimon/reader/prefetch_file_batch_reader.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+#include "parquet/arrow/reader.h"
+#include "parquet/properties.h"
+
+namespace arrow {
+class MemoryPool;
+
+namespace io {
+class RandomAccessFile;
+}  // namespace io
+}  // namespace arrow
+namespace paimon {
+class Metrics;
+class Predicate;
+class RoaringBitmap32;
+}  // namespace paimon
+
+namespace paimon::parquet {
+
+class ParquetFileBatchReader : public PrefetchFileBatchReader {
+ public:
+    static Result<std::unique_ptr<ParquetFileBatchReader>> Create(
+        std::shared_ptr<arrow::io::RandomAccessFile>&& input_stream,
+        const std::shared_ptr<arrow::MemoryPool>& pool,
+        const std::map<std::string, std::string>& options, int32_t batch_size);
+
+    // For timestamp type, we return the schema stored in file, e.g., second 
in parquet file will
+    // store as milli.
+    Result<std::unique_ptr<::ArrowSchema>> GetFileSchema() const override;
+
+    Status SetReadSchema(::ArrowSchema* read_schema, const 
std::shared_ptr<Predicate>& predicate,
+                         const std::optional<RoaringBitmap32>& 
selection_bitmap) override;
+
+    Status SeekToRow(uint64_t row_number) override {
+        assert(reader_);
+        return reader_->SeekToRow(row_number);
+    }
+
+    // Important: output ArrowArray is allocated on arrow_pool_ whose 
lifecycle holds in
+    // ParquetFileBatchReader. Therefore, we need to hold BatchReader when 
using output
+    // ArrowArray.
+    Result<ReadBatch> NextBatch() override;
+
+    Result<std::vector<std::pair<uint64_t, uint64_t>>> GenReadRanges(
+        bool* need_prefetch) const override;
+
+    Result<uint64_t> GetPreviousBatchFirstRowNumber() const override {
+        assert(reader_);
+        return reader_->GetPreviousBatchFirstRowNumber();
+    }
+
+    Result<uint64_t> GetNumberOfRows() const override {
+        assert(reader_);
+        return reader_->GetNumberOfRows();
+    }
+
+    uint64_t GetNextRowToRead() const override {
+        assert(reader_);
+        return reader_->GetNextRowToRead();
+    }
+
+    Status SetReadRanges(const std::vector<std::pair<uint64_t, uint64_t>>& 
read_ranges) override {
+        read_ranges_ = read_ranges;
+        PAIMON_ASSIGN_OR_RAISE(
+            std::set<int32_t> ordered_row_groups,
+            reader_->FilterRowGroupsByReadRanges(read_ranges_, 
read_row_groups_));
+        return reader_->PrepareForReadingLazy(ordered_row_groups, 
read_column_indices_);
+    }
+
+    std::shared_ptr<Metrics> GetReaderMetrics() const override {
+        return metrics_;
+    }
+
+    void Close() override {
+        if (reader_) {
+            auto status = reader_->Close();
+            reader_.reset();
+            (void)status;
+        }
+        input_stream_.reset();
+    }
+
+    bool SupportPreciseBitmapSelection() const override {
+        return false;
+    }
+
+ private:
+    ParquetFileBatchReader(std::shared_ptr<arrow::io::RandomAccessFile>&& 
input_stream,
+                           std::unique_ptr<FileReaderWrapper>&& reader,
+                           const std::map<std::string, std::string>& options,
+                           const std::shared_ptr<arrow::MemoryPool>& 
arrow_pool);
+
+    static Result<::parquet::ReaderProperties> CreateReaderProperties(
+        const std::shared_ptr<arrow::MemoryPool>& pool,
+        const std::map<std::string, std::string>& options);
+
+    static Result<::parquet::ArrowReaderProperties> 
CreateArrowReaderProperties(
+        const std::shared_ptr<arrow::MemoryPool>& pool,
+        const std::map<std::string, std::string>& options, int32_t batch_size);
+
+    static void FlattenSchema(const std::shared_ptr<arrow::DataType>& type, 
int32_t* index,
+                              std::vector<int32_t>* index_vector) {
+        if (type->id() == arrow::Type::STRUCT || type->id() == 
arrow::Type::LIST ||
+            type->id() == arrow::Type::MAP) {
+            for (int32_t i = 0; i < type->num_fields(); i++) {
+                auto field = type->field(i);
+                auto inner_type = field->type();
+                FlattenSchema(inner_type, index, index_vector);
+            }
+        } else {
+            index_vector->push_back((*index)++);
+        }
+    }
+
+    // precondition: predicate supposed not be empty
+    Result<std::vector<int32_t>> FilterRowGroupsByPredicate(
+        const std::shared_ptr<Predicate>& predicate,
+        const std::shared_ptr<arrow::Schema> file_schema,
+        const std::vector<int32_t>& src_row_groups) const;
+
+    Result<std::vector<int32_t>> FilterRowGroupsByBitmap(
+        const RoaringBitmap32& bitmap, const std::vector<int32_t>& 
src_row_groups) const;
+
+ private:
+    std::map<std::string, std::string> options_;
+    // hold the lifecycle of arrow memory pool.
+    std::shared_ptr<arrow::MemoryPool> arrow_pool_;
+
+    std::shared_ptr<arrow::io::RandomAccessFile> input_stream_;
+    std::unique_ptr<FileReaderWrapper> reader_;
+
+    std::shared_ptr<arrow::DataType> read_data_type_;
+    std::vector<std::pair<uint64_t, uint64_t>> read_ranges_;
+
+    std::shared_ptr<Metrics> metrics_;
+
+    uint64_t read_rows_ = 0;
+    uint64_t read_batch_count_ = 0;
+
+    // last time set read schema
+    std::vector<int32_t> read_row_groups_;
+    std::vector<int32_t> read_column_indices_;
+};
+
+}  // namespace paimon::parquet
diff --git a/src/paimon/format/parquet/parquet_file_batch_reader_test.cpp 
b/src/paimon/format/parquet/parquet_file_batch_reader_test.cpp
new file mode 100644
index 0000000..836c291
--- /dev/null
+++ b/src/paimon/format/parquet/parquet_file_batch_reader_test.cpp
@@ -0,0 +1,646 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/format/parquet/parquet_file_batch_reader.h"
+
+#include <iostream>
+#include <limits>
+
+#include "arrow/api.h"
+#include "arrow/array/array_base.h"
+#include "arrow/array/array_nested.h"
+#include "arrow/array/builder_nested.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/c/abi.h"
+#include "arrow/c/bridge.h"
+#include "arrow/io/caching.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/ipc/json_simple.h"
+#include "gtest/gtest.h"
+#include "paimon/common/types/data_field.h"
+#include "paimon/common/utils/arrow/arrow_input_stream_adapter.h"
+#include "paimon/common/utils/arrow/mem_utils.h"
+#include "paimon/common/utils/date_time_utils.h"
+#include "paimon/common/utils/path_util.h"
+#include "paimon/defs.h"
+#include "paimon/format/parquet/parquet_format_defs.h"
+#include "paimon/format/parquet/parquet_format_writer.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/fs/local/local_file_system.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/predicate/literal.h"
+#include "paimon/predicate/predicate_builder.h"
+#include "paimon/reader/batch_reader.h"
+#include "paimon/testing/utils/read_result_collector.h"
+#include "paimon/testing/utils/testharness.h"
+#include "paimon/testing/utils/timezone_guard.h"
+#include "paimon/utils/roaring_bitmap32.h"
+#include "parquet/properties.h"
+
+namespace paimon {
+class Predicate;
+}  // namespace paimon
+
+namespace paimon::parquet::test {
+
+class ParquetFileBatchReaderTest : public ::testing::Test,
+                                   public ::testing::WithParamInterface<bool> {
+ public:
+    void SetUp() override {
+        dir_ = paimon::test::UniqueTestDirectory::Create();
+        ASSERT_TRUE(dir_);
+        fs_ = std::make_shared<LocalFileSystem>();
+        pool_ = GetArrowPool(GetDefaultPool());
+        batch_size_ = 10;
+        file_path_ = PathUtil::JoinPath(dir_->Str(), "test.parquet");
+
+        arrow::FieldVector fields = {
+            arrow::field("f1", arrow::boolean()),
+            arrow::field("f2", arrow::int8()),
+            arrow::field("f3", arrow::int16()),
+            arrow::field("f4", arrow::int32()),
+            arrow::field("f5", arrow::int64()),
+            arrow::field("f6", arrow::float32()),
+            arrow::field("f7", arrow::float64()),
+            arrow::field("f8", arrow::utf8()),
+            arrow::field("f9", arrow::binary()),
+            arrow::field("f10", arrow::map(arrow::list(arrow::float32()),
+                                           arrow::struct_({arrow::field("f0", 
arrow::boolean()),
+                                                           arrow::field("f1", 
arrow::int64())}))),
+            arrow::field("f11", arrow::timestamp(arrow::TimeUnit::NANO)),
+            arrow::field("f12", arrow::date32()),
+            arrow::field("f13", arrow::decimal128(2, 2))};
+
+        schema_ = arrow::schema(fields);
+        struct_array_ = std::dynamic_pointer_cast<arrow::StructArray>(
+            arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), 
R"([
+[true, 31, null, 300001, 3000000001, 3.1, 300.000000001, "s31", "a31", 
[[[5.11, 5.21], [true, 61]]], "1970-01-01 00:00:00.000071", 81, "0.91"],
+[false, 32, 302, 300002, 3000000002, 3.2, 300.000000002, "s32", "a32", 
[[[5.12, 5.22], [false, 62]]], "1970-01-01 00:00:00.000072", 82, "0.92"],
+[true, 33, 303, 300003, 3000000003, 3.3, 300.000000003, "s33", "a33", null, 
"1970-01-01 00:00:00.000073", 83, "0.93"],
+[false, 34, 304, 300004, 3000000004, 3.4, 300.000000004, "s34", "a34", 
[[[5.141, 5.241], [false, 641]], [[5.14, 5.24], [false, 64]]], "1970-01-01 
00:00:00.000074", 84, "0.94"],
+[true, 35, 305, 300005, 3000000005, 3.5, 300.000000005, "s35", "a35", [[[5.15, 
5.25], [true, 65]]], "1970-01-01 00:00:00.000075", 85, "0.95"],
+[false, 36, 306, 300006, 3000000006, 3.6, 300.000000006, "s36", "a36", 
[[[5.16, 5.26], null]], "1970-01-01 00:00:00.000076", 86, "0.96"]
+    ])")
+                .ValueOrDie());
+    }
+
+    void TearDown() override {}
+
+    void WriteArray(const std::string& file_path, const 
std::shared_ptr<arrow::Array>& src_array,
+                    const std::shared_ptr<arrow::Schema>& arrow_schema, 
int64_t write_batch_size,
+                    bool enable_dictionary, int64_t max_row_group_length) 
const {
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<OutputStream> out,
+                             fs_->Create(file_path, /*overwrite=*/true));
+        ::parquet::WriterProperties::Builder builder;
+        builder.write_batch_size(write_batch_size);
+        builder.max_row_group_length(max_row_group_length);
+        enable_dictionary ? builder.enable_dictionary() : 
builder.disable_dictionary();
+        auto writer_properties = builder.build();
+        ASSERT_OK_AND_ASSIGN(auto format_writer, ParquetFormatWriter::Create(
+                                                     out, arrow_schema, 
writer_properties,
+                                                     
DEFAULT_PARQUET_WRITER_MAX_MEMORY_USE, pool_));
+
+        auto arrow_array = std::make_unique<ArrowArray>();
+        ASSERT_TRUE(arrow::ExportArray(*src_array, arrow_array.get()).ok());
+        ASSERT_OK(format_writer->AddBatch(arrow_array.get()));
+        ASSERT_OK(format_writer->Flush());
+        ASSERT_OK(format_writer->Finish());
+        ASSERT_OK(out->Flush());
+        ASSERT_OK(out->Close());
+    }
+
+    std::unique_ptr<ParquetFileBatchReader> PrepareParquetFileBatchReader(
+        const std::string& file_name, const std::shared_ptr<arrow::Schema>& 
read_schema,
+        const std::shared_ptr<Predicate>& predicate,
+        const std::optional<RoaringBitmap32>& selection_bitmap, int32_t 
batch_size) const {
+        EXPECT_OK_AND_ASSIGN(auto input_stream, fs_->Open(file_name));
+        auto length = fs_->GetFileStatus(file_name).value()->GetLen();
+        auto in_stream =
+            std::make_unique<ArrowInputStreamAdapter>(std::move(input_stream), 
pool_, length);
+        std::map<std::string, std::string> options = {};
+        return PrepareParquetFileBatchReader(std::move(in_stream), options, 
read_schema, predicate,
+                                             selection_bitmap, batch_size);
+    }
+
+    std::unique_ptr<paimon::parquet::ParquetFileBatchReader> 
PrepareParquetFileBatchReader(
+        std::unique_ptr<arrow::io::RandomAccessFile>&& in_stream,
+        const std::map<std::string, std::string>& options,
+        const std::shared_ptr<arrow::Schema>& read_schema,
+        const std::shared_ptr<Predicate>& predicate,
+        const std::optional<RoaringBitmap32>& selection_bitmap, int32_t 
batch_size) const {
+        EXPECT_OK_AND_ASSIGN(
+            auto parquet_batch_reader,
+            ParquetFileBatchReader::Create(std::move(in_stream), pool_, 
options, batch_size));
+        std::unique_ptr<ArrowSchema> c_schema = 
std::make_unique<ArrowSchema>();
+        auto arrow_status = arrow::ExportSchema(*read_schema, c_schema.get());
+        EXPECT_TRUE(arrow_status.ok());
+        EXPECT_OK(parquet_batch_reader->SetReadSchema(c_schema.get(), 
predicate, selection_bitmap));
+        return parquet_batch_reader;
+    }
+
+ private:
+    std::string file_path_;
+    std::unique_ptr<paimon::test::UniqueTestDirectory> dir_;
+    std::shared_ptr<FileSystem> fs_;
+    std::shared_ptr<arrow::MemoryPool> pool_;
+    int32_t batch_size_;
+    std::shared_ptr<arrow::Schema> schema_;
+    std::shared_ptr<arrow::StructArray> struct_array_;
+};
+
+TEST_F(ParquetFileBatchReaderTest, TestSimple) {
+    std::string file_name = paimon::test::GetDataDir() +
+                            
"/parquet/parquet_append_table.db/parquet_append_table/bucket-0/"
+                            
"data-9ea62f34-1dca-49c1-bf7a-d37303d8fb76-0.parquet";
+    auto parquet_batch_reader = PrepareParquetFileBatchReader(
+        file_name, schema_, /*predicate=*/nullptr, 
/*selection_bitmap=*/std::nullopt, batch_size_);
+    ASSERT_OK_AND_ASSIGN(
+        std::shared_ptr<arrow::ChunkedArray> result_array,
+        
paimon::test::ReadResultCollector::CollectResult(parquet_batch_reader.get()));
+    std::shared_ptr<arrow::ChunkedArray> expected_array =
+        std::make_shared<arrow::ChunkedArray>(struct_array_);
+    ASSERT_TRUE(result_array->Equals(*expected_array,
+                                     
arrow::EqualOptions::Defaults().diff_sink(&std::cout)));
+}
+
+TEST_F(ParquetFileBatchReaderTest, TestSetReadSchema) {
+    std::string file_name = paimon::test::GetDataDir() +
+                            
"parquet/parquet_append_table.db/parquet_append_table/bucket-0/"
+                            
"data-9ea62f34-1dca-49c1-bf7a-d37303d8fb76-0.parquet";
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<InputStream> input_stream, 
fs_->Open(file_name));
+    auto length = fs_->GetFileStatus(file_name).value()->GetLen();
+    auto in_stream =
+        std::make_unique<ArrowInputStreamAdapter>(std::move(input_stream), 
pool_, length);
+    std::map<std::string, std::string> options;
+    ASSERT_OK_AND_ASSIGN(
+        auto parquet_batch_reader,
+        ParquetFileBatchReader::Create(std::move(in_stream), pool_, options, 
batch_size_));
+    // test GetFileSchema()
+    ASSERT_OK_AND_ASSIGN(auto c_file_schema, 
parquet_batch_reader->GetFileSchema());
+    auto arrow_file_schema = 
arrow::ImportSchema(c_file_schema.get()).ValueOrDie();
+    ASSERT_TRUE(arrow_file_schema->Equals(*schema_));
+
+    // NextBatch() without SetReadSchema(), will return data with
+    // file schema
+    ASSERT_OK_AND_ASSIGN(
+        auto result_with_file_schema,
+        
paimon::test::ReadResultCollector::CollectResult(parquet_batch_reader.get()));
+    auto expected_file_chunk_array = 
std::make_shared<arrow::ChunkedArray>(struct_array_);
+    ASSERT_TRUE(result_with_file_schema->Equals(expected_file_chunk_array));
+    // NextBatch() with SetReadSchema(), will return data with read schema
+    arrow::Schema read_schema({schema_->field(0), schema_->field(9), 
schema_->field(12)});
+    std::unique_ptr<ArrowSchema> c_read_schema = 
std::make_unique<ArrowSchema>();
+    ASSERT_TRUE(arrow::ExportSchema(read_schema, c_read_schema.get()).ok());
+    ASSERT_OK(parquet_batch_reader->SetReadSchema(c_read_schema.get(),
+                                                  /*predicate=*/nullptr,
+                                                  
/*selection_bitmap=*/std::nullopt));
+    auto expected_read_array =
+        arrow::StructArray::Make(
+            {struct_array_->field(0), struct_array_->field(9), 
struct_array_->field(12)},
+            read_schema.fields())
+            .ValueOrDie();
+    auto expected_read_chunk_array = 
std::make_shared<arrow::ChunkedArray>(expected_read_array);
+    ASSERT_OK_AND_ASSIGN(
+        auto result_with_read_schema,
+        
paimon::test::ReadResultCollector::CollectResult(parquet_batch_reader.get()));
+    ASSERT_TRUE(result_with_read_schema->Equals(expected_read_chunk_array));
+
+    // NextBatch() with predicate
+    auto predicate = PredicateBuilder::IsNull(
+        /*field_index=*/0, /*field_name=*/"f1", FieldType::BOOLEAN);
+    c_read_schema = std::make_unique<ArrowSchema>();
+    ASSERT_TRUE(arrow::ExportSchema(read_schema, c_read_schema.get()).ok());
+    ASSERT_OK(parquet_batch_reader->SetReadSchema(c_read_schema.get(), 
predicate,
+                                                  
/*selection_bitmap=*/std::nullopt));
+    ASSERT_OK_AND_ASSIGN(result_with_read_schema, 
paimon::test::ReadResultCollector::CollectResult(
+                                                      
parquet_batch_reader.get()));
+    ASSERT_FALSE(result_with_read_schema);
+}
+
+TEST_F(ParquetFileBatchReaderTest, TestNextBatchSimple) {
+    std::string file_name = paimon::test::GetDataDir() +
+                            
"parquet/parquet_append_table.db/parquet_append_table/bucket-0/"
+                            
"data-9ea62f34-1dca-49c1-bf7a-d37303d8fb76-0.parquet";
+    for (auto batch_size : {1, 2, 3, 5, 8, 10}) {
+        auto parquet_batch_reader =
+            PrepareParquetFileBatchReader(file_name, schema_, 
/*predicate=*/nullptr,
+                                          /*selection_bitmap=*/std::nullopt, 
batch_size);
+        
ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFirstRowNumber().value(),
+                  std::numeric_limits<uint64_t>::max());
+        ASSERT_OK_AND_ASSIGN(auto result_array, 
paimon::test::ReadResultCollector::CollectResult(
+                                                    
parquet_batch_reader.get()));
+        
ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFirstRowNumber().value(), 6);
+        parquet_batch_reader->Close();
+        auto expected_array = 
std::make_shared<arrow::ChunkedArray>(struct_array_);
+        ASSERT_TRUE(result_array->Equals(expected_array));
+        // test metrics
+        auto read_metrics = parquet_batch_reader->GetReaderMetrics();
+        ASSERT_TRUE(read_metrics);
+        // TODO(jinli.zjw): test metrics
+        // ASSERT_TRUE(read_metrics->GetCounter(ParquetMetrics::READ_BYTES) > 
0);
+        // 
ASSERT_TRUE(read_metrics->GetCounter(ParquetMetrics::READ_RAW_BYTES) > 0);
+    }
+}
+
+TEST_F(ParquetFileBatchReaderTest, TestNextBatchWithTargetSchema) {
+    std::string file_name = paimon::test::GetDataDir() +
+                            
"parquet/parquet_append_table.db/parquet_append_table/bucket-0/"
+                            
"data-9ea62f34-1dca-49c1-bf7a-d37303d8fb76-0.parquet";
+    auto read_schema = arrow::schema(arrow::FieldVector(
+        {schema_->field(4), schema_->field(9), schema_->field(10), 
schema_->field(12)}));
+    auto parquet_batch_reader =
+        PrepareParquetFileBatchReader(file_name, read_schema, 
/*predicate=*/nullptr,
+                                      /*selection_bitmap=*/std::nullopt, 
batch_size_);
+    ASSERT_OK_AND_ASSIGN(auto result_array, 
paimon::test::ReadResultCollector::CollectResult(
+                                                parquet_batch_reader.get()));
+    parquet_batch_reader->Close();
+    auto expected_read_array =
+        arrow::StructArray::Make({struct_array_->field(4), 
struct_array_->field(9),
+                                  struct_array_->field(10), 
struct_array_->field(12)},
+                                 read_schema->fields())
+            .ValueOrDie();
+    auto expected_chunk_array = 
std::make_shared<arrow::ChunkedArray>(expected_read_array);
+    ASSERT_TRUE(result_array->Equals(expected_chunk_array));
+}
+
+TEST_F(ParquetFileBatchReaderTest, TestNextBatchWithOutofOrderTargetSchema) {
+    std::string file_name = paimon::test::GetDataDir() +
+                            
"parquet/parquet_append_table.db/parquet_append_table/bucket-0/"
+                            
"data-9ea62f34-1dca-49c1-bf7a-d37303d8fb76-0.parquet";
+    // read with f13, f11, f10, f5
+    auto read_schema = arrow::schema(arrow::FieldVector(
+        {schema_->field(12), schema_->field(10), schema_->field(9), 
schema_->field(4)}));
+    auto parquet_batch_reader =
+        PrepareParquetFileBatchReader(file_name, read_schema, 
/*predicate=*/nullptr,
+                                      /*selection_bitmap=*/std::nullopt, 
batch_size_);
+    ASSERT_OK_AND_ASSIGN(auto result_array, 
paimon::test::ReadResultCollector::CollectResult(
+                                                parquet_batch_reader.get()));
+    parquet_batch_reader->Close();
+    auto expected_read_array =
+        arrow::StructArray::Make({struct_array_->field(12), 
struct_array_->field(10),
+                                  struct_array_->field(9), 
struct_array_->field(4)},
+                                 read_schema->fields())
+            .ValueOrDie();
+    auto expected_chunk_array = 
std::make_shared<arrow::ChunkedArray>(expected_read_array);
+    ASSERT_TRUE(result_array->Equals(expected_chunk_array));
+}
+
+TEST_F(ParquetFileBatchReaderTest, TestNextBatchWithDictionary) {
+    auto f0 = arrow::field("f0", arrow::list(arrow::utf8()));
+    auto f1 = arrow::field("f1", arrow::map(arrow::utf8(), arrow::binary()));
+    auto f2 = arrow::field(
+        "f2", arrow::struct_({field("sub1", arrow::int64()), field("sub2", 
arrow::binary()),
+                              field("sub3", arrow::utf8())}));
+
+    arrow::FieldVector fields = {f0, f1, f2};
+    auto src_array = std::dynamic_pointer_cast<arrow::StructArray>(
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([
+        [["a", "a", "b"], [["a", "q"], ["b", "w"]],             [10, "q", 
"a"]],
+        [["a", "c"],      [["a", "e"], ["b", "r"], ["c", "e"]], [20, "w", 
"a"]],
+        [["a", "d"],      [["d", "r"], ["e", "t"]],             [null, "e", 
"b"]],
+        [["a"],           [["a", "q"]],                         [null, "w", 
"c"]],
+        [null,            [["a", "w"], ["f", "y"]],             [50, "r", 
null]],
+        [["a"],           null,                                 [60, "r", 
"b"]],
+        [["a", "b", "e"], [["a", null], ["b", "w"]],            null]
+    ])")
+            .ValueOrDie());
+    auto src_schema = arrow::schema(fields);
+    auto arrow_schema = arrow::schema(fields);
+    auto expected_array = arrow::ChunkedArray::Make({src_array}).ValueOrDie();
+
+    auto check_result = [&](bool enable_dictionary) {
+        WriteArray(file_path_, src_array, arrow_schema, 
/*write_batch_size=*/1, enable_dictionary,
+                   /*max_row_group_length=*/3);
+        auto parquet_batch_reader =
+            PrepareParquetFileBatchReader(file_path_, arrow_schema, 
/*predicate=*/nullptr,
+                                          /*selection_bitmap=*/std::nullopt, 
/*batch_size=*/2);
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<arrow::ChunkedArray> result_array,
+            
paimon::test::ReadResultCollector::CollectResult(parquet_batch_reader.get()));
+        ASSERT_TRUE(result_array->Equals(expected_array)) << 
result_array->ToString();
+    };
+    check_result(true);
+    check_result(false);
+}
+
+TEST_F(ParquetFileBatchReaderTest, TestGetFileSchemaWithFieldId) {
+    std::string file_name = paimon::test::GetDataDir() +
+                            
"parquet/parquet_append_table.db/parquet_append_table/bucket-0/"
+                            
"data-9ea62f34-1dca-49c1-bf7a-d37303d8fb76-0.parquet";
+    auto parquet_batch_reader = PrepareParquetFileBatchReader(
+        file_name, schema_, /*predicate=*/nullptr, 
/*selection_bitmap=*/std::nullopt, batch_size_);
+    ASSERT_OK_AND_ASSIGN(auto c_file_schema, 
parquet_batch_reader->GetFileSchema());
+    auto arrow_file_schema = 
arrow::ImportSchema(c_file_schema.get()).ValueOr(nullptr);
+    ASSERT_TRUE(arrow_file_schema);
+    ASSERT_OK_AND_ASSIGN(auto data_fields,
+                         
DataField::ConvertArrowSchemaToDataFields(arrow_file_schema));
+
+    auto list_type = arrow::list(DataField::ConvertDataFieldToArrowField(
+        DataField(536880129, arrow::field("item", arrow::float32()))));
+    std::vector<DataField> struct_fields = {DataField(10, arrow::field("f0", 
arrow::boolean())),
+                                            DataField(11, arrow::field("f1", 
arrow::int64()))};
+    auto struct_type = 
DataField::ConvertDataFieldsToArrowStructType(struct_fields);
+    auto map_type = arrow::map(list_type, struct_type);
+
+    std::vector<DataField> expected_data_fields = {
+        DataField(0, arrow::field("f1", arrow::boolean())),
+        DataField(1, arrow::field("f2", arrow::int8())),
+        DataField(2, arrow::field("f3", arrow::int16())),
+        DataField(3, arrow::field("f4", arrow::int32())),
+        DataField(4, arrow::field("f5", arrow::int64())),
+        DataField(5, arrow::field("f6", arrow::float32())),
+        DataField(6, arrow::field("f7", arrow::float64())),
+        DataField(7, arrow::field("f8", arrow::utf8())),
+        DataField(8, arrow::field("f9", arrow::binary())),
+        DataField(9, arrow::field("f10", map_type)),
+        DataField(12, arrow::field("f11", 
arrow::timestamp(arrow::TimeUnit::NANO))),
+        DataField(13, arrow::field("f12", arrow::date32())),
+        DataField(14, arrow::field("f13", arrow::decimal128(2, 2))),
+    };
+    ASSERT_EQ(data_fields, expected_data_fields);
+}
+
+TEST_F(ParquetFileBatchReaderTest, TestCreateReaderProperties) {
+    {
+        // test default options
+        std::map<std::string, std::string> options;
+        ASSERT_OK_AND_ASSIGN(auto reader_properties,
+                             
ParquetFileBatchReader::CreateReaderProperties(pool_, options));
+        ASSERT_EQ(reader_properties.is_buffered_stream_enabled(), true);
+    }
+}
+
+TEST_F(ParquetFileBatchReaderTest, TestCreateArrowReaderProperties) {
+    {
+        // test default options
+        std::map<std::string, std::string> options;
+        int32_t batch_size = 1024;
+        ASSERT_OK_AND_ASSIGN(
+            auto arrow_reader_properties,
+            ParquetFileBatchReader::CreateArrowReaderProperties(pool_, 
options, batch_size));
+        ASSERT_EQ(arrow_reader_properties.pre_buffer(), true);
+        ASSERT_EQ(arrow_reader_properties.batch_size(), 1024);
+        ASSERT_EQ(arrow_reader_properties.use_threads(), true);
+        ASSERT_EQ(arrow::GetCpuThreadPoolCapacity(), 3);
+        ASSERT_EQ(arrow_reader_properties.cache_options(), 
arrow::io::CacheOptions::Defaults());
+    }
+    {
+        std::map<std::string, std::string> options = 
{{PARQUET_READ_EXECUTOR_THREAD_COUNT, "0"}};
+        int32_t batch_size = 1024;
+        ASSERT_OK_AND_ASSIGN(
+            auto arrow_reader_properties,
+            ParquetFileBatchReader::CreateArrowReaderProperties(pool_, 
options, batch_size));
+        ASSERT_EQ(arrow_reader_properties.use_threads(), false);
+    }
+    {
+        std::map<std::string, std::string> options = 
{{PARQUET_READ_EXECUTOR_THREAD_COUNT, "6"}};
+        int32_t batch_size = 1024;
+        ASSERT_OK_AND_ASSIGN(
+            auto arrow_reader_properties,
+            ParquetFileBatchReader::CreateArrowReaderProperties(pool_, 
options, batch_size));
+        ASSERT_EQ(arrow_reader_properties.use_threads(), true);
+        ASSERT_EQ(arrow::GetCpuThreadPoolCapacity(), 6);
+    }
+}
+
+TEST_F(ParquetFileBatchReaderTest, TestBitmapPushDownWithMultiRowGroups) {
+    arrow::FieldVector fields = {arrow::field("f0", arrow::int32())};
+    auto arrow_type = arrow::struct_(fields);
+    auto src_array = std::dynamic_pointer_cast<arrow::StructArray>(
+        arrow::ipc::internal::json::ArrayFromJSON(arrow_type, R"([
+        [0],
+        [1],
+        [2],
+        [3],
+        [4],
+        [5],
+        [6],
+        [7],
+        [8],
+        [9],
+        [10],
+        [11]
+    ])")
+            .ValueOrDie());
+    auto src_schema = arrow::schema(fields);
+    std::optional<RoaringBitmap32> bitmap = RoaringBitmap32::From({3, 5});
+    // data in file rowGroup0:[0, 1, 2, 3, 4, 5] | rowGroup1:[6, 7, 8, 9, 10, 
11]
+
+    auto arrow_schema = arrow::schema(fields);
+    WriteArray(file_path_, src_array, arrow_schema, /*write_batch_size=*/12,
+               /*enable_dictionary=*/true,
+               /*max_row_group_length=*/6);
+
+    auto parquet_batch_reader = PrepareParquetFileBatchReader(
+        file_path_, arrow_schema, /*predicate=*/nullptr, bitmap, 
/*batch_size=*/12);
+
+    ASSERT_OK_AND_ASSIGN(
+        std::shared_ptr<arrow::ChunkedArray> result_array,
+        
paimon::test::ReadResultCollector::CollectResult(parquet_batch_reader.get()));
+
+    auto expected_array = arrow::ChunkedArray::Make({src_array->Slice(0, 
6)}).ValueOrDie();
+    ASSERT_TRUE(result_array->Equals(expected_array)) << 
result_array->ToString();
+}
+
+TEST_F(ParquetFileBatchReaderTest, TestPredicateAndBitmapPushDown) {
+    arrow::FieldVector fields = {arrow::field("f0", arrow::int32())};
+    auto arrow_type = arrow::struct_(fields);
+    arrow::StructBuilder struct_builder(arrow_type, 
arrow::default_memory_pool(),
+                                        
{std::make_shared<arrow::Int32Builder>()});
+    auto int_builder = 
static_cast<arrow::Int32Builder*>(struct_builder.field_builder(0));
+    int32_t length = 1024;
+    for (int32_t i = 0; i < length; ++i) {
+        ASSERT_TRUE(struct_builder.Append().ok());
+        ASSERT_TRUE(int_builder->Append(i).ok());
+    }
+    // data file:
+    // rowGroup0: [0, 256)
+    // rowGroup1: [256, 512)
+    // rowGroup2: [512, 768)
+    // rowGroup3: [768, 1024)
+    std::shared_ptr<arrow::Array> src_array;
+    ASSERT_TRUE(struct_builder.Finish(&src_array).ok());
+    auto src_schema = arrow::schema(fields);
+    auto arrow_schema = arrow::schema(fields);
+    WriteArray(file_path_, src_array, arrow_schema, /*write_batch_size=*/1024,
+               /*enable_dictionary=*/true,
+               /*max_row_group_length=*/256);
+    {
+        // simple case
+        std::optional<RoaringBitmap32> bitmap = RoaringBitmap32::From({100, 
400, 600});
+        ASSERT_OK_AND_ASSIGN(
+            auto predicate,
+            PredicateBuilder::Or(
+                {PredicateBuilder::LessThan(/*field_index=*/0, 
/*field_name=*/"f0", FieldType::INT,
+                                            Literal(255)),
+                 PredicateBuilder::GreaterThan(/*field_index=*/0, 
/*field_name=*/"f0",
+                                               FieldType::INT, 
Literal(600))}));
+        auto parquet_batch_reader = PrepareParquetFileBatchReader(
+            file_path_, arrow_schema, predicate, bitmap, 
/*batch_size=*/length);
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<arrow::ChunkedArray> result_array,
+            
paimon::test::ReadResultCollector::CollectResult(parquet_batch_reader.get()));
+
+        auto expected_array =
+            arrow::ChunkedArray::Make({src_array->Slice(0, 256), 
src_array->Slice(512, 256)})
+                .ValueOrDie();
+        ASSERT_TRUE(result_array->Equals(expected_array)) << 
result_array->ToString();
+    }
+    {
+        // test all data has been filtered out with predicate and bitmap 
pushdown
+        std::optional<RoaringBitmap32> bitmap = RoaringBitmap32::From({100, 
400, 600});
+        auto predicate = PredicateBuilder::GreaterThan(/*field_index=*/0, 
/*field_name=*/"f0",
+                                                       FieldType::INT, 
Literal(800));
+        auto parquet_batch_reader = PrepareParquetFileBatchReader(
+            file_path_, arrow_schema, predicate, bitmap, 
/*batch_size=*/length);
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<arrow::ChunkedArray> result_array,
+            
paimon::test::ReadResultCollector::CollectResult(parquet_batch_reader.get()));
+        ASSERT_FALSE(result_array);
+    }
+}
+
+TEST_F(ParquetFileBatchReaderTest, TestReadNoField) {
+    // if only read partition fields, format reader will set empty read schema
+    std::string file_name = paimon::test::GetDataDir() +
+                            
"parquet/parquet_append_table.db/parquet_append_table/bucket-0/"
+                            
"data-9ea62f34-1dca-49c1-bf7a-d37303d8fb76-0.parquet";
+    // read no field
+    auto read_schema = arrow::schema(arrow::FieldVector());
+    auto parquet_batch_reader =
+        PrepareParquetFileBatchReader(file_name, read_schema, 
/*predicate=*/nullptr,
+                                      /*selection_bitmap=*/std::nullopt, 
/*batch_size=*/2);
+    // read 2 rows
+    ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFirstRowNumber().value(),
+              std::numeric_limits<uint64_t>::max());
+    ASSERT_OK_AND_ASSIGN(auto batch1, parquet_batch_reader->NextBatch());
+    ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFirstRowNumber().value(), 
0);
+    //  read 2 rows
+    ASSERT_OK_AND_ASSIGN(auto batch2, parquet_batch_reader->NextBatch());
+    ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFirstRowNumber().value(), 
2);
+    // read 2 rows
+    ASSERT_OK_AND_ASSIGN(auto batch3, parquet_batch_reader->NextBatch());
+    ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFirstRowNumber().value(), 
4);
+    // read rows with eof
+    ASSERT_OK_AND_ASSIGN(auto batch4, parquet_batch_reader->NextBatch());
+    ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFirstRowNumber().value(), 
6);
+    ASSERT_TRUE(BatchReader::IsEofBatch(batch4));
+    parquet_batch_reader->Close();
+
+    arrow::FieldVector fields;
+    auto arrow_type = arrow::struct_(fields);
+    auto expected_array = std::dynamic_pointer_cast<arrow::StructArray>(
+        arrow::ipc::internal::json::ArrayFromJSON(arrow_type, R"([
+[],
+[]
+    ])")
+            .ValueOrDie());
+
+    auto check_batch = [](BatchReader::ReadBatch&& result_batch,
+                          const std::shared_ptr<arrow::Array>& expected_array) 
{
+        auto& [c_array, c_schema] = result_batch;
+        auto result_array = arrow::ImportArray(c_array.get(), 
c_schema.get()).ValueOr(nullptr);
+        ASSERT_TRUE(result_array);
+        ASSERT_TRUE(result_array->Equals(expected_array));
+    };
+
+    check_batch(std::move(batch1), expected_array);
+    check_batch(std::move(batch2), expected_array);
+    check_batch(std::move(batch3), expected_array);
+}
+
+TEST_P(ParquetFileBatchReaderTest, TestTimestampType) {
+    auto enable_tz = GetParam();
+    std::string timezone_str = enable_tz ? "Asia/Tokyo" : 
DateTimeUtils::GetLocalTimezoneName();
+    paimon::test::TimezoneGuard tz_guard(timezone_str);
+
+    arrow::FieldVector fields = {
+        arrow::field("ts_sec", arrow::timestamp(arrow::TimeUnit::SECOND)),
+        arrow::field("ts_milli", arrow::timestamp(arrow::TimeUnit::MILLI)),
+        arrow::field("ts_micro", arrow::timestamp(arrow::TimeUnit::MICRO)),
+        arrow::field("ts_nano", arrow::timestamp(arrow::TimeUnit::NANO)),
+        arrow::field("ts_tz_sec", arrow::timestamp(arrow::TimeUnit::SECOND, 
timezone_str)),
+        arrow::field("ts_tz_milli", arrow::timestamp(arrow::TimeUnit::MILLI, 
timezone_str)),
+        arrow::field("ts_tz_micro", arrow::timestamp(arrow::TimeUnit::MICRO, 
timezone_str)),
+        arrow::field("ts_tz_nano", arrow::timestamp(arrow::TimeUnit::NANO, 
timezone_str)),
+    };
+
+    auto array = std::dynamic_pointer_cast<arrow::StructArray>(
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([
+["1970-01-01 00:00:01", "1970-01-01 00:00:00.001", "1970-01-01 
00:00:00.000001", "1970-01-01 00:00:00.000000001", "1970-01-01 00:00:02", 
"1970-01-01 00:00:00.002", "1970-01-01 00:00:00.000002", "1970-01-01 
00:00:00.000000002"],
+["1970-01-01 00:00:03", "1970-01-01 00:00:00.003", null, "1970-01-01 
00:00:00.000000003", "1970-01-01 00:00:04", "1970-01-01 00:00:00.004", 
"1970-01-01 00:00:00.000004", "1970-01-01 00:00:00.000000004"],
+["1970-01-01 00:00:05", "1970-01-01 00:00:00.005", null, null, "1970-01-01 
00:00:06", null, "1970-01-01 00:00:00.000006", null]
+    ])")
+            .ValueOrDie());
+    std::shared_ptr<arrow::ChunkedArray> expected_array =
+        std::make_shared<arrow::ChunkedArray>(array);
+
+    {
+        // read data generated by Java Paimon
+        std::string file_name = paimon::test::GetDataDir() +
+                                
"/parquet/append_with_multiple_ts_precision_and_timezone.db/"
+                                
"append_with_multiple_ts_precision_and_timezone/bucket-0/"
+                                
"data-9b8abdde-df4d-4655-bb4c-ffda164ef9d4-0.parquet";
+        auto parquet_batch_reader = PrepareParquetFileBatchReader(
+            file_name, std::make_shared<arrow::Schema>(fields),
+            /*predicate=*/nullptr, /*selection_bitmap=*/std::nullopt, 
batch_size_);
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<arrow::ChunkedArray> result_array,
+            
paimon::test::ReadResultCollector::CollectResult(parquet_batch_reader.get()));
+        ASSERT_TRUE(result_array->Equals(*expected_array)) << 
result_array->ToString();
+    }
+    {
+        // read data generated by C++ Paimon
+        auto arrow_schema = std::make_shared<arrow::Schema>(fields);
+        WriteArray(file_path_, array, arrow_schema,
+                   /*write_batch_size=*/1, /*enable_dictionary=*/true,
+                   /*max_row_group_length=*/1);
+        auto parquet_batch_reader =
+            PrepareParquetFileBatchReader(file_path_, arrow_schema, 
/*predicate=*/nullptr,
+                                          /*selection_bitmap=*/std::nullopt, 
/*batch_size=*/2);
+
+        // check file schema
+        ASSERT_OK_AND_ASSIGN(auto c_file_schema, 
parquet_batch_reader->GetFileSchema());
+        auto result_file_schema = 
arrow::ImportSchema(c_file_schema.get()).ValueOr(nullptr);
+        ASSERT_TRUE(result_file_schema);
+
+        arrow::FieldVector expected_fields = {
+            arrow::field("ts_sec", arrow::timestamp(arrow::TimeUnit::MILLI)),
+            arrow::field("ts_milli", arrow::timestamp(arrow::TimeUnit::MILLI)),
+            arrow::field("ts_micro", arrow::timestamp(arrow::TimeUnit::MICRO)),
+            arrow::field("ts_nano", arrow::timestamp(arrow::TimeUnit::NANO)),
+            arrow::field("ts_tz_sec", arrow::timestamp(arrow::TimeUnit::MILLI, 
timezone_str)),
+            arrow::field("ts_tz_milli", 
arrow::timestamp(arrow::TimeUnit::MILLI, timezone_str)),
+            arrow::field("ts_tz_micro", 
arrow::timestamp(arrow::TimeUnit::MICRO, timezone_str)),
+            arrow::field("ts_tz_nano", 
arrow::timestamp(arrow::TimeUnit::NANO)),
+        };
+        auto expected_file_schema = arrow::schema(expected_fields);
+        ASSERT_TRUE(result_file_schema->Equals(expected_file_schema))
+            << result_file_schema->ToString();
+
+        // check array
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<arrow::ChunkedArray> result_array,
+            
paimon::test::ReadResultCollector::CollectResult(parquet_batch_reader.get()));
+        ASSERT_TRUE(result_array->Equals(expected_array)) << 
result_array->ToString();
+    }
+}
+
+INSTANTIATE_TEST_SUITE_P(TestParam, ParquetFileBatchReaderTest, 
::testing::Values(false, true));
+
+}  // namespace paimon::parquet::test
diff --git a/src/paimon/format/parquet/parquet_format_writer.cpp 
b/src/paimon/format/parquet/parquet_format_writer.cpp
new file mode 100644
index 0000000..2007948
--- /dev/null
+++ b/src/paimon/format/parquet/parquet_format_writer.cpp
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/format/parquet/parquet_format_writer.h"
+
+#include <string>
+#include <utility>
+
+#include "arrow/c/bridge.h"
+#include "arrow/memory_pool.h"
+#include "arrow/record_batch.h"
+#include "paimon/common/metrics/metrics_impl.h"
+#include "paimon/common/utils/arrow/arrow_output_stream_adapter.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/format/parquet/parquet_format_defs.h"
+#include "parquet/arrow/writer.h"
+#include "parquet/properties.h"
+
+namespace arrow {
+class MemoryPool;
+class Schema;
+}  // namespace arrow
+namespace paimon {
+class OutputStream;
+}  // namespace paimon
+struct ArrowArray;
+
+namespace paimon::parquet {
+
+Result<std::unique_ptr<ParquetFormatWriter>> ParquetFormatWriter::Create(
+    const std::shared_ptr<OutputStream>& output_stream,
+    const std::shared_ptr<arrow::Schema>& schema,
+    const std::shared_ptr<::parquet::WriterProperties>& writer_properties, 
uint64_t max_memory_use,
+    const std::shared_ptr<arrow::MemoryPool>& pool) {
+    auto out = std::make_shared<ArrowOutputStreamAdapter>(output_stream);
+    ::parquet::ArrowWriterProperties::Builder arrow_properties_builder;
+    auto arrow_writer_properties =
+        arrow_properties_builder.enable_deprecated_int96_timestamps()->build();
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(
+        std::unique_ptr<::parquet::arrow::FileWriter> file_writer,
+        ::parquet::arrow::FileWriter::Open(*schema, pool.get(), out, 
writer_properties,
+                                           arrow_writer_properties));
+    return std::unique_ptr<ParquetFormatWriter>(
+        new ParquetFormatWriter(std::move(file_writer), out, schema, 
max_memory_use, pool));
+}
+
+Status ParquetFormatWriter::AddBatch(ArrowArray* batch) {
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<::arrow::RecordBatch> 
record_batch,
+                                      arrow::ImportRecordBatch(batch, 
schema_));
+    if (static_cast<uint64_t>(pool_->bytes_allocated()) > max_memory_use_) {
+        PAIMON_RETURN_NOT_OK_FROM_ARROW(writer_->NewBufferedRowGroup());
+    }
+    PAIMON_RETURN_NOT_OK_FROM_ARROW(writer_->WriteRecordBatch(*record_batch));
+    total_records_written_ += (*record_batch).num_rows();
+    return Status::OK();
+}
+
+Status ParquetFormatWriter::Flush() {
+    metrics_->SetCounter(ParquetMetrics::WRITE_RECORD_COUNT, 
total_records_written_);
+    return Status::OK();
+}
+
+Status ParquetFormatWriter::Finish() {
+    PAIMON_RETURN_NOT_OK(Flush());
+    PAIMON_RETURN_NOT_OK_FROM_ARROW(writer_->Close());
+    return Status::OK();
+}
+
+Result<bool> ParquetFormatWriter::ReachTargetSize(bool suggested_check, 
int64_t target_size) const {
+    if (suggested_check) {
+        PAIMON_ASSIGN_OR_RAISE(const uint64_t length, GetEstimateLength());
+        return length >= static_cast<uint64_t>(target_size);
+    }
+    return false;
+}
+
+Result<uint64_t> ParquetFormatWriter::GetEstimateLength() const {
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(int64_t written_bytes, out_->Tell());
+    return writer_->GetBufferedSize() + written_bytes;
+}
+
+ParquetFormatWriter::ParquetFormatWriter(std::unique_ptr<::parquet::arrow::FileWriter>
 writer,
+                                         const 
std::shared_ptr<ArrowOutputStreamAdapter>& out,
+                                         const std::shared_ptr<arrow::Schema>& 
schema,
+                                         uint64_t max_memory_use,
+                                         const 
std::shared_ptr<arrow::MemoryPool>& pool)
+    : pool_(pool),
+      out_(out),
+      writer_(std::move(writer)),
+      schema_(schema),
+      metrics_(std::make_shared<MetricsImpl>()),
+      max_memory_use_(max_memory_use) {}
+
+}  // namespace paimon::parquet
diff --git a/src/paimon/format/parquet/parquet_format_writer.h 
b/src/paimon/format/parquet/parquet_format_writer.h
new file mode 100644
index 0000000..e5cae16
--- /dev/null
+++ b/src/paimon/format/parquet/parquet_format_writer.h
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+
+#include "paimon/format/format_writer.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/metrics.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+#include "parquet/arrow/writer.h"
+
+namespace arrow {
+class MemoryPool;
+class Schema;
+}  // namespace arrow
+namespace paimon {
+class Metrics;
+class OutputStream;
+class ArrowOutputStreamAdapter;
+}  // namespace paimon
+namespace parquet {
+class WriterProperties;
+}  // namespace parquet
+struct ArrowArray;
+
+namespace paimon::parquet {
+
+class ParquetFormatWriter : public FormatWriter {
+ public:
+    static Result<std::unique_ptr<ParquetFormatWriter>> Create(
+        const std::shared_ptr<OutputStream>& output_stream,
+        const std::shared_ptr<arrow::Schema>& schema,
+        const std::shared_ptr<::parquet::WriterProperties>& writer_properties,
+        uint64_t max_memory_use, const std::shared_ptr<arrow::MemoryPool>& 
pool);
+
+    Status AddBatch(ArrowArray* batch) override;
+
+    Status Flush() override;
+
+    Status Finish() override;
+
+    Result<bool> ReachTargetSize(bool suggested_check, int64_t target_size) 
const override;
+
+    std::shared_ptr<Metrics> GetWriterMetrics() const override {
+        return metrics_;
+    }
+
+ private:
+    ParquetFormatWriter(std::unique_ptr<::parquet::arrow::FileWriter> writer,
+                        const std::shared_ptr<ArrowOutputStreamAdapter>& out,
+                        const std::shared_ptr<arrow::Schema>& schema, uint64_t 
max_memory_use,
+                        const std::shared_ptr<arrow::MemoryPool>& pool);
+
+    Result<uint64_t> GetEstimateLength() const;
+
+    std::shared_ptr<arrow::MemoryPool> pool_;
+    std::shared_ptr<ArrowOutputStreamAdapter> out_;
+    std::unique_ptr<::parquet::arrow::FileWriter> writer_;
+    std::shared_ptr<arrow::Schema> schema_;
+    std::shared_ptr<Metrics> metrics_;
+    int64_t total_records_written_ = 0;
+    uint64_t max_memory_use_;
+};
+
+}  // namespace paimon::parquet
diff --git a/src/paimon/format/parquet/parquet_format_writer_test.cpp 
b/src/paimon/format/parquet/parquet_format_writer_test.cpp
new file mode 100644
index 0000000..ccd0868
--- /dev/null
+++ b/src/paimon/format/parquet/parquet_format_writer_test.cpp
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/format/parquet/parquet_format_writer.h"
+
+#include <map>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/api.h"
+#include "arrow/array/array_binary.h"
+#include "arrow/array/array_primitive.h"
+#include "arrow/array/builder_binary.h"
+#include "arrow/array/builder_nested.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/c/abi.h"
+#include "arrow/c/bridge.h"
+#include "arrow/io/file.h"
+#include "arrow/ipc/api.h"
+#include "arrow/memory_pool.h"
+#include "gtest/gtest.h"
+#include "paimon/common/utils/arrow/mem_utils.h"
+#include "paimon/common/utils/date_time_utils.h"
+#include "paimon/common/utils/path_util.h"
+#include "paimon/format/file_format.h"
+#include "paimon/format/file_format_factory.h"
+#include "paimon/format/parquet/parquet_field_id_converter.h"
+#include "paimon/format/parquet/parquet_format_defs.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/fs/local/local_file_system.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/metrics.h"
+#include "paimon/record_batch.h"
+#include "paimon/testing/utils/testharness.h"
+#include "parquet/arrow/reader.h"
+#include "parquet/file_reader.h"
+#include "parquet/metadata.h"
+#include "parquet/properties.h"
+#include "parquet/schema.h"
+
+namespace arrow {
+class Array;
+}  // namespace arrow
+
+namespace paimon::parquet::test {
+
+class ParquetFormatWriterTest : public ::testing::Test {
+ public:
+    void SetUp() override {
+        dir_ = paimon::test::UniqueTestDirectory::Create();
+        ASSERT_TRUE(dir_);
+        fs_ = std::make_shared<LocalFileSystem>();
+        pool_ = GetDefaultPool();
+        arrow_pool_ = GetArrowPool(pool_);
+    }
+    void TearDown() override {}
+
+    std::pair<std::shared_ptr<arrow::Schema>, 
std::shared_ptr<arrow::DataType>> PrepareArrowSchema()
+        const {
+        auto string_field = arrow::field(
+            "col1", arrow::utf8(),
+            
arrow::KeyValueMetadata::Make({ParquetFieldIdConverter::PARQUET_FIELD_ID}, 
{"0"}));
+        auto int_field = arrow::field(
+            "col2", arrow::int32(),
+            
arrow::KeyValueMetadata::Make({ParquetFieldIdConverter::PARQUET_FIELD_ID}, 
{"1"}));
+        auto bool_field = arrow::field(
+            "col3", arrow::boolean(),
+            
arrow::KeyValueMetadata::Make({ParquetFieldIdConverter::PARQUET_FIELD_ID}, 
{"2"}));
+        auto struct_type = arrow::struct_({string_field, int_field, 
bool_field});
+        return std::make_pair(
+            arrow::schema(arrow::FieldVector({string_field, int_field, 
bool_field})), struct_type);
+    }
+
+    std::shared_ptr<arrow::Array> PrepareArray(const 
std::shared_ptr<arrow::DataType>& data_type,
+                                               int32_t record_batch_size, 
int32_t offset = 0,
+                                               bool all_null_value = false) 
const {
+        arrow::StructBuilder struct_builder(
+            data_type, arrow::default_memory_pool(),
+            {std::make_shared<arrow::StringBuilder>(), 
std::make_shared<arrow::Int32Builder>(),
+             std::make_shared<arrow::BooleanBuilder>()});
+        auto string_builder = 
static_cast<arrow::StringBuilder*>(struct_builder.field_builder(0));
+        auto int_builder = 
static_cast<arrow::Int32Builder*>(struct_builder.field_builder(1));
+        auto bool_builder = 
static_cast<arrow::BooleanBuilder*>(struct_builder.field_builder(2));
+        for (int32_t i = 0 + offset; i < record_batch_size + offset; ++i) {
+            EXPECT_TRUE(struct_builder.Append().ok());
+            if (all_null_value) {
+                EXPECT_TRUE(string_builder->AppendNull().ok());
+                EXPECT_TRUE(int_builder->AppendNull().ok());
+                EXPECT_TRUE(bool_builder->AppendNull().ok());
+            } else {
+                EXPECT_TRUE(string_builder->Append("str_" + 
std::to_string(i)).ok());
+                if (i % 3 == 0) {
+                    // test null
+                    EXPECT_TRUE(int_builder->AppendNull().ok());
+                } else {
+                    EXPECT_TRUE(int_builder->Append(i).ok());
+                }
+                EXPECT_TRUE(bool_builder->Append(static_cast<bool>(i % 
2)).ok());
+            }
+        }
+        std::shared_ptr<arrow::Array> array;
+        EXPECT_TRUE(struct_builder.Finish(&array).ok());
+        return array;
+    }
+
+    void AddRecordBatchOnce(const std::shared_ptr<FormatWriter>& format_writer,
+                            const std::shared_ptr<arrow::DataType>& 
struct_type,
+                            int32_t record_batch_size, int32_t offset,
+                            bool all_null_value = false) const {
+        auto array = PrepareArray(struct_type, record_batch_size, offset, 
all_null_value);
+        auto arrow_array = std::make_unique<ArrowArray>();
+        ASSERT_TRUE(arrow::ExportArray(*array, arrow_array.get()).ok());
+        auto batch = std::make_shared<RecordBatch>(
+            /*partition=*/std::map<std::string, std::string>(), /*bucket=*/-1,
+            /*row_kinds=*/std::vector<RecordBatch::RowKind>(), 
arrow_array.get());
+        ASSERT_OK(format_writer->AddBatch(batch->GetData()));
+    }
+
+    void CheckResult(const std::string& file_path, int32_t row_count,
+                     int32_t row_group_count) const {
+        auto file = arrow::io::ReadableFile::Open(file_path, 
arrow_pool_.get());
+        ASSERT_TRUE(file.ok());
+        std::unique_ptr<::parquet::arrow::FileReader> reader;
+        auto status = ::parquet::arrow::OpenFile(file.ValueOrDie(), 
arrow_pool_.get(), &reader);
+        ASSERT_TRUE(status.ok()) << status.ToString();
+        const ::parquet::FileMetaData* metadata = 
reader->parquet_reader()->metadata().get();
+        const ::parquet::SchemaDescriptor* schema = metadata->schema();
+        ASSERT_EQ(metadata->num_row_groups(), row_group_count);
+        ASSERT_EQ(schema->num_columns(), 3);
+        ASSERT_EQ(metadata->num_rows(), row_count);
+        ASSERT_EQ("col1", schema->Column(0)->name());
+        ASSERT_EQ("col2", schema->Column(1)->name());
+        ASSERT_EQ("col3", schema->Column(2)->name());
+        ASSERT_EQ(0, schema->Column(0)->schema_node()->field_id());
+        ASSERT_EQ(1, schema->Column(1)->schema_node()->field_id());
+        ASSERT_EQ(2, schema->Column(2)->schema_node()->field_id());
+
+        std::shared_ptr<::arrow::ChunkedArray> col0_array, col1_array, 
col2_array;
+        ASSERT_TRUE(reader->ReadColumn(0, &col0_array).ok());
+        ASSERT_TRUE(reader->ReadColumn(1, &col1_array).ok());
+        ASSERT_TRUE(reader->ReadColumn(2, &col2_array).ok());
+
+        const auto& string_array =
+            std::static_pointer_cast<arrow::StringArray>(col0_array->chunk(0));
+        ASSERT_TRUE(string_array);
+        const auto& int_array = 
std::static_pointer_cast<arrow::Int32Array>(col1_array->chunk(0));
+        ASSERT_TRUE(int_array);
+        const auto& bool_array =
+            
std::static_pointer_cast<arrow::BooleanArray>(col2_array->chunk(0));
+        ASSERT_TRUE(bool_array);
+        ASSERT_EQ(string_array->null_count(), 0);
+        ASSERT_EQ(int_array->null_count(), (row_count - 1) / 3 + 1);
+        ASSERT_EQ(bool_array->null_count(), 0);
+
+        for (int32_t i = 0; i < row_count; i++) {
+            ASSERT_EQ("str_" + std::to_string(i), string_array->GetString(i));
+            if (i % 3 == 0) {
+                ASSERT_TRUE(int_array->IsNull(i));
+            } else {
+                ASSERT_FALSE(int_array->IsNull(i));
+                ASSERT_EQ(i, int_array->Value(i));
+            }
+            if (i % 2 == 0) {
+                ASSERT_EQ(false, bool_array->Value(i));
+            } else {
+                ASSERT_EQ(true, bool_array->Value(i));
+            }
+        }
+    }
+
+ private:
+    std::unique_ptr<paimon::test::UniqueTestDirectory> dir_;
+    std::shared_ptr<FileSystem> fs_;
+    std::shared_ptr<MemoryPool> pool_;
+    std::shared_ptr<arrow::MemoryPool> arrow_pool_;
+};
+
+TEST_F(ParquetFormatWriterTest, TestWriteWithVariousBatchSize) {
+    auto schema_pair = PrepareArrowSchema();
+    const auto& arrow_schema = schema_pair.first;
+    const auto& struct_type = schema_pair.second;
+    std::map<std::string, std::string> options;
+    for (auto record_batch_size : {1, 2, 3, 5, 20}) {
+        for (auto batch_capacity : {1, 2, 3, 5, 20}) {
+            std::string file_name =
+                std::to_string(record_batch_size) + "_" + 
std::to_string(batch_capacity);
+            std::string file_path = PathUtil::JoinPath(dir_->Str(), file_name);
+            ASSERT_OK_AND_ASSIGN(std::shared_ptr<OutputStream> out,
+                                 fs_->Create(file_path, /*overwrite=*/false));
+            ::parquet::WriterProperties::Builder builder;
+            builder.write_batch_size(batch_capacity);
+            auto writer_properties = builder.build();
+            ASSERT_OK_AND_ASSIGN(
+                auto format_writer,
+                ParquetFormatWriter::Create(out, arrow_schema, 
writer_properties,
+                                            
DEFAULT_PARQUET_WRITER_MAX_MEMORY_USE, arrow_pool_));
+            auto array = PrepareArray(struct_type, record_batch_size);
+            auto arrow_array = std::make_unique<ArrowArray>();
+            ASSERT_TRUE(arrow::ExportArray(*array, arrow_array.get()).ok());
+
+            auto batch = std::make_shared<RecordBatch>(
+                /*partition=*/std::map<std::string, std::string>(), 
/*bucket=*/-1,
+                /*row_kinds=*/std::vector<RecordBatch::RowKind>(), 
arrow_array.get());
+            ASSERT_OK(format_writer->AddBatch(batch->GetData()));
+            ASSERT_OK(format_writer->Flush());
+            ASSERT_OK(format_writer->Finish());
+            ASSERT_OK(out->Flush());
+            ASSERT_OK(out->Close());
+            CheckResult(file_path, record_batch_size, /*row_group_count=*/1);
+        }
+    }
+}
+
+TEST_F(ParquetFormatWriterTest, TestWriteWithV1Version) {
+    auto schema_pair = PrepareArrowSchema();
+    const auto& arrow_schema = schema_pair.first;
+    const auto& struct_type = schema_pair.second;
+    std::map<std::string, std::string> options;
+    auto record_batch_size = 10;
+    auto batch_capacity = 5;
+    std::string file_name = "test.parquet";
+    std::string file_path = PathUtil::JoinPath(dir_->Str(), file_name);
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<OutputStream> out,
+                         fs_->Create(file_path, /*overwrite=*/false));
+    ::parquet::WriterProperties::Builder builder;
+    builder.write_batch_size(batch_capacity);
+    builder.version(::parquet::ParquetVersion::type::PARQUET_1_0);
+    auto writer_properties = builder.build();
+    ASSERT_OK_AND_ASSIGN(
+        auto format_writer,
+        ParquetFormatWriter::Create(out, arrow_schema, writer_properties,
+                                    DEFAULT_PARQUET_WRITER_MAX_MEMORY_USE, 
arrow_pool_));
+    auto array = PrepareArray(struct_type, record_batch_size);
+    auto arrow_array = std::make_unique<ArrowArray>();
+    ASSERT_TRUE(arrow::ExportArray(*array, arrow_array.get()).ok());
+
+    auto batch = std::make_shared<RecordBatch>(
+        /*partition=*/std::map<std::string, std::string>(), /*bucket=*/-1,
+        /*row_kinds=*/std::vector<RecordBatch::RowKind>(), arrow_array.get());
+    ASSERT_OK(format_writer->AddBatch(batch->GetData()));
+    ASSERT_OK(format_writer->Flush());
+    ASSERT_OK(format_writer->Finish());
+    ASSERT_OK(out->Flush());
+    ASSERT_OK(out->Close());
+    CheckResult(file_path, record_batch_size, /*row_group_count=*/1);
+}
+
+TEST_F(ParquetFormatWriterTest, TestWriteMultipleTimes) {
+    // arrow array length = 6 + 10 + 15 + 6 = 37
+    // parquet batch capacity = 10
+    auto schema_pair = PrepareArrowSchema();
+    const auto& arrow_schema = schema_pair.first;
+    const auto& struct_type = schema_pair.second;
+
+    std::string file_path = PathUtil::JoinPath(dir_->Str(), 
"write_multiple_times");
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<OutputStream> out,
+                         fs_->Create(file_path, /*overwrite=*/false));
+    ::parquet::WriterProperties::Builder builder;
+    builder.write_batch_size(10);
+    auto writer_properties = builder.build();
+    ASSERT_OK_AND_ASSIGN(
+        std::shared_ptr<ParquetFormatWriter> format_writer,
+        ParquetFormatWriter::Create(out, arrow_schema, writer_properties,
+                                    DEFAULT_PARQUET_WRITER_MAX_MEMORY_USE, 
arrow_pool_));
+
+    // add batch first time, 6 rows
+    AddRecordBatchOnce(format_writer, struct_type, 6, 0);
+    ASSERT_OK_AND_ASSIGN(uint64_t estimate_len1, 
format_writer->GetEstimateLength());
+    ASSERT_GT(estimate_len1, 0);
+
+    // add batch second times, 10 rows
+    AddRecordBatchOnce(format_writer, struct_type, 10, 6);
+    ASSERT_OK_AND_ASSIGN(uint64_t estimate_len2, 
format_writer->GetEstimateLength());
+    ASSERT_EQ(estimate_len2, estimate_len1);
+
+    // add batch third times, 15 rows (expand internal batch)
+    AddRecordBatchOnce(format_writer, struct_type, 15, 16);
+    ASSERT_OK_AND_ASSIGN(uint64_t estimate_len3, 
format_writer->GetEstimateLength());
+    ASSERT_EQ(estimate_len3, estimate_len2);
+
+    // add batch fourth times, 6 rows
+    AddRecordBatchOnce(format_writer, struct_type, 6, 31);
+
+    ASSERT_OK(format_writer->Flush());
+    ASSERT_OK(format_writer->Finish());
+    ASSERT_OK(out->Flush());
+    ASSERT_OK(out->Close());
+    CheckResult(file_path, /*row_count=*/37, /*row_group_count=*/1);
+    auto metrics = format_writer->GetWriterMetrics();
+    ASSERT_OK_AND_ASSIGN(uint64_t counter, 
metrics->GetCounter(ParquetMetrics::WRITE_RECORD_COUNT));
+    ASSERT_EQ(37, counter);
+}
+
+TEST_F(ParquetFormatWriterTest, TestGetEstimateLength) {
+    auto schema_pair = PrepareArrowSchema();
+    const auto& arrow_schema = schema_pair.first;
+    const auto& struct_type = schema_pair.second;
+
+    std::string file_path = PathUtil::JoinPath(dir_->Str(), 
"get_estimate_length");
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<OutputStream> out,
+                         fs_->Create(file_path, /*overwrite=*/false));
+    ::parquet::WriterProperties::Builder builder;
+    auto writer_properties = builder.build();
+    ASSERT_OK_AND_ASSIGN(
+        std::shared_ptr<ParquetFormatWriter> format_writer,
+        ParquetFormatWriter::Create(out, arrow_schema, writer_properties,
+                                    DEFAULT_PARQUET_WRITER_MAX_MEMORY_USE, 
arrow_pool_));
+
+    // add batch first time, 1 row
+    AddRecordBatchOnce(format_writer, struct_type, 1, 0);
+    ASSERT_OK_AND_ASSIGN(uint64_t estimate_len1, 
format_writer->GetEstimateLength());
+    ASSERT_GT(estimate_len1, 0);
+
+    // add batch second times, 9998 rows
+    AddRecordBatchOnce(format_writer, struct_type, 9998, 1);
+    ASSERT_OK_AND_ASSIGN(uint64_t estimate_len2, 
format_writer->GetEstimateLength());
+    ASSERT_EQ(estimate_len2, estimate_len1);
+
+    AddRecordBatchOnce(format_writer, struct_type, 100000, 9999);
+    ASSERT_OK_AND_ASSIGN(uint64_t estimate_len3, 
format_writer->GetEstimateLength());
+    ASSERT_GT(estimate_len3, estimate_len2);
+    ASSERT_OK(format_writer->Finish());
+}
+
+TEST_F(ParquetFormatWriterTest, TestMemoryControl) {
+    auto run = [&](bool all_null_value, uint64_t max_memory_use) {
+        ASSERT_OK_AND_ASSIGN(
+            std::unique_ptr<FileFormat> file_format,
+            FileFormatFactory::Get(
+                "parquet", {{Options::FILE_FORMAT, "parquet"},
+                            {Options::MANIFEST_FORMAT, "parquet"},
+                            {"parquet.writer.max.memory.use", 
std::to_string(max_memory_use)}}));
+
+        std::shared_ptr<MemoryPool> pool = GetMemoryPool();
+        auto schema_pair = PrepareArrowSchema();
+        const auto& arrow_schema = schema_pair.first;
+        const auto& struct_type = schema_pair.second;
+        int32_t batch_size = 4096;
+
+        auto c_schema = std::make_unique<::ArrowSchema>();
+        ASSERT_TRUE(arrow::ExportSchema(*arrow_schema, c_schema.get()).ok());
+        ASSERT_OK_AND_ASSIGN(auto writer_builder,
+                             file_format->CreateWriterBuilder(c_schema.get(), 
batch_size));
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<OutputStream> out,
+            fs_->Create(
+                PathUtil::JoinPath(dir_->Str(), std::to_string(all_null_value) 
+
+                                                    
std::to_string(max_memory_use) + ".parquet"),
+                /*overwrite=*/false));
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<FormatWriter> writer,
+                             writer_builder->WithMemoryPool(pool)->Build(out, 
"uncompressed"));
+
+        auto array = PrepareArray(struct_type, batch_size, /*offset=*/0, 
all_null_value);
+        for (int32_t i = 0; i < 2000; ++i) {
+            auto arrow_array = std::make_unique<ArrowArray>();
+            ASSERT_TRUE(arrow::ExportArray(*array, arrow_array.get()).ok());
+            auto batch = std::make_shared<RecordBatch>(
+                /*partition=*/std::map<std::string, std::string>(), 
/*bucket=*/-1,
+                /*row_kinds=*/std::vector<RecordBatch::RowKind>(), 
arrow_array.get());
+            ASSERT_OK(writer->AddBatch(batch->GetData()));
+            ASSERT_OK(writer->Flush());
+        }
+
+        ASSERT_OK(writer->Flush());
+        ASSERT_OK(writer->Finish());
+        ASSERT_OK(out->Flush());
+        ASSERT_OK(out->Close());
+        uint64_t actual_max_mem = pool->MaxMemoryUsage();
+        ASSERT_GT(actual_max_mem, max_memory_use);
+        ASSERT_LT(actual_max_mem, max_memory_use * 1.5);  // allow 50% overhead
+    };
+    run(/*all_null_value=*/true, /*max_memory_use=*/20 * 1024 * 1024);   // 
20MB
+    run(/*all_null_value=*/true, /*max_memory_use=*/40 * 1024 * 1024);   // 
40MB
+    run(/*all_null_value=*/false, /*max_memory_use=*/20 * 1024 * 1024);  // 
20MB
+    run(/*all_null_value=*/false, /*max_memory_use=*/40 * 1024 * 1024);  // 
40MB
+}
+
+TEST_F(ParquetFormatWriterTest, TestMemoryControlForCheckRowGroupCount) {
+    auto run = [&](int32_t write_times) {
+        ASSERT_OK_AND_ASSIGN(
+            std::unique_ptr<FileFormat> file_format,
+            FileFormatFactory::Get("parquet", {{Options::FILE_FORMAT, 
"parquet"},
+                                               {Options::MANIFEST_FORMAT, 
"parquet"},
+                                               
{"parquet.writer.max.memory.use", "1"}}));
+
+        auto schema_pair = PrepareArrowSchema();
+        const auto& arrow_schema = schema_pair.first;
+        const auto& struct_type = schema_pair.second;
+        int32_t batch_size = 4096;
+        std::string file_path =
+            PathUtil::JoinPath(dir_->Str(), std::to_string(write_times) + 
".parquet");
+
+        auto c_schema = std::make_unique<::ArrowSchema>();
+        ASSERT_TRUE(arrow::ExportSchema(*arrow_schema, c_schema.get()).ok());
+        ASSERT_OK_AND_ASSIGN(auto writer_builder,
+                             file_format->CreateWriterBuilder(c_schema.get(), 
batch_size));
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<OutputStream> out,
+                             fs_->Create(file_path, /*overwrite=*/false));
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<FormatWriter> writer,
+                             writer_builder->Build(out, "uncompressed"));
+
+        for (int32_t i = 0; i < write_times; ++i) {
+            AddRecordBatchOnce(writer, struct_type, 10, i * 10);
+        }
+
+        ASSERT_OK(writer->Flush());
+        ASSERT_OK(writer->Finish());
+        ASSERT_OK(out->Flush());
+        ASSERT_OK(out->Close());
+        CheckResult(file_path, /*row_count=*/write_times * 10, 
/*row_group_count=*/write_times);
+    };
+    run(/*write_times=*/1);
+    run(/*write_times=*/2);
+    run(/*write_times=*/5);
+}
+
+TEST_F(ParquetFormatWriterTest, TestTimestampType) {
+    auto timezone = DateTimeUtils::GetLocalTimezoneName();
+    arrow::FieldVector fields = {
+        arrow::field("ts_sec", arrow::timestamp(arrow::TimeUnit::SECOND)),
+        arrow::field("ts_milli", arrow::timestamp(arrow::TimeUnit::MILLI)),
+        arrow::field("ts_micro", arrow::timestamp(arrow::TimeUnit::MICRO)),
+        arrow::field("ts_nano", arrow::timestamp(arrow::TimeUnit::NANO)),
+        arrow::field("ts_utc1", arrow::timestamp(arrow::TimeUnit::SECOND, 
timezone)),
+        arrow::field("ts_utc2", arrow::timestamp(arrow::TimeUnit::MICRO, 
timezone))};
+
+    std::string file_path = PathUtil::JoinPath(dir_->Str(), 
"timezone.parquet");
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<OutputStream> out,
+                         fs_->Create(file_path, /*overwrite=*/true));
+    ::parquet::WriterProperties::Builder builder;
+    auto writer_properties = builder.build();
+    ASSERT_OK_AND_ASSIGN(
+        std::shared_ptr<ParquetFormatWriter> format_writer,
+        ParquetFormatWriter::Create(out, 
std::make_shared<arrow::Schema>(fields), writer_properties,
+                                    DEFAULT_PARQUET_WRITER_MAX_MEMORY_USE, 
arrow_pool_));
+
+    auto array = std::dynamic_pointer_cast<arrow::StructArray>(
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([
+["1970-01-01 00:00:01", "1970-01-01 00:00:00.001", "1970-01-01 
00:00:00.000001", "1970-01-01 00:00:00.000000001",
+"1970-01-01 00:00:02", "1970-01-01 00:00:00.002"],
+["1970-01-01 00:00:01", null, "1970-01-01 00:00:00.000001", null,"1970-01-01 
00:00:02", null]
+    ])")
+            .ValueOrDie());
+
+    ArrowArray c_array;
+    ASSERT_TRUE(arrow::ExportArray(*array, &c_array).ok());
+    ASSERT_OK(format_writer->AddBatch(&c_array));
+    ASSERT_OK(format_writer->Flush());
+    ASSERT_OK(format_writer->Finish());
+    ASSERT_OK(out->Flush());
+    ASSERT_OK(out->Close());
+}
+
+}  // namespace paimon::parquet::test

Reply via email to