This is an automated email from the ASF dual-hosted git repository.
leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new ba2a108 feat(format): introduce parquet reader and writer (#51)
ba2a108 is described below
commit ba2a108a8af68b07398da0180056c76f3ce02cd3
Author: Zhang Jiawei <[email protected]>
AuthorDate: Fri Jun 5 18:29:34 2026 +0800
feat(format): introduce parquet reader and writer (#51)
---
src/paimon/format/parquet/file_reader_wrapper.cpp | 209 +++++++
src/paimon/format/parquet/file_reader_wrapper.h | 129 ++++
.../format/parquet/file_reader_wrapper_test.cpp | 331 +++++++++++
.../format/parquet/parquet_file_batch_reader.cpp | 322 ++++++++++
.../format/parquet/parquet_file_batch_reader.h | 187 ++++++
.../parquet/parquet_file_batch_reader_test.cpp | 646 +++++++++++++++++++++
.../format/parquet/parquet_format_writer.cpp | 109 ++++
src/paimon/format/parquet/parquet_format_writer.h | 84 +++
.../format/parquet/parquet_format_writer_test.cpp | 470 +++++++++++++++
9 files changed, 2487 insertions(+)
diff --git a/src/paimon/format/parquet/file_reader_wrapper.cpp
b/src/paimon/format/parquet/file_reader_wrapper.cpp
new file mode 100644
index 0000000..674e3c6
--- /dev/null
+++ b/src/paimon/format/parquet/file_reader_wrapper.cpp
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/format/parquet/file_reader_wrapper.h"
+
+#include <cassert>
+#include <cstddef>
+
+#include "arrow/record_batch.h"
+#include "arrow/util/range.h"
+#include "fmt/format.h"
+#include "paimon/macros.h"
+#include "parquet/arrow/reader.h"
+#include "parquet/file_reader.h"
+#include "parquet/metadata.h"
+
+namespace paimon::parquet {
+
+// Factory for FileReaderWrapper.
+//
+// Takes ownership of `file_reader`, derives the half-open row range [start, end) of every row
+// group from the parquet metadata, and registers all row groups and all columns as the default
+// (lazily prepared) read target.
+//
+// Returns Status::Invalid when `file_reader` is null or when the accumulated row group sizes do
+// not add up to the row count reported by the file metadata.
+Result<std::unique_ptr<FileReaderWrapper>> FileReaderWrapper::Create(
+    std::unique_ptr<::parquet::arrow::FileReader>&& file_reader) {
+    if (!file_reader) {
+        return Status::Invalid("file reader wrapper create failed. file reader is nullptr");
+    }
+    const auto metadata = file_reader->parquet_reader()->metadata();
+
+    // Row groups are laid out back to back, so each range starts where the previous one ended.
+    std::vector<std::pair<uint64_t, uint64_t>> ranges;
+    uint64_t range_begin = 0;
+    for (int32_t group = 0; group < metadata->num_row_groups(); ++group) {
+        const uint64_t range_end = range_begin + metadata->RowGroup(group)->num_rows();
+        ranges.emplace_back(range_begin, range_end);
+        range_begin = range_end;
+    }
+
+    const uint64_t total_rows = metadata->num_rows();
+    if (range_begin != total_rows) {
+        // Should be impossible for a well-formed file: abort in debug builds, report otherwise.
+        assert(false);
+        return Status::Invalid(fmt::format(
+            "unexpected error. row group ranges not match with num rows {}", total_rows));
+    }
+
+    // Both index lists must be computed before ownership of file_reader is handed over below.
+    std::vector<int32_t> every_row_group = arrow::internal::Iota(file_reader->num_row_groups());
+    std::vector<int32_t> every_column = arrow::internal::Iota(metadata->num_columns());
+
+    std::unique_ptr<FileReaderWrapper> wrapper(
+        new FileReaderWrapper(std::move(file_reader), ranges, total_rows));
+    PAIMON_RETURN_NOT_OK(wrapper->PrepareForReadingLazy(
+        std::set<int32_t>(every_row_group.begin(), every_row_group.end()), every_column));
+    return wrapper;
+}
+
+// Private constructor used by Create(): stores the reader, a copy of the per-row-group row
+// ranges and the total row count. No read target is selected here.
+FileReaderWrapper::FileReaderWrapper(
+    std::unique_ptr<::parquet::arrow::FileReader>&& file_reader,
+    const std::vector<std::pair<uint64_t, uint64_t>>& all_row_group_ranges, uint64_t num_rows)
+    : file_reader_(std::move(file_reader)),
+      all_row_group_ranges_(all_row_group_ranges),
+      num_rows_(num_rows) {
+}
+
+// Moves the read cursor to the first selected row group that starts at or after `row_number`.
+//
+// Seeking is only possible at row group granularity:
+//   - if `row_number` lies strictly inside a selected row group, Status::Invalid is returned;
+//   - if a selected row group starts at or after `row_number`, a new record batch reader is
+//     opened over that row group and every selected row group after it;
+//   - otherwise the cursor is placed at the end (next row == total row count) and the following
+//     Next() yields no batch.
+//
+// previous_first_row_ is intentionally left untouched by a seek.
+Status FileReaderWrapper::SeekToRow(uint64_t row_number) {
+    // A lazily prepared reader has no target ranges yet. Without materializing them first the
+    // loop below would be skipped and the seek would be discarded by the initialization that
+    // the next call to Next() performs.
+    if (PAIMON_UNLIKELY(!reader_initialized_)) {
+        PAIMON_RETURN_NOT_OK(PrepareForReading(target_row_group_indices_, target_column_indices_));
+    }
+    for (uint64_t i = 0; i < target_row_groups_.size(); i++) {
+        if (row_number > target_row_groups_[i].first && row_number < target_row_groups_[i].second) {
+            return Status::Invalid(fmt::format(
+                "seek to row failed. row number {} should not be in the middle of readable range",
+                row_number));
+        }
+        if (target_row_groups_[i].first >= row_number) {
+            current_row_group_idx_ = i;
+            next_row_to_read_ = target_row_groups_[i].first;
+            // Translate the remaining target ranges back into row group ids for the new reader.
+            std::vector<int32_t> target_row_group_indices;
+            for (uint64_t j = i; j < target_row_groups_.size(); j++) {
+                PAIMON_ASSIGN_OR_RAISE(int32_t row_group_id, GetRowGroupId(target_row_groups_[j]));
+                target_row_group_indices.push_back(row_group_id);
+            }
+            PAIMON_RETURN_NOT_OK_FROM_ARROW(file_reader_->GetRecordBatchReader(
+                target_row_group_indices, target_column_indices_, &batch_reader_));
+            return Status::OK();
+        }
+    }
+    // Nothing left to read at or after row_number.
+    next_row_to_read_ = num_rows_;
+    current_row_group_idx_ = target_row_groups_.size();
+    return Status::OK();
+}
+
+// Reads the next record batch of the current read target.
+//
+// Performs the deferred PrepareForReading() on first use. Returns a null batch once all
+// selected row groups have been consumed. After every call previous_first_row_ holds the row
+// number the cursor pointed at before the call, and next_row_to_read_ is advanced past the
+// returned rows (jumping to the start of the next selected row group, or to the total row
+// count after the last one). A batch that would cross the end of the current row group is
+// reported as Status::Invalid.
+Result<std::shared_ptr<arrow::RecordBatch>> FileReaderWrapper::Next() {
+    if (PAIMON_UNLIKELY(!reader_initialized_)) {
+        PAIMON_RETURN_NOT_OK(PrepareForReading(target_row_group_indices_, target_column_indices_));
+    }
+
+    std::shared_ptr<arrow::RecordBatch> batch;
+    if (current_row_group_idx_ < target_row_groups_.size()) {
+        PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(batch, batch_reader_->Next());
+    }
+
+    // Whatever is returned (a batch or end of stream) starts where the cursor stood.
+    previous_first_row_ = next_row_to_read_;
+    if (!batch) {
+        return batch;
+    }
+
+    const int64_t rows_read = batch->num_rows();
+    const uint64_t group_end = target_row_groups_[current_row_group_idx_].second;
+    const uint64_t cursor_after_read = next_row_to_read_ + rows_read;
+    if (cursor_after_read > group_end) {
+        return Status::Invalid(fmt::format(
+            "Next failed. Unexpected error, next row to read {} + num rows just read {} "
+            "should always be within current row group range or exactly equals to current "
+            "row group end {}",
+            next_row_to_read_, rows_read, group_end));
+    }
+
+    if (cursor_after_read < group_end) {
+        // Still inside the current row group.
+        next_row_to_read_ = cursor_after_read;
+    } else if (current_row_group_idx_ + 1 == target_row_groups_.size()) {
+        // The last selected row group is exhausted.
+        next_row_to_read_ = num_rows_;
+    } else {
+        // Continue with the next selected row group.
+        ++current_row_group_idx_;
+        next_row_to_read_ = target_row_groups_[current_row_group_idx_].first;
+    }
+    return batch;
+}
+
+// Maps row group indices to their [start_row, end_row) ranges, in the iteration order of
+// `row_group_indices`. Returns Status::Invalid for an index outside the known row groups
+// (a negative index wraps to a huge unsigned value and is rejected the same way).
+Result<std::vector<std::pair<uint64_t, uint64_t>>> FileReaderWrapper::GetRowGroupRanges(
+    const std::set<int32_t>& row_group_indices) const {
+    const size_t known_groups = all_row_group_ranges_.size();
+    std::vector<std::pair<uint64_t, uint64_t>> selected_ranges;
+    for (const int32_t index : row_group_indices) {
+        const auto position = static_cast<size_t>(index);
+        if (position >= known_groups) {
+            return Status::Invalid(
+                fmt::format("row group index {} is out of bound {}", index, known_groups));
+        }
+        selected_ranges.push_back(all_row_group_ranges_[position]);
+    }
+    return selected_ranges;
+}
+
+// Records the row groups and columns to read without opening a record batch reader.
+// Clearing reader_initialized_ makes the next call to Next() run PrepareForReading() with the
+// stored selection. Always returns Status::OK().
+Status FileReaderWrapper::PrepareForReadingLazy(const std::set<int32_t>& target_row_group_indices,
+                                                const std::vector<int32_t>& column_indices) {
+    reader_initialized_ = false;
+    target_column_indices_ = column_indices;
+    target_row_group_indices_ = target_row_group_indices;
+    return Status::OK();
+}
+
+// Selects the row groups and columns to read and opens a record batch reader over them.
+//
+// The row ranges and the new reader are built into locals first, so member state is only
+// modified after both steps succeeded. On success the cursor is placed at the first row of the
+// first selected row group (or at the total row count when nothing is selected) and
+// previous_first_row_ is reset to the "no batch read yet" sentinel (uint64 max).
+Status FileReaderWrapper::PrepareForReading(const std::set<int32_t>& target_row_group_indices,
+                                            const std::vector<int32_t>& column_indices) {
+    std::vector<std::pair<uint64_t, uint64_t>> selected_ranges;
+    PAIMON_ASSIGN_OR_RAISE(selected_ranges, GetRowGroupRanges(target_row_group_indices));
+
+    const std::vector<int32_t> row_group_ids(target_row_group_indices.begin(),
+                                             target_row_group_indices.end());
+    std::unique_ptr<arrow::RecordBatchReader> new_reader;
+    PAIMON_RETURN_NOT_OK_FROM_ARROW(
+        file_reader_->GetRecordBatchReader(row_group_ids, column_indices, &new_reader));
+
+    // Everything fallible is done: commit the new read target.
+    target_row_groups_ = selected_ranges;
+    target_column_indices_ = column_indices;
+    batch_reader_ = std::move(new_reader);
+    next_row_to_read_ = target_row_groups_.empty() ? num_rows_ : target_row_groups_[0].first;
+    previous_first_row_ = std::numeric_limits<uint64_t>::max();
+    current_row_group_idx_ = 0;
+    reader_initialized_ = true;
+    return Status::OK();
+}
+
+// Returns the members of `src_row_groups` whose row range is listed in `read_ranges`.
+// Every entry of `read_ranges` must match a row group range exactly; otherwise the error from
+// ReadRangesToRowGroupIds() is propagated.
+Result<std::set<int32_t>> FileReaderWrapper::FilterRowGroupsByReadRanges(
+    const std::vector<std::pair<uint64_t, uint64_t>>& read_ranges,
+    const std::vector<int32_t>& src_row_groups) const {
+    PAIMON_ASSIGN_OR_RAISE(std::set<int32_t> wanted_ids, ReadRangesToRowGroupIds(read_ranges));
+    std::set<int32_t> kept_ids;
+    for (const int32_t candidate : src_row_groups) {
+        if (wanted_ids.count(candidate) > 0) {
+            kept_ids.insert(candidate);
+        }
+    }
+    return kept_ids;
+}
+
+// Converts row ranges into the ids of the row groups they denote. Fails as soon as one range
+// does not coincide exactly with a row group range.
+Result<std::set<int32_t>> FileReaderWrapper::ReadRangesToRowGroupIds(
+    const std::vector<std::pair<uint64_t, uint64_t>>& read_ranges) const {
+    std::set<int32_t> ids;
+    for (size_t pos = 0; pos < read_ranges.size(); ++pos) {
+        PAIMON_ASSIGN_OR_RAISE(int32_t id, GetRowGroupId(read_ranges[pos]));
+        ids.insert(id);
+    }
+    return ids;
+}
+
+// Finds the row group whose [start_row, end_row) range equals `target_range` and returns its
+// index; Status::Invalid when no row group has exactly that range.
+Result<int32_t> FileReaderWrapper::GetRowGroupId(std::pair<uint64_t, uint64_t> target_range) const {
+    int32_t group_id = 0;
+    for (const auto& group_range : all_row_group_ranges_) {
+        if (group_range == target_range) {
+            return group_id;
+        }
+        ++group_id;
+    }
+    return Status::Invalid(fmt::format(
+        "not expected failure. target range bound '{},{}' not match with row group range bound",
+        target_range.first, target_range.second));
+}
+
+} // namespace paimon::parquet
diff --git a/src/paimon/format/parquet/file_reader_wrapper.h
b/src/paimon/format/parquet/file_reader_wrapper.h
new file mode 100644
index 0000000..f20e94e
--- /dev/null
+++ b/src/paimon/format/parquet/file_reader_wrapper.h
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <limits>
+#include <memory>
+#include <set>
+#include <utility>
+#include <vector>
+
+#include "arrow/array.h"
+#include "arrow/compute/api.h"
+#include "arrow/dataset/file_parquet.h"
+#include "arrow/record_batch.h"
+#include "arrow/type.h"
+#include "arrow/type_fwd.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+#include "parquet/arrow/reader.h"
+
+namespace arrow {
+class Schema;
+} // namespace arrow
+
+namespace paimon::parquet {
+
+// FileReaderWrapper decorates ::parquet::arrow::FileReader with row-position bookkeeping:
+// it supports seeking to row group boundaries and reports, for sequential reads, the first row
+// of the previously returned batch (GetPreviousBatchFirstRowNumber) and the row the next batch
+// will start at (GetNextRowToRead). Row ranges are half-open: [start_row, end_row).
+class FileReaderWrapper {
+ public:
+    // Wraps `reader`; by default all row groups and all columns are selected for reading.
+    // Fails with Status::Invalid when `reader` is null.
+    static Result<std::unique_ptr<FileReaderWrapper>> Create(
+        std::unique_ptr<::parquet::arrow::FileReader>&& reader);
+
+    // Positions the reader at the first selected row group starting at or after `row_number`.
+    // Fails when `row_number` lies strictly inside a selected row group.
+    Status SeekToRow(uint64_t row_number);
+
+    // Returns the next batch, or a null pointer when the selected row groups are exhausted.
+    Result<std::shared_ptr<arrow::RecordBatch>> Next();
+
+    // First row number of the batch returned by the latest Next(); uint64 max before any read.
+    Result<uint64_t> GetPreviousBatchFirstRowNumber() const { return previous_first_row_; }
+
+    // Row number the next batch starts at; uint64 max until a read target has been prepared.
+    uint64_t GetNextRowToRead() const { return next_row_to_read_; }
+
+    // Total number of rows in the file, as passed to the constructor.
+    uint64_t GetNumberOfRows() const { return num_rows_; }
+
+    int32_t GetNumberOfRowGroups() const { return file_reader_->num_row_groups(); }
+
+    // Non-owning access to the wrapped reader.
+    ::parquet::arrow::FileReader* GetFileReader() const { return file_reader_.get(); }
+
+    // Row range of every row group in the file, indexed by row group id.
+    const std::vector<std::pair<uint64_t, uint64_t>>& GetAllRowGroupRanges() const {
+        return all_row_group_ranges_;
+    }
+
+    // Arrow schema of the file, as reported by the wrapped reader.
+    Result<std::shared_ptr<arrow::Schema>> GetSchema() const {
+        std::shared_ptr<arrow::Schema> schema;
+        PAIMON_RETURN_NOT_OK_FROM_ARROW(file_reader_->GetSchema(&schema));
+        return schema;
+    }
+
+    // Closes the current record batch reader, if one has been opened.
+    Status Close() {
+        if (!batch_reader_) {
+            return Status::OK();
+        }
+        PAIMON_RETURN_NOT_OK_FROM_ARROW(batch_reader_->Close());
+        return Status::OK();
+    }
+
+    // Row ranges of the given row groups; fails on an out-of-range index.
+    Result<std::vector<std::pair<uint64_t, uint64_t>>> GetRowGroupRanges(
+        const std::set<int32_t>& row_group_indices) const;
+
+    // Remembers the read target; the batch reader is only opened by the next call to Next().
+    Status PrepareForReadingLazy(const std::set<int32_t>& row_group_indices,
+                                 const std::vector<int32_t>& column_indices);
+    // Selects the read target and opens the batch reader immediately.
+    Status PrepareForReading(const std::set<int32_t>& row_group_indices,
+                             const std::vector<int32_t>& column_indices);
+
+    // Subset of `src_row_groups` whose row range appears in `read_ranges`.
+    Result<std::set<int32_t>> FilterRowGroupsByReadRanges(
+        const std::vector<std::pair<uint64_t, uint64_t>>& read_ranges,
+        const std::vector<int32_t>& src_row_groups) const;
+
+ private:
+    FileReaderWrapper(std::unique_ptr<::parquet::arrow::FileReader>&& file_reader,
+                      const std::vector<std::pair<uint64_t, uint64_t>>& all_row_group_ranges,
+                      uint64_t num_rows);
+
+    // Row group ids whose ranges equal the given read ranges (exact match required).
+    Result<std::set<int32_t>> ReadRangesToRowGroupIds(
+        const std::vector<std::pair<uint64_t, uint64_t>>& read_ranges) const;
+    // Id of the row group whose range equals `target_range` (exact match required).
+    Result<int32_t> GetRowGroupId(std::pair<uint64_t, uint64_t> target_range) const;
+
+    std::unique_ptr<::parquet::arrow::FileReader> file_reader_;
+    // Reader over the currently selected row groups/columns; null until prepared.
+    std::unique_ptr<arrow::RecordBatchReader> batch_reader_;
+
+    // [start_row, end_row) of every row group in the file.
+    std::vector<std::pair<uint64_t, uint64_t>> all_row_group_ranges_;
+    // Current read target: row group ids, their row ranges, and the column indices.
+    std::set<int32_t> target_row_group_indices_;
+    std::vector<std::pair<uint64_t, uint64_t>> target_row_groups_;
+    std::vector<int32_t> target_column_indices_;
+
+    const uint64_t num_rows_;
+    uint64_t next_row_to_read_ = std::numeric_limits<uint64_t>::max();
+    uint64_t previous_first_row_ = std::numeric_limits<uint64_t>::max();
+    // Position inside target_row_groups_ (not a row group id).
+    uint64_t current_row_group_idx_ = 0;
+    // False while a lazily registered read target still has to be opened.
+    bool reader_initialized_ = false;
+};
+
+} // namespace paimon::parquet
diff --git a/src/paimon/format/parquet/file_reader_wrapper_test.cpp
b/src/paimon/format/parquet/file_reader_wrapper_test.cpp
new file mode 100644
index 0000000..12f90a1
--- /dev/null
+++ b/src/paimon/format/parquet/file_reader_wrapper_test.cpp
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/format/parquet/file_reader_wrapper.h"
+
+#include <map>
+#include <string>
+
+#include "arrow/api.h"
+#include "arrow/array/builder_binary.h"
+#include "arrow/array/builder_nested.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/c/abi.h"
+#include "arrow/c/bridge.h"
+#include "arrow/io/caching.h"
+#include "arrow/memory_pool.h"
+#include "gtest/gtest.h"
+#include "paimon/common/utils/arrow/arrow_input_stream_adapter.h"
+#include "paimon/common/utils/arrow/mem_utils.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/common/utils/path_util.h"
+#include "paimon/format/parquet/parquet_field_id_converter.h"
+#include "paimon/format/parquet/parquet_format_defs.h"
+#include "paimon/format/parquet/parquet_format_writer.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/fs/local/local_file_system.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/record_batch.h"
+#include "paimon/testing/utils/testharness.h"
+#include "parquet/arrow/reader.h"
+#include "parquet/properties.h"
+
+namespace arrow {
+class Array;
+} // namespace arrow
+
+namespace paimon::parquet::test {
+
+// Fixture for FileReaderWrapper tests: writes small parquet files (string / int32 / bool
+// columns, row groups capped at 1000 rows) into a unique temporary directory and opens them
+// through a FileReaderWrapper that reads 512-row batches.
+class FileReaderWrapperTest : public ::testing::Test {
+ public:
+    void SetUp() override {
+        dir_ = paimon::test::UniqueTestDirectory::Create();
+        ASSERT_TRUE(dir_);
+        fs_ = std::make_shared<LocalFileSystem>();
+        pool_ = GetDefaultPool();
+        arrow_pool_ = GetArrowPool(pool_);
+        batch_size_ = 512;
+    }
+    void TearDown() override {}
+
+    // Returns the three-column test schema together with the equivalent struct type.
+    std::pair<std::shared_ptr<arrow::Schema>, std::shared_ptr<arrow::DataType>> PrepareArrowSchema()
+        const {
+        // Every field carries its parquet field id as key/value metadata.
+        auto make_field = [](const std::string& name, const std::shared_ptr<arrow::DataType>& type,
+                             const std::string& field_id) {
+            return arrow::field(
+                name, type,
+                arrow::KeyValueMetadata::Make({ParquetFieldIdConverter::PARQUET_FIELD_ID},
+                                              {field_id}));
+        };
+        auto string_field = make_field("col1", arrow::utf8(), "0");
+        auto int_field = make_field("col2", arrow::int32(), "1");
+        auto bool_field = make_field("col3", arrow::boolean(), "2");
+        auto struct_type = arrow::struct_({string_field, int_field, bool_field});
+        return std::make_pair(
+            arrow::schema(arrow::FieldVector({string_field, int_field, bool_field})), struct_type);
+    }
+
+    // Builds a struct array with `record_batch_size` rows numbered from `offset`:
+    // col1 = "str_<i>", col2 = i (null when i % 3 == 0), col3 = (i % 2 != 0).
+    std::shared_ptr<arrow::Array> PrepareArray(const std::shared_ptr<arrow::DataType>& data_type,
+                                               int32_t record_batch_size,
+                                               int32_t offset = 0) const {
+        arrow::StructBuilder struct_builder(
+            data_type, arrow::default_memory_pool(),
+            {std::make_shared<arrow::StringBuilder>(), std::make_shared<arrow::Int32Builder>(),
+             std::make_shared<arrow::BooleanBuilder>()});
+        auto* string_builder = static_cast<arrow::StringBuilder*>(struct_builder.field_builder(0));
+        auto* int_builder = static_cast<arrow::Int32Builder*>(struct_builder.field_builder(1));
+        auto* bool_builder = static_cast<arrow::BooleanBuilder*>(struct_builder.field_builder(2));
+        const int32_t end = record_batch_size + offset;
+        for (int32_t row = offset; row < end; ++row) {
+            EXPECT_TRUE(struct_builder.Append().ok());
+            EXPECT_TRUE(string_builder->Append("str_" + std::to_string(row)).ok());
+            if (row % 3 == 0) {
+                // test null
+                EXPECT_TRUE(int_builder->AppendNull().ok());
+            } else {
+                EXPECT_TRUE(int_builder->Append(row).ok());
+            }
+            EXPECT_TRUE(bool_builder->Append(static_cast<bool>(row % 2)).ok());
+        }
+        std::shared_ptr<arrow::Array> array;
+        EXPECT_TRUE(struct_builder.Finish(&array).ok());
+        return array;
+    }
+
+    // Generates one array of test rows, exports it through the Arrow C data interface and
+    // appends it to `format_writer`.
+    void AddRecordBatchOnce(const std::shared_ptr<ParquetFormatWriter>& format_writer,
+                            const std::shared_ptr<arrow::DataType>& struct_type,
+                            int32_t record_batch_size, int32_t offset) const {
+        auto array = PrepareArray(struct_type, record_batch_size, offset);
+        auto c_array = std::make_unique<ArrowArray>();
+        ASSERT_TRUE(arrow::ExportArray(*array, c_array.get()).ok());
+        auto batch = std::make_shared<RecordBatch>(
+            /*partition=*/std::map<std::string, std::string>(), /*bucket=*/-1,
+            /*row_kinds=*/std::vector<RecordBatch::RowKind>(), c_array.get());
+        ASSERT_OK(format_writer->AddBatch(batch->GetData()));
+    }
+
+    // Opens `file_path` with a buffered, pre-buffering, multi-threaded parquet reader using the
+    // fixture batch size and wraps it in a FileReaderWrapper.
+    Result<std::unique_ptr<FileReaderWrapper>> PrepareReaderWrapper(const std::string& file_path) {
+        PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<InputStream> in, fs_->Open(file_path));
+        PAIMON_ASSIGN_OR_RAISE(uint64_t file_length, in->Length());
+        auto input_stream = std::make_unique<ArrowInputStreamAdapter>(in, arrow_pool_, file_length);
+
+        ::parquet::ReaderProperties reader_properties;
+        reader_properties.enable_buffered_stream();
+        ::parquet::arrow::FileReaderBuilder file_reader_builder;
+        PAIMON_RETURN_NOT_OK_FROM_ARROW(
+            file_reader_builder.Open(std::move(input_stream), reader_properties));
+
+        ::parquet::ArrowReaderProperties arrow_reader_props;
+        arrow_reader_props.set_pre_buffer(true);
+        arrow_reader_props.set_batch_size(static_cast<int64_t>(batch_size_));
+        arrow_reader_props.set_use_threads(true);
+        arrow_reader_props.set_cache_options(arrow::io::CacheOptions::Defaults());
+
+        std::unique_ptr<::parquet::arrow::FileReader> file_reader;
+        PAIMON_RETURN_NOT_OK_FROM_ARROW(file_reader_builder.memory_pool(arrow_pool_.get())
+                                            ->properties(arrow_reader_props)
+                                            ->Build(&file_reader));
+        return FileReaderWrapper::Create(std::move(file_reader));
+    }
+
+    // Writes `row_count` generated rows to a new parquet file at `file_path`.
+    void PrepareParquetFile(const std::string& file_path, int32_t row_count) {
+        const auto [arrow_schema, struct_type] = PrepareArrowSchema();
+
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<OutputStream> out,
+                             fs_->Create(file_path, /*overwrite=*/false));
+        ::parquet::WriterProperties::Builder builder;
+        builder.write_batch_size(10)->max_row_group_length(1000)->enable_store_decimal_as_integer();
+        auto writer_properties = builder.build();
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<ParquetFormatWriter> format_writer,
+            ParquetFormatWriter::Create(out, arrow_schema, writer_properties,
+                                        DEFAULT_PARQUET_WRITER_MAX_MEMORY_USE, arrow_pool_));
+
+        AddRecordBatchOnce(format_writer, struct_type, /*record_batch_size=*/row_count,
+                           /*offset=*/0);
+        ASSERT_OK(format_writer->Flush());
+        ASSERT_OK(format_writer->Finish());
+        ASSERT_OK(out->Flush());
+        ASSERT_OK(out->Close());
+    }
+
+ private:
+    std::unique_ptr<paimon::test::UniqueTestDirectory> dir_;
+    std::shared_ptr<FileSystem> fs_;
+    std::shared_ptr<MemoryPool> pool_;
+    std::shared_ptr<arrow::MemoryPool> arrow_pool_;
+    int32_t batch_size_;
+};
+
+// A file without rows has no row groups; the cursors hold the "unset" sentinel until the first
+// Next(), which initializes the reader and reports end of stream.
+TEST_F(FileReaderWrapperTest, EmptyFile) {
+    const std::string path = PathUtil::JoinPath(dir_->Str(), "test.parquet");
+    PrepareParquetFile(path, /*row_count=*/0);
+    ASSERT_OK_AND_ASSIGN(auto wrapper, PrepareReaderWrapper(path));
+
+    ASSERT_EQ(0, wrapper->GetNumberOfRows());
+    ASSERT_EQ(0, wrapper->GetNumberOfRowGroups());
+    ASSERT_EQ(std::numeric_limits<uint64_t>::max(), wrapper->GetNextRowToRead());
+    ASSERT_EQ(std::numeric_limits<uint64_t>::max(),
+              wrapper->GetPreviousBatchFirstRowNumber().value());
+
+    ASSERT_OK_AND_ASSIGN(auto first_batch, wrapper->Next());
+    ASSERT_EQ(0, wrapper->GetPreviousBatchFirstRowNumber().value());
+    ASSERT_EQ(0, wrapper->GetNextRowToRead());
+    ASSERT_TRUE(wrapper->GetAllRowGroupRanges().empty());
+
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::RecordBatch> second_batch, wrapper->Next());
+    ASSERT_FALSE(second_batch);
+}
+
+TEST_F(FileReaderWrapperTest, NullFileReader) {
+ ASSERT_NOK_WITH_MSG(FileReaderWrapper::Create(nullptr),
+ "file reader wrapper create failed. file reader is
nullptr");
+}
+
+// Sequential reads over a 5500-row file (six row groups), followed by seeks: into the middle of
+// a row group (rejected), onto a row group boundary, and past the end of the file.
+TEST_F(FileReaderWrapperTest, Simple) {
+    const std::string path = PathUtil::JoinPath(dir_->Str(), "test.parquet");
+    PrepareParquetFile(path, /*row_count=*/5500);
+    ASSERT_OK_AND_ASSIGN(auto wrapper, PrepareReaderWrapper(path));
+
+    // State right after creation: nothing has been read yet.
+    ASSERT_EQ(5500, wrapper->GetNumberOfRows());
+    ASSERT_EQ(6, wrapper->GetNumberOfRowGroups());
+    ASSERT_EQ(std::numeric_limits<uint64_t>::max(), wrapper->GetNextRowToRead());
+    ASSERT_EQ(std::numeric_limits<uint64_t>::max(),
+              wrapper->GetPreviousBatchFirstRowNumber().value());
+    std::vector<std::pair<uint64_t, uint64_t>> expected_all_row_group_ranges = {
+        {0, 1000}, {1000, 2000}, {2000, 3000}, {3000, 4000}, {4000, 5000}, {5000, 5500}};
+    ASSERT_EQ(expected_all_row_group_ranges, wrapper->GetAllRowGroupRanges());
+
+    // First row group: one full batch plus the 488-row remainder.
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::RecordBatch> batch, wrapper->Next());
+    ASSERT_TRUE(batch);
+    ASSERT_EQ(512, batch->num_rows());
+    ASSERT_EQ(512, wrapper->GetNextRowToRead());
+    ASSERT_EQ(0, wrapper->GetPreviousBatchFirstRowNumber().value());
+    ASSERT_OK_AND_ASSIGN(batch, wrapper->Next());
+    ASSERT_TRUE(batch);
+    ASSERT_EQ(488, batch->num_rows());
+    ASSERT_EQ(1000, wrapper->GetNextRowToRead());
+    ASSERT_EQ(512, wrapper->GetPreviousBatchFirstRowNumber().value());
+
+    // First batch of the second row group.
+    ASSERT_OK_AND_ASSIGN(batch, wrapper->Next());
+    ASSERT_TRUE(batch);
+    ASSERT_EQ(512, batch->num_rows());
+    ASSERT_EQ(1512, wrapper->GetNextRowToRead());
+    ASSERT_EQ(1000, wrapper->GetPreviousBatchFirstRowNumber().value());
+
+    // Seeking inside a row group fails; seeking to its first row replays the same batch.
+    ASSERT_NOK_WITH_MSG(wrapper->SeekToRow(1001),
+                        "should not be in the middle of readable range");
+    ASSERT_OK(wrapper->SeekToRow(1000));
+    ASSERT_OK_AND_ASSIGN(batch, wrapper->Next());
+    ASSERT_TRUE(batch);
+    ASSERT_EQ(512, batch->num_rows());
+    ASSERT_EQ(1512, wrapper->GetNextRowToRead());
+    ASSERT_EQ(1000, wrapper->GetPreviousBatchFirstRowNumber().value());
+
+    // Seeking past the last row parks the cursor at the end of the file.
+    ASSERT_OK(wrapper->SeekToRow(5600));
+    ASSERT_EQ(5500, wrapper->GetNextRowToRead());
+    ASSERT_EQ(6, wrapper->current_row_group_idx_);
+    ASSERT_EQ(1000, wrapper->GetPreviousBatchFirstRowNumber().value());
+    ASSERT_OK_AND_ASSIGN(batch, wrapper->Next());
+    ASSERT_FALSE(batch);
+    ASSERT_EQ(5500, wrapper->GetNextRowToRead());
+    ASSERT_EQ(5500, wrapper->GetPreviousBatchFirstRowNumber().value());
+}
+
+// Row group indices map to their row ranges; an index past the last row group is rejected and
+// an empty selection yields an empty result.
+TEST_F(FileReaderWrapperTest, GetRowGroupRanges) {
+    const std::string path = PathUtil::JoinPath(dir_->Str(), "test.parquet");
+    PrepareParquetFile(path, /*row_count=*/5500);
+    ASSERT_OK_AND_ASSIGN(auto wrapper, PrepareReaderWrapper(path));
+
+    ASSERT_OK_AND_ASSIGN(auto ranges, wrapper->GetRowGroupRanges({0, 3, 5}));
+    std::vector<std::pair<uint64_t, uint64_t>> expected_read_ranges = {
+        {0, 1000}, {3000, 4000}, {5000, 5500}};
+    ASSERT_EQ(expected_read_ranges, ranges);
+
+    ASSERT_NOK_WITH_MSG(wrapper->GetRowGroupRanges({0, 3, 6}), "out of bound");
+
+    ASSERT_OK_AND_ASSIGN(ranges, wrapper->GetRowGroupRanges({}));
+    ASSERT_TRUE(ranges.empty());
+}
+
+// Read ranges translate to row group ids only when they match row group bounds exactly.
+TEST_F(FileReaderWrapperTest, ReadRangesToRowGroupIds) {
+    const std::string path = PathUtil::JoinPath(dir_->Str(), "test.parquet");
+    PrepareParquetFile(path, /*row_count=*/5500);
+    ASSERT_OK_AND_ASSIGN(auto wrapper, PrepareReaderWrapper(path));
+
+    std::vector<std::pair<uint64_t, uint64_t>> read_ranges = {
+        {0, 1000}, {3000, 4000}, {5000, 5500}};
+    std::set<int32_t> expected_row_group_ids = {0, 3, 5};
+    ASSERT_OK_AND_ASSIGN(auto ids, wrapper->ReadRangesToRowGroupIds(read_ranges));
+    ASSERT_EQ(expected_row_group_ids, ids);
+
+    // The last range ends 100 rows past the real row group end.
+    std::vector<std::pair<uint64_t, uint64_t>> invalid_ranges = {
+        {0, 1000}, {3000, 4000}, {5000, 5600}};
+    ASSERT_NOK_WITH_MSG(wrapper->ReadRangesToRowGroupIds(invalid_ranges),
+                        "not match with row group range bound");
+
+    ASSERT_OK_AND_ASSIGN(ids, wrapper->ReadRangesToRowGroupIds({}));
+    ASSERT_TRUE(ids.empty());
+}
+
+// Only source row groups that are also covered by the read ranges survive the filter.
+TEST_F(FileReaderWrapperTest, FilterRowGroupsByReadRanges) {
+    const std::string path = PathUtil::JoinPath(dir_->Str(), "test.parquet");
+    PrepareParquetFile(path, /*row_count=*/5500);
+    ASSERT_OK_AND_ASSIGN(auto wrapper, PrepareReaderWrapper(path));
+
+    // The ranges denote row groups 0, 3 and 5; row group 3 is absent from the source list.
+    std::vector<std::pair<uint64_t, uint64_t>> read_ranges = {
+        {0, 1000}, {3000, 4000}, {5000, 5500}};
+    std::set<int32_t> expected_row_group_ids = {0, 5};
+    ASSERT_OK_AND_ASSIGN(auto ids,
+                         wrapper->FilterRowGroupsByReadRanges(read_ranges, {0, 1, 2, 4, 5}));
+    ASSERT_EQ(expected_row_group_ids, ids);
+
+    ASSERT_OK_AND_ASSIGN(ids, wrapper->FilterRowGroupsByReadRanges(read_ranges, {}));
+    ASSERT_TRUE(ids.empty());
+}
+
+// Explicit read targets: a single row group with one column, a selection without columns, and
+// a selection without row groups.
+TEST_F(FileReaderWrapperTest, PrepareForReading) {
+    const std::string path = PathUtil::JoinPath(dir_->Str(), "test.parquet");
+    PrepareParquetFile(path, /*row_count=*/5500);
+    ASSERT_OK_AND_ASSIGN(auto wrapper, PrepareReaderWrapper(path));
+
+    ASSERT_OK(wrapper->PrepareForReading(/*row_group_indices=*/{1}, /*column_indices=*/{0}));
+    // seek before actual read range
+    ASSERT_OK(wrapper->SeekToRow(0));
+    ASSERT_EQ(1000, wrapper->GetNextRowToRead());
+    ASSERT_EQ(std::numeric_limits<uint64_t>::max(),
+              wrapper->GetPreviousBatchFirstRowNumber().value());
+    ASSERT_OK_AND_ASSIGN(auto batch, wrapper->Next());
+    ASSERT_EQ(512, batch->num_rows());
+    ASSERT_EQ(1, batch->num_columns());
+    ASSERT_EQ(1512, wrapper->GetNextRowToRead());
+    ASSERT_EQ(1000, wrapper->GetPreviousBatchFirstRowNumber().value());
+    // The remainder of row group 1 ends the selection, so the cursor jumps to the row count.
+    ASSERT_OK_AND_ASSIGN(batch, wrapper->Next());
+    ASSERT_TRUE(batch);
+    ASSERT_EQ(488, batch->num_rows());
+    ASSERT_EQ(5500, wrapper->GetNextRowToRead());
+    ASSERT_EQ(1512, wrapper->GetPreviousBatchFirstRowNumber().value());
+    ASSERT_OK_AND_ASSIGN(batch, wrapper->Next());
+    ASSERT_FALSE(batch);
+
+    // empty column indices
+    ASSERT_OK(wrapper->PrepareForReading(/*row_group_indices=*/{0, 1}, /*column_indices=*/{}));
+    ASSERT_EQ(0, wrapper->GetNextRowToRead());
+    ASSERT_EQ(std::numeric_limits<uint64_t>::max(),
+              wrapper->GetPreviousBatchFirstRowNumber().value());
+    ASSERT_OK_AND_ASSIGN(batch, wrapper->Next());
+    ASSERT_EQ(512, batch->num_rows());
+    ASSERT_EQ(0, batch->num_columns());
+
+    // empty row group indices
+    ASSERT_OK(wrapper->PrepareForReading(/*row_group_indices=*/{}, /*column_indices=*/{0}));
+    ASSERT_EQ(5500, wrapper->GetNextRowToRead());
+    ASSERT_EQ(std::numeric_limits<uint64_t>::max(),
+              wrapper->GetPreviousBatchFirstRowNumber().value());
+}
+
+} // namespace paimon::parquet::test
diff --git a/src/paimon/format/parquet/parquet_file_batch_reader.cpp
b/src/paimon/format/parquet/parquet_file_batch_reader.cpp
new file mode 100644
index 0000000..d7a1cde
--- /dev/null
+++ b/src/paimon/format/parquet/parquet_file_batch_reader.cpp
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/format/parquet/parquet_file_batch_reader.h"
+
+#include <cstddef>
+#include <unordered_map>
+
+#include "arrow/acero/options.h"
+#include "arrow/array/array_nested.h"
+#include "arrow/c/abi.h"
+#include "arrow/c/bridge.h"
+#include "arrow/compute/api.h"
+#include "arrow/dataset/dataset.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/file_parquet.h"
+#include "arrow/dataset/type_fwd.h"
+#include "arrow/io/caching.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/record_batch.h"
+#include "arrow/type.h"
+#include "arrow/util/range.h"
+#include "arrow/util/thread_pool.h"
+#include "fmt/format.h"
+#include "paimon/common/metrics/metrics_impl.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/common/utils/options_utils.h"
+#include "paimon/format/parquet/parquet_field_id_converter.h"
+#include "paimon/format/parquet/parquet_format_defs.h"
+#include "paimon/format/parquet/parquet_timestamp_converter.h"
+#include "paimon/format/parquet/predicate_converter.h"
+#include "paimon/reader/batch_reader.h"
+#include "paimon/utils/roaring_bitmap32.h"
+#include "parquet/arrow/reader.h"
+#include "parquet/properties.h"
+
+namespace arrow {
+class MemoryPool;
+} // namespace arrow
+namespace paimon {
+class Predicate;
+} // namespace paimon
+
+namespace paimon::parquet {
+
+// Builds a reader around an already opened parquet file.
+//
+// The stream and the wrapped reader are moved into this object, `options` is copied and
+// `arrow_pool` is retained. The read ranges initially cover every row-group range reported
+// by the wrapped reader.
+ParquetFileBatchReader::ParquetFileBatchReader(
+    std::shared_ptr<arrow::io::RandomAccessFile>&& input_stream,
+    std::unique_ptr<FileReaderWrapper>&& reader,
+    const std::map<std::string, std::string>& options,
+    const std::shared_ptr<arrow::MemoryPool>& arrow_pool)
+    : options_(options),
+      arrow_pool_(arrow_pool),
+      input_stream_(std::move(input_stream)),
+      reader_(std::move(reader)),
+      metrics_(std::make_shared<MetricsImpl>()) {
+    // reader_ has been moved into place above, so it can be queried here.
+    read_ranges_ = reader_->GetAllRowGroupRanges();
+}
+
+// Opens `input_stream` as a parquet file and returns a reader that is ready to read every
+// column of the file.
+//
+// `pool` is handed to the Arrow file reader and retained by the returned object, `options`
+// configures the parquet/Arrow reader properties, and `batch_size` is forwarded to
+// CreateArrowReaderProperties. Returns an error status if the stream is null, if an option
+// cannot be parsed, or if the file cannot be opened.
+Result<std::unique_ptr<ParquetFileBatchReader>> ParquetFileBatchReader::Create(
+    std::shared_ptr<arrow::io::RandomAccessFile>&& input_stream,
+    const std::shared_ptr<arrow::MemoryPool>& pool,
+    const std::map<std::string, std::string>& options, int32_t batch_size) {
+    // An assert() is compiled out in release builds, so a null stream is reported as an
+    // error instead of being dereferenced while opening the file.
+    if (!input_stream) {
+        return Status::Invalid(
+            "Create ParquetFileBatchReader failed: input stream cannot be nullptr");
+    }
+    PAIMON_ASSIGN_OR_RAISE(::parquet::ReaderProperties reader_properties,
+                           CreateReaderProperties(pool, options));
+    PAIMON_ASSIGN_OR_RAISE(::parquet::ArrowReaderProperties arrow_reader_properties,
+                           CreateArrowReaderProperties(pool, options, batch_size));
+
+    ::parquet::arrow::FileReaderBuilder file_reader_builder;
+    PAIMON_RETURN_NOT_OK_FROM_ARROW(file_reader_builder.Open(input_stream, reader_properties));
+
+    std::unique_ptr<::parquet::arrow::FileReader> file_reader;
+    PAIMON_RETURN_NOT_OK_FROM_ARROW(file_reader_builder.memory_pool(pool.get())
+                                        ->properties(arrow_reader_properties)
+                                        ->Build(&file_reader));
+
+    PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<FileReaderWrapper> reader,
+                           FileReaderWrapper::Create(std::move(file_reader)));
+    auto parquet_file_batch_reader = std::unique_ptr<ParquetFileBatchReader>(
+        new ParquetFileBatchReader(std::move(input_stream), std::move(reader), options, pool));
+    // Default to reading the whole file: the full file schema, no predicate, no bitmap.
+    PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<::ArrowSchema> file_schema,
+                           parquet_file_batch_reader->GetFileSchema());
+    PAIMON_RETURN_NOT_OK(parquet_file_batch_reader->SetReadSchema(
+        file_schema.get(), /*predicate=*/nullptr, /*selection_bitmap=*/std::nullopt));
+    return parquet_file_batch_reader;
+}
+
+// Exports the schema of the underlying file through the Arrow C data interface.
+//
+// The schema reported by the wrapped reader is passed through
+// ParquetFieldIdConverter::GetPaimonIdsFromParquetIds and, wrapped as a struct type, through
+// ParquetTimestampConverter::AdjustTimezone before being exported.
+Result<std::unique_ptr<::ArrowSchema>> ParquetFileBatchReader::GetFileSchema() const {
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Schema> parquet_schema, reader_->GetSchema());
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Schema> paimon_schema,
+                           ParquetFieldIdConverter::GetPaimonIdsFromParquetIds(parquet_schema));
+    PAIMON_ASSIGN_OR_RAISE(
+        std::shared_ptr<arrow::DataType> adjusted_type,
+        ParquetTimestampConverter::AdjustTimezone(arrow::struct_(paimon_schema->fields())));
+
+    auto exported = std::make_unique<::ArrowSchema>();
+    PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportType(*adjusted_type, exported.get()));
+    return exported;
+}
+
+// Selects the columns and row groups that subsequent reads will cover.
+//
+// `schema` names the top-level fields to read; each one must exist in the file schema.
+// `predicate` and `selection_bitmap` are optional and, when present, narrow the candidate
+// row groups. On success the selection is remembered and the wrapped reader is prepared
+// for the row groups that also fall inside the current read ranges.
+Status ParquetFileBatchReader::SetReadSchema(
+    ::ArrowSchema* schema, const std::shared_ptr<Predicate>& predicate,
+    const std::optional<RoaringBitmap32>& selection_bitmap) {
+    if (schema == nullptr) {
+        return Status::Invalid("SetReadSchema failed: read schema cannot be nullptr");
+    }
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Schema> read_schema,
+                                      arrow::ImportSchema(schema));
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Schema> file_schema, reader_->GetSchema());
+
+    // Top-level field name -> indices produced by FlattenSchema for that field. The running
+    // counter is shared across fields so indices keep increasing over the whole schema.
+    std::unordered_map<std::string, std::vector<int32_t>> leaves_by_name;
+    int32_t next_leaf = 0;
+    for (const auto& file_field : file_schema->fields()) {
+        std::vector<int32_t> leaves;
+        FlattenSchema(file_field->type(), &next_leaf, &leaves);
+        leaves_by_name[file_field->name()] = std::move(leaves);
+    }
+
+    // Collect the indices of the requested fields, in the order they were requested.
+    std::vector<int32_t> column_indices;
+    for (const auto& requested_name : read_schema->field_names()) {
+        const auto found = leaves_by_name.find(requested_name);
+        if (found == leaves_by_name.end()) {
+            return Status::Invalid(
+                fmt::format("Field {} is not found in schema.", requested_name));
+        }
+        column_indices.insert(column_indices.end(), found->second.begin(), found->second.end());
+    }
+
+    // Start from every row group and let the optional filters narrow the set.
+    std::vector<int32_t> row_groups = arrow::internal::Iota(reader_->GetNumberOfRowGroups());
+    if (predicate) {
+        PAIMON_ASSIGN_OR_RAISE(row_groups,
+                               FilterRowGroupsByPredicate(predicate, file_schema, row_groups));
+    }
+    if (selection_bitmap) {
+        PAIMON_ASSIGN_OR_RAISE(row_groups,
+                               FilterRowGroupsByBitmap(*selection_bitmap, row_groups));
+    }
+
+    read_data_type_ = arrow::struct_(read_schema->fields());
+    read_row_groups_ = std::move(row_groups);
+    read_column_indices_ = std::move(column_indices);
+
+    metrics_->SetCounter(ParquetMetrics::READ_ROW_GROUPS_TOTAL, reader_->GetNumberOfRowGroups());
+    metrics_->SetCounter(ParquetMetrics::READ_ROW_GROUPS_AFTER_FILTER, read_row_groups_.size());
+
+    PAIMON_ASSIGN_OR_RAISE(std::set<int32_t> ordered_row_groups,
+                           reader_->FilterRowGroupsByReadRanges(read_ranges_, read_row_groups_));
+    return reader_->PrepareForReadingLazy(ordered_row_groups, read_column_indices_);
+}
+
+// Returns the subset of `src_row_groups` kept by
+// arrow::dataset::ParquetFileFragment::SplitByRowGroup for `predicate`.
+//
+// The paimon predicate is converted to an Arrow expression (bounded by the configured node
+// count limit) and bound to `file_schema` first. A null predicate is rejected.
+Result<std::vector<int32_t>> ParquetFileBatchReader::FilterRowGroupsByPredicate(
+    const std::shared_ptr<Predicate>& predicate, const std::shared_ptr<arrow::Schema> file_schema,
+    const std::vector<int32_t>& src_row_groups) const {
+    if (predicate == nullptr) {
+        return Status::Invalid("cannot pushdown an empty predicate");
+    }
+
+    // Step 1: paimon predicate -> bound Arrow expression.
+    PAIMON_ASSIGN_OR_RAISE(
+        uint32_t node_count_limit,
+        OptionsUtils::GetValueFromMap<uint32_t>(options_, PARQUET_READ_PREDICATE_NODE_COUNT_LIMIT,
+                                                DEFAULT_PARQUET_READ_PREDICATE_NODE_COUNT_LIMIT));
+    PAIMON_ASSIGN_OR_RAISE(arrow::compute::Expression unbound_expr,
+                           PredicateConverter::Convert(predicate, node_count_limit));
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(arrow::Expression bound_expr,
+                                      unbound_expr.Bind(*file_schema));
+
+    // Step 2: describe the already opened stream as a dataset file source.
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(int64_t file_length, input_stream_->GetSize());
+    arrow::dataset::FileSource file_source(input_stream_, /*size=*/file_length);
+
+    // Step 3: build one fragment over the candidate row groups and split it by row group
+    // with the bound expression.
+    auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(
+        std::shared_ptr<arrow::dataset::ParquetFileFragment> whole_fragment,
+        format->MakeFragment(file_source,
+                             /*partition_expression=*/PredicateConverter::AlwaysTrue(),
+                             /*physical_schema=*/nullptr, /*row_groups=*/src_row_groups));
+    PAIMON_RETURN_NOT_OK_FROM_ARROW(
+        whole_fragment->EnsureCompleteMetadata(reader_->GetFileReader()));
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(arrow::dataset::FragmentVector kept_fragments,
+                                      whole_fragment->SplitByRowGroup(bound_expr));
+
+    // Step 4: gather the row-group indices carried by the returned fragments.
+    std::vector<int32_t> kept_row_groups;
+    kept_row_groups.reserve(src_row_groups.size());
+    for (const auto& fragment : kept_fragments) {
+        auto* parquet_fragment =
+            dynamic_cast<arrow::dataset::ParquetFileFragment*>(fragment.get());
+        if (parquet_fragment == nullptr) {
+            return Status::Invalid("cannot cast to ParquetFileFragment in ParquetFileBatchReader");
+        }
+        const auto& fragment_groups = parquet_fragment->row_groups();
+        kept_row_groups.insert(kept_row_groups.end(), fragment_groups.begin(),
+                               fragment_groups.end());
+    }
+    return kept_row_groups;
+}
+
+// Keeps the row groups from `src_row_groups` whose row range satisfies
+// `bitmap.ContainsAny(start, end)`.
+//
+// An empty bitmap is rejected, as is any row-group index that has no entry in the wrapped
+// reader's row-group ranges.
+Result<std::vector<int32_t>> ParquetFileBatchReader::FilterRowGroupsByBitmap(
+    const RoaringBitmap32& bitmap, const std::vector<int32_t>& src_row_groups) const {
+    if (bitmap.IsEmpty()) {
+        return Status::Invalid("cannot push down an empty bitmap to ParquetFileBatchReader");
+    }
+    const auto& all_row_group_ranges = reader_->GetAllRowGroupRanges();
+
+    std::vector<int32_t> kept_row_groups;
+    for (const int32_t row_group_idx : src_row_groups) {
+        // A negative index becomes a huge unsigned value and is rejected by the same check.
+        const auto position = static_cast<size_t>(row_group_idx);
+        if (position >= all_row_group_ranges.size()) {
+            return Status::Invalid(
+                fmt::format("src row group {} not in row group meta", row_group_idx));
+        }
+        const auto& row_range = all_row_group_ranges[position];
+        if (bitmap.ContainsAny(row_range.first, row_range.second)) {
+            kept_row_groups.push_back(row_group_idx);
+        }
+    }
+    return kept_row_groups;
+}
+
+// Reads the next record batch and exports it through the Arrow C data interface.
+//
+// Returns the EOF batch when the wrapped reader has nothing left. The batch is converted to
+// a struct array and, when ParquetTimestampConverter reports that a cast is needed, cast to
+// the read schema. The row and batch counters are updated for each batch returned.
+Result<BatchReader::ReadBatch> ParquetFileBatchReader::NextBatch() {
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<arrow::RecordBatch> batch, reader_->Next());
+    if (!batch) {
+        return BatchReader::MakeEofBatch();
+    }
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Array> array,
+                                      batch->ToStructArray());
+
+    PAIMON_ASSIGN_OR_RAISE(
+        bool cast_required,
+        ParquetTimestampConverter::NeedCastArrayForTimestamp(array->type(), read_data_type_));
+    if (cast_required) {
+        PAIMON_ASSIGN_OR_RAISE(array, ParquetTimestampConverter::CastArrayForTimestamp(
+                                          array, read_data_type_, arrow_pool_));
+    }
+    // Re-check: whatever is exported below must no longer need a timestamp cast.
+    PAIMON_ASSIGN_OR_RAISE(
+        bool still_mismatched,
+        ParquetTimestampConverter::NeedCastArrayForTimestamp(array->type(), read_data_type_));
+    if (still_mismatched) {
+        return Status::Invalid(
+            fmt::format("unexpected: in parquet, after CastArrayForTimestamp, output type {} not "
+                        "equal with read schema {}",
+                        array->type()->ToString(), read_data_type_->ToString()));
+    }
+
+    auto c_array = std::make_unique<ArrowArray>();
+    auto c_schema = std::make_unique<ArrowSchema>();
+    PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportArray(*array, c_array.get(), c_schema.get()));
+
+    read_rows_ += array->length();
+    ++read_batch_count_;
+    metrics_->SetCounter(ParquetMetrics::READ_ROWS, read_rows_);
+    metrics_->SetCounter(ParquetMetrics::READ_BATCH_COUNT, read_batch_count_);
+
+    return std::make_pair(std::move(c_array), std::move(c_schema));
+}
+
+// Reports every row-group range of the file as a read range and always sets
+// `*need_prefetch` to true. `need_prefetch` must not be null.
+Result<std::vector<std::pair<uint64_t, uint64_t>>> ParquetFileBatchReader::GenReadRanges(
+    bool* need_prefetch) const {
+    *need_prefetch = true;
+    std::vector<std::pair<uint64_t, uint64_t>> all_ranges = reader_->GetAllRowGroupRanges();
+    return all_ranges;
+}
+
+// Builds the low-level ::parquet::ReaderProperties from the reader options.
+//
+// `pool` is the memory pool the parquet reader allocates from; Arrow's default pool is used
+// when it is null. The PARQUET_READ_ENABLE_PRE_BUFFER option (default true) toggles the
+// buffered stream. Returns an error status if the option value cannot be parsed.
+Result<::parquet::ReaderProperties> ParquetFileBatchReader::CreateReaderProperties(
+    const std::shared_ptr<arrow::MemoryPool>& pool,
+    const std::map<std::string, std::string>& options) {
+    // Previously `pool` was ignored and the reader always allocated from the default pool,
+    // which bypassed the pool supplied by the caller.
+    ::parquet::ReaderProperties reader_properties(pool ? pool.get()
+                                                       : arrow::default_memory_pool());
+    // TODO(jinli.zjw): set more ReaderProperties (compare with java)
+    PAIMON_ASSIGN_OR_RAISE(
+        bool enable_pre_buffer,
+        OptionsUtils::GetValueFromMap<bool>(options, PARQUET_READ_ENABLE_PRE_BUFFER, true));
+    if (enable_pre_buffer) {
+        reader_properties.enable_buffered_stream();
+    } else {
+        reader_properties.disable_buffered_stream();
+    }
+    return reader_properties;
+}
+
+// Builds ::parquet::ArrowReaderProperties from the reader options and `batch_size`.
+//
+// Options read here: executor thread count, pre-buffer flag, and the cache options (lazy,
+// prefetch limit, range size limit). A non-zero thread count also resizes Arrow's CPU
+// thread pool via arrow::SetCpuThreadPoolCapacity. `pool` is not used by this function.
+Result<::parquet::ArrowReaderProperties> ParquetFileBatchReader::CreateArrowReaderProperties(
+    const std::shared_ptr<arrow::MemoryPool>& pool,
+    const std::map<std::string, std::string>& options, int32_t batch_size) {
+    PAIMON_ASSIGN_OR_RAISE(
+        uint32_t executor_thread_count,
+        OptionsUtils::GetValueFromMap<uint32_t>(options, PARQUET_READ_EXECUTOR_THREAD_COUNT,
+                                                DEFAULT_PARQUET_READ_EXECUTOR_THREAD_COUNT));
+
+    ::parquet::ArrowReaderProperties props;
+    // TODO(jinli.zjw): set more ArrowReaderProperties (compare with java)
+    PAIMON_ASSIGN_OR_RAISE(
+        bool enable_pre_buffer,
+        OptionsUtils::GetValueFromMap<bool>(options, PARQUET_READ_ENABLE_PRE_BUFFER, true));
+    props.set_pre_buffer(enable_pre_buffer);
+    props.set_batch_size(static_cast<int64_t>(batch_size));
+
+    // A thread count of zero means single-threaded decoding.
+    const bool use_threads = executor_thread_count != 0;
+    if (use_threads) {
+        PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::SetCpuThreadPoolCapacity(executor_thread_count));
+    }
+    props.set_use_threads(use_threads);
+
+    PAIMON_ASSIGN_OR_RAISE(
+        bool cache_lazy,
+        OptionsUtils::GetValueFromMap<bool>(options, PARQUET_READ_CACHE_OPTION_LAZY, false));
+    PAIMON_ASSIGN_OR_RAISE(
+        int64_t cache_prefetch_limit,
+        OptionsUtils::GetValueFromMap<int64_t>(options, PARQUET_READ_CACHE_OPTION_PREFETCH_LIMIT,
+                                               DEFAULT_PARQUET_READ_CACHE_OPTION_PREFETCH_LIMIT));
+    PAIMON_ASSIGN_OR_RAISE(
+        int64_t cache_range_size_limit,
+        OptionsUtils::GetValueFromMap<int64_t>(
+            options, PARQUET_READ_CACHE_OPTION_RANGE_SIZE_LIMIT,
+            DEFAULT_PARQUET_READ_CACHE_OPTION_RANGE_SIZE_LIMIT));
+
+    // Start from Arrow's defaults and override only the three configurable knobs.
+    arrow::io::CacheOptions cache_options = arrow::io::CacheOptions::Defaults();
+    cache_options.lazy = cache_lazy;
+    cache_options.prefetch_limit = cache_prefetch_limit;
+    cache_options.range_size_limit = cache_range_size_limit;
+    props.set_cache_options(cache_options);
+    return props;
+}
+
+} // namespace paimon::parquet
diff --git a/src/paimon/format/parquet/parquet_file_batch_reader.h
b/src/paimon/format/parquet/parquet_file_batch_reader.h
new file mode 100644
index 0000000..806afbe
--- /dev/null
+++ b/src/paimon/format/parquet/parquet_file_batch_reader.h
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cassert>
+#include <cstdint>
+#include <map>
+#include <memory>
+#include <optional>
+#include <set>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/c/abi.h"
+#include "arrow/c/bridge.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/memory_pool.h"
+#include "arrow/record_batch.h"
+#include "arrow/type.h"
+#include "arrow/type_fwd.h"
+#include "paimon/common/metrics/metrics_impl.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/format/parquet/file_reader_wrapper.h"
+#include "paimon/reader/prefetch_file_batch_reader.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+#include "parquet/arrow/reader.h"
+#include "parquet/properties.h"
+
+namespace arrow {
+class MemoryPool;
+
+namespace io {
+class RandomAccessFile;
+} // namespace io
+} // namespace arrow
+namespace paimon {
+class Metrics;
+class Predicate;
+class RoaringBitmap32;
+} // namespace paimon
+
+namespace paimon::parquet {
+
+// Reads one parquet file as a sequence of Arrow batches, delegating the file access to a
+// FileReaderWrapper. Instances are obtained through Create().
+class ParquetFileBatchReader : public PrefetchFileBatchReader {
+ public:
+    // Opens `input_stream` as a parquet file. `pool` is retained by the returned reader,
+    // `options` configures the parquet/Arrow reader properties and `batch_size` is the
+    // batch size handed to the Arrow reader.
+    static Result<std::unique_ptr<ParquetFileBatchReader>> Create(
+        std::shared_ptr<arrow::io::RandomAccessFile>&& input_stream,
+        const std::shared_ptr<arrow::MemoryPool>& pool,
+        const std::map<std::string, std::string>& options, int32_t batch_size);
+
+    // For timestamp types the returned schema is the one stored in the file, e.g. a
+    // second-precision timestamp is stored in parquet as milliseconds.
+    Result<std::unique_ptr<::ArrowSchema>> GetFileSchema() const override;
+
+    // Selects the fields to read and, optionally, narrows the row groups by `predicate`
+    // and `selection_bitmap`.
+    Status SetReadSchema(::ArrowSchema* read_schema, const std::shared_ptr<Predicate>& predicate,
+                         const std::optional<RoaringBitmap32>& selection_bitmap) override;
+
+    Status SeekToRow(uint64_t row_number) override {
+        assert(reader_);
+        return reader_->SeekToRow(row_number);
+    }
+
+    // Important: the output ArrowArray is allocated on arrow_pool_, whose lifecycle is held
+    // by ParquetFileBatchReader. Therefore the BatchReader must be kept alive while the
+    // output ArrowArray is in use.
+    Result<ReadBatch> NextBatch() override;
+
+    Result<std::vector<std::pair<uint64_t, uint64_t>>> GenReadRanges(
+        bool* need_prefetch) const override;
+
+    Result<uint64_t> GetPreviousBatchFirstRowNumber() const override {
+        assert(reader_);
+        return reader_->GetPreviousBatchFirstRowNumber();
+    }
+
+    Result<uint64_t> GetNumberOfRows() const override {
+        assert(reader_);
+        return reader_->GetNumberOfRows();
+    }
+
+    uint64_t GetNextRowToRead() const override {
+        assert(reader_);
+        return reader_->GetNextRowToRead();
+    }
+
+    // Replaces the read ranges and re-prepares the wrapped reader for the row groups of the
+    // last SetReadSchema() call that fall inside the new ranges.
+    Status SetReadRanges(const std::vector<std::pair<uint64_t, uint64_t>>& read_ranges) override {
+        assert(reader_);
+        read_ranges_ = read_ranges;
+        PAIMON_ASSIGN_OR_RAISE(
+            std::set<int32_t> ordered_row_groups,
+            reader_->FilterRowGroupsByReadRanges(read_ranges_, read_row_groups_));
+        return reader_->PrepareForReadingLazy(ordered_row_groups, read_column_indices_);
+    }
+
+    std::shared_ptr<Metrics> GetReaderMetrics() const override {
+        return metrics_;
+    }
+
+    // Releases the wrapped reader and the input stream. The interface returns void, so the
+    // status of closing the wrapped reader is deliberately discarded.
+    void Close() override {
+        if (reader_) {
+            auto status = reader_->Close();
+            reader_.reset();
+            (void)status;
+        }
+        input_stream_.reset();
+    }
+
+    bool SupportPreciseBitmapSelection() const override {
+        return false;
+    }
+
+ private:
+    ParquetFileBatchReader(std::shared_ptr<arrow::io::RandomAccessFile>&& input_stream,
+                           std::unique_ptr<FileReaderWrapper>&& reader,
+                           const std::map<std::string, std::string>& options,
+                           const std::shared_ptr<arrow::MemoryPool>& arrow_pool);
+
+    static Result<::parquet::ReaderProperties> CreateReaderProperties(
+        const std::shared_ptr<arrow::MemoryPool>& pool,
+        const std::map<std::string, std::string>& options);
+
+    static Result<::parquet::ArrowReaderProperties> CreateArrowReaderProperties(
+        const std::shared_ptr<arrow::MemoryPool>& pool,
+        const std::map<std::string, std::string>& options, int32_t batch_size);
+
+    // Appends one index per leaf (non-nested) type found under `type` to `index_vector`,
+    // taking each index from `*index` and incrementing it afterwards.
+    //
+    // Struct, map and every list flavour are descended into. LARGE_LIST and FIXED_SIZE_LIST
+    // are included so that a nested element type is not miscounted as a single leaf.
+    static void FlattenSchema(const std::shared_ptr<arrow::DataType>& type, int32_t* index,
+                              std::vector<int32_t>* index_vector) {
+        switch (type->id()) {
+            case arrow::Type::STRUCT:
+            case arrow::Type::LIST:
+            case arrow::Type::LARGE_LIST:
+            case arrow::Type::FIXED_SIZE_LIST:
+            case arrow::Type::MAP:
+                for (int32_t i = 0; i < type->num_fields(); i++) {
+                    FlattenSchema(type->field(i)->type(), index, index_vector);
+                }
+                break;
+            default:
+                index_vector->push_back((*index)++);
+                break;
+        }
+    }
+
+    // Precondition: `predicate` must not be null.
+    Result<std::vector<int32_t>> FilterRowGroupsByPredicate(
+        const std::shared_ptr<Predicate>& predicate,
+        const std::shared_ptr<arrow::Schema> file_schema,
+        const std::vector<int32_t>& src_row_groups) const;
+
+    Result<std::vector<int32_t>> FilterRowGroupsByBitmap(
+        const RoaringBitmap32& bitmap, const std::vector<int32_t>& src_row_groups) const;
+
+ private:
+    std::map<std::string, std::string> options_;
+    // Holds the lifecycle of the arrow memory pool.
+    std::shared_ptr<arrow::MemoryPool> arrow_pool_;
+
+    std::shared_ptr<arrow::io::RandomAccessFile> input_stream_;
+    std::unique_ptr<FileReaderWrapper> reader_;
+
+    std::shared_ptr<arrow::DataType> read_data_type_;
+    std::vector<std::pair<uint64_t, uint64_t>> read_ranges_;
+
+    std::shared_ptr<Metrics> metrics_;
+
+    uint64_t read_rows_ = 0;
+    uint64_t read_batch_count_ = 0;
+
+    // Row groups and column indices chosen by the last SetReadSchema() call.
+    std::vector<int32_t> read_row_groups_;
+    std::vector<int32_t> read_column_indices_;
+};
+
+} // namespace paimon::parquet
diff --git a/src/paimon/format/parquet/parquet_file_batch_reader_test.cpp
b/src/paimon/format/parquet/parquet_file_batch_reader_test.cpp
new file mode 100644
index 0000000..836c291
--- /dev/null
+++ b/src/paimon/format/parquet/parquet_file_batch_reader_test.cpp
@@ -0,0 +1,646 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/format/parquet/parquet_file_batch_reader.h"
+
+#include <iostream>
+#include <limits>
+
+#include "arrow/api.h"
+#include "arrow/array/array_base.h"
+#include "arrow/array/array_nested.h"
+#include "arrow/array/builder_nested.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/c/abi.h"
+#include "arrow/c/bridge.h"
+#include "arrow/io/caching.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/ipc/json_simple.h"
+#include "gtest/gtest.h"
+#include "paimon/common/types/data_field.h"
+#include "paimon/common/utils/arrow/arrow_input_stream_adapter.h"
+#include "paimon/common/utils/arrow/mem_utils.h"
+#include "paimon/common/utils/date_time_utils.h"
+#include "paimon/common/utils/path_util.h"
+#include "paimon/defs.h"
+#include "paimon/format/parquet/parquet_format_defs.h"
+#include "paimon/format/parquet/parquet_format_writer.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/fs/local/local_file_system.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/predicate/literal.h"
+#include "paimon/predicate/predicate_builder.h"
+#include "paimon/reader/batch_reader.h"
+#include "paimon/testing/utils/read_result_collector.h"
+#include "paimon/testing/utils/testharness.h"
+#include "paimon/testing/utils/timezone_guard.h"
+#include "paimon/utils/roaring_bitmap32.h"
+#include "parquet/properties.h"
+
+namespace paimon {
+class Predicate;
+} // namespace paimon
+
+namespace paimon::parquet::test {
+
+class ParquetFileBatchReaderTest : public ::testing::Test,
+ public ::testing::WithParamInterface<bool> {
+ public:
+ void SetUp() override {  // Builds the per-test fixtures: temp dir, file system, pool, schema and sample rows.
+ dir_ = paimon::test::UniqueTestDirectory::Create();
+ ASSERT_TRUE(dir_);
+ fs_ = std::make_shared<LocalFileSystem>();
+ pool_ = GetArrowPool(GetDefaultPool());
+ batch_size_ = 10;
+ file_path_ = PathUtil::JoinPath(dir_->Str(), "test.parquet");
+
+ arrow::FieldVector fields = {  // 13 columns, f1..f13
+ arrow::field("f1", arrow::boolean()),
+ arrow::field("f2", arrow::int8()),
+ arrow::field("f3", arrow::int16()),
+ arrow::field("f4", arrow::int32()),
+ arrow::field("f5", arrow::int64()),
+ arrow::field("f6", arrow::float32()),
+ arrow::field("f7", arrow::float64()),
+ arrow::field("f8", arrow::utf8()),
+ arrow::field("f9", arrow::binary()),
+ arrow::field("f10", arrow::map(arrow::list(arrow::float32()),  // nested column: map<list<float32>, struct<f0: bool, f1: int64>>
+ arrow::struct_({arrow::field("f0",
arrow::boolean()),
+ arrow::field("f1",
arrow::int64())}))),
+ arrow::field("f11", arrow::timestamp(arrow::TimeUnit::NANO)),
+ arrow::field("f12", arrow::date32()),
+ arrow::field("f13", arrow::decimal128(2, 2))};
+
+ schema_ = arrow::schema(fields);
+ struct_array_ = std::dynamic_pointer_cast<arrow::StructArray>(  // six sample rows, parsed from the JSON below
+ arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields),
R"([
+[true, 31, null, 300001, 3000000001, 3.1, 300.000000001, "s31", "a31",
[[[5.11, 5.21], [true, 61]]], "1970-01-01 00:00:00.000071", 81, "0.91"],
+[false, 32, 302, 300002, 3000000002, 3.2, 300.000000002, "s32", "a32",
[[[5.12, 5.22], [false, 62]]], "1970-01-01 00:00:00.000072", 82, "0.92"],
+[true, 33, 303, 300003, 3000000003, 3.3, 300.000000003, "s33", "a33", null,
"1970-01-01 00:00:00.000073", 83, "0.93"],
+[false, 34, 304, 300004, 3000000004, 3.4, 300.000000004, "s34", "a34",
[[[5.141, 5.241], [false, 641]], [[5.14, 5.24], [false, 64]]], "1970-01-01
00:00:00.000074", 84, "0.94"],
+[true, 35, 305, 300005, 3000000005, 3.5, 300.000000005, "s35", "a35", [[[5.15,
5.25], [true, 65]]], "1970-01-01 00:00:00.000075", 85, "0.95"],
+[false, 36, 306, 300006, 3000000006, 3.6, 300.000000006, "s36", "a36",
[[[5.16, 5.26], null]], "1970-01-01 00:00:00.000076", 86, "0.96"]
+ ])")
+ .ValueOrDie());  // NOTE(review): ValueOrDie presumably aborts if the JSON does not parse — confirm
+ }
+
+ void TearDown() override {}  // No explicit cleanup. NOTE(review): presumably dir_ removes its directory on destruction — confirm.
+
+    // Writes `src_array` to `file_path` as a parquet file through ParquetFormatWriter.
+    //
+    // `write_batch_size`, `enable_dictionary` and `max_row_group_length` are applied to the
+    // parquet writer properties. An existing file at `file_path` is overwritten. Failures
+    // are reported through gtest assertions.
+    void WriteArray(const std::string& file_path, const std::shared_ptr<arrow::Array>& src_array,
+                    const std::shared_ptr<arrow::Schema>& arrow_schema, int64_t write_batch_size,
+                    bool enable_dictionary, int64_t max_row_group_length) const {
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<OutputStream> out,
+                             fs_->Create(file_path, /*overwrite=*/true));
+
+        ::parquet::WriterProperties::Builder props_builder;
+        props_builder.write_batch_size(write_batch_size);
+        props_builder.max_row_group_length(max_row_group_length);
+        if (enable_dictionary) {
+            props_builder.enable_dictionary();
+        } else {
+            props_builder.disable_dictionary();
+        }
+        auto writer_properties = props_builder.build();
+
+        ASSERT_OK_AND_ASSIGN(
+            auto format_writer,
+            ParquetFormatWriter::Create(out, arrow_schema, writer_properties,
+                                        DEFAULT_PARQUET_WRITER_MAX_MEMORY_USE, pool_));
+
+        // Hand the array to the writer through the Arrow C data interface.
+        auto c_array = std::make_unique<ArrowArray>();
+        ASSERT_TRUE(arrow::ExportArray(*src_array, c_array.get()).ok());
+        ASSERT_OK(format_writer->AddBatch(c_array.get()));
+        ASSERT_OK(format_writer->Flush());
+        ASSERT_OK(format_writer->Finish());
+        ASSERT_OK(out->Flush());
+        ASSERT_OK(out->Close());
+    }
+
+ // Test helper (path overload): opens `file_name` through fs_, wraps the stream in an
+ // ArrowInputStreamAdapter sized with the file length, and delegates to the stream
+ // overload with an empty options map.
+ // NOTE(review): the Result of GetFileStatus() is dereferenced with .value() without
+ // a prior status check.
+ std::unique_ptr<ParquetFileBatchReader> PrepareParquetFileBatchReader(
+ const std::string& file_name, const std::shared_ptr<arrow::Schema>&
read_schema,
+ const std::shared_ptr<Predicate>& predicate,
+ const std::optional<RoaringBitmap32>& selection_bitmap, int32_t
batch_size) const {
+ EXPECT_OK_AND_ASSIGN(auto input_stream, fs_->Open(file_name));
+ auto length = fs_->GetFileStatus(file_name).value()->GetLen();
+ auto in_stream =
+ std::make_unique<ArrowInputStreamAdapter>(std::move(input_stream),
pool_, length);
+ std::map<std::string, std::string> options = {};
+ return PrepareParquetFileBatchReader(std::move(in_stream), options,
read_schema, predicate,
+ selection_bitmap, batch_size);
+ }
+
+ // Test helper (stream overload): creates a ParquetFileBatchReader on `in_stream` with
+ // `options` and `batch_size`, exports `read_schema` to a C ArrowSchema and applies it
+ // together with `predicate` and `selection_bitmap` via SetReadSchema().
+ // Failures are reported with EXPECT_* macros; the reader is returned afterwards.
+ std::unique_ptr<paimon::parquet::ParquetFileBatchReader>
PrepareParquetFileBatchReader(
+ std::unique_ptr<arrow::io::RandomAccessFile>&& in_stream,
+ const std::map<std::string, std::string>& options,
+ const std::shared_ptr<arrow::Schema>& read_schema,
+ const std::shared_ptr<Predicate>& predicate,
+ const std::optional<RoaringBitmap32>& selection_bitmap, int32_t
batch_size) const {
+ EXPECT_OK_AND_ASSIGN(
+ auto parquet_batch_reader,
+ ParquetFileBatchReader::Create(std::move(in_stream), pool_,
options, batch_size));
+ // SetReadSchema() takes the schema through the Arrow C data interface.
+ std::unique_ptr<ArrowSchema> c_schema =
std::make_unique<ArrowSchema>();
+ auto arrow_status = arrow::ExportSchema(*read_schema, c_schema.get());
+ EXPECT_TRUE(arrow_status.ok());
+ EXPECT_OK(parquet_batch_reader->SetReadSchema(c_schema.get(),
predicate, selection_bitmap));
+ return parquet_batch_reader;
+ }
+
+ private:
+ std::string file_path_;
+ std::unique_ptr<paimon::test::UniqueTestDirectory> dir_;
+ std::shared_ptr<FileSystem> fs_;
+ std::shared_ptr<arrow::MemoryPool> pool_;
+ int32_t batch_size_;
+ std::shared_ptr<arrow::Schema> schema_;
+ std::shared_ptr<arrow::StructArray> struct_array_;
+};
+
+// Reads the pre-generated append-table Parquet file with the fixture schema (no
+// predicate, no selection bitmap) and compares the collected result with the
+// fixture's expected struct array, printing a diff to stdout on mismatch.
+TEST_F(ParquetFileBatchReaderTest, TestSimple) {
+    // The relative part starts with "parquet/" (no leading '/'), matching the other
+    // tests in this file that read the same data file.
+    std::string file_name = paimon::test::GetDataDir() +
+                            "parquet/parquet_append_table.db/parquet_append_table/bucket-0/"
+                            "data-9ea62f34-1dca-49c1-bf7a-d37303d8fb76-0.parquet";
+    auto parquet_batch_reader = PrepareParquetFileBatchReader(
+        file_name, schema_, /*predicate=*/nullptr, /*selection_bitmap=*/std::nullopt, batch_size_);
+    ASSERT_OK_AND_ASSIGN(
+        std::shared_ptr<arrow::ChunkedArray> result_array,
+        paimon::test::ReadResultCollector::CollectResult(parquet_batch_reader.get()));
+    std::shared_ptr<arrow::ChunkedArray> expected_array =
+        std::make_shared<arrow::ChunkedArray>(struct_array_);
+    ASSERT_TRUE(result_array->Equals(*expected_array,
+                                     arrow::EqualOptions::Defaults().diff_sink(&std::cout)));
+}
+
+// Exercises one reader through four stages: GetFileSchema(), reading without a read
+// schema, reading with a projected read schema, and reading with a predicate for
+// which the test expects no result at all.
+TEST_F(ParquetFileBatchReaderTest, TestSetReadSchema) {
+ std::string file_name = paimon::test::GetDataDir() +
+
"parquet/parquet_append_table.db/parquet_append_table/bucket-0/"
+
"data-9ea62f34-1dca-49c1-bf7a-d37303d8fb76-0.parquet";
+ ASSERT_OK_AND_ASSIGN(std::shared_ptr<InputStream> input_stream,
fs_->Open(file_name));
+ auto length = fs_->GetFileStatus(file_name).value()->GetLen();
+ auto in_stream =
+ std::make_unique<ArrowInputStreamAdapter>(std::move(input_stream),
pool_, length);
+ std::map<std::string, std::string> options;
+ ASSERT_OK_AND_ASSIGN(
+ auto parquet_batch_reader,
+ ParquetFileBatchReader::Create(std::move(in_stream), pool_, options,
batch_size_));
+ // test GetFileSchema()
+ ASSERT_OK_AND_ASSIGN(auto c_file_schema,
parquet_batch_reader->GetFileSchema());
+ auto arrow_file_schema =
arrow::ImportSchema(c_file_schema.get()).ValueOrDie();
+ ASSERT_TRUE(arrow_file_schema->Equals(*schema_));
+
+ // NextBatch() without SetReadSchema(), will return data with
+ // file schema
+ ASSERT_OK_AND_ASSIGN(
+ auto result_with_file_schema,
+
paimon::test::ReadResultCollector::CollectResult(parquet_batch_reader.get()));
+ auto expected_file_chunk_array =
std::make_shared<arrow::ChunkedArray>(struct_array_);
+ ASSERT_TRUE(result_with_file_schema->Equals(expected_file_chunk_array));
+ // NextBatch() with SetReadSchema(), will return data with read schema
+ // (projection onto schema_ fields 0, 9 and 12)
+ arrow::Schema read_schema({schema_->field(0), schema_->field(9),
schema_->field(12)});
+ std::unique_ptr<ArrowSchema> c_read_schema =
std::make_unique<ArrowSchema>();
+ ASSERT_TRUE(arrow::ExportSchema(read_schema, c_read_schema.get()).ok());
+ ASSERT_OK(parquet_batch_reader->SetReadSchema(c_read_schema.get(),
+ /*predicate=*/nullptr,
+
/*selection_bitmap=*/std::nullopt));
+ auto expected_read_array =
+ arrow::StructArray::Make(
+ {struct_array_->field(0), struct_array_->field(9),
struct_array_->field(12)},
+ read_schema.fields())
+ .ValueOrDie();
+ auto expected_read_chunk_array =
std::make_shared<arrow::ChunkedArray>(expected_read_array);
+ ASSERT_OK_AND_ASSIGN(
+ auto result_with_read_schema,
+
paimon::test::ReadResultCollector::CollectResult(parquet_batch_reader.get()));
+ ASSERT_TRUE(result_with_read_schema->Equals(expected_read_chunk_array));
+
+ // NextBatch() with predicate
+ // The schema is exported again into a fresh ArrowSchema before the second
+ // SetReadSchema() call; with the IsNull(f1) predicate the collected result is
+ // expected to be null.
+ auto predicate = PredicateBuilder::IsNull(
+ /*field_index=*/0, /*field_name=*/"f1", FieldType::BOOLEAN);
+ c_read_schema = std::make_unique<ArrowSchema>();
+ ASSERT_TRUE(arrow::ExportSchema(read_schema, c_read_schema.get()).ok());
+ ASSERT_OK(parquet_batch_reader->SetReadSchema(c_read_schema.get(),
predicate,
+
/*selection_bitmap=*/std::nullopt));
+ ASSERT_OK_AND_ASSIGN(result_with_read_schema,
paimon::test::ReadResultCollector::CollectResult(
+
parquet_batch_reader.get()));
+ ASSERT_FALSE(result_with_read_schema);
+}
+
+// For several batch sizes, reads the whole data file and checks the collected result
+// against the fixture array. Also checks GetPreviousBatchFirstRowNumber(): it is
+// expected to be uint64 max before any read and 6 after the file has been drained.
+TEST_F(ParquetFileBatchReaderTest, TestNextBatchSimple) {
+ std::string file_name = paimon::test::GetDataDir() +
+
"parquet/parquet_append_table.db/parquet_append_table/bucket-0/"
+
"data-9ea62f34-1dca-49c1-bf7a-d37303d8fb76-0.parquet";
+ for (auto batch_size : {1, 2, 3, 5, 8, 10}) {
+ auto parquet_batch_reader =
+ PrepareParquetFileBatchReader(file_name, schema_,
/*predicate=*/nullptr,
+ /*selection_bitmap=*/std::nullopt,
batch_size);
+
ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFirstRowNumber().value(),
+ std::numeric_limits<uint64_t>::max());
+ ASSERT_OK_AND_ASSIGN(auto result_array,
paimon::test::ReadResultCollector::CollectResult(
+
parquet_batch_reader.get()));
+
ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFirstRowNumber().value(), 6);
+ parquet_batch_reader->Close();
+ auto expected_array =
std::make_shared<arrow::ChunkedArray>(struct_array_);
+ ASSERT_TRUE(result_array->Equals(expected_array));
+ // test metrics
+ // Only the presence of the metrics object is checked; the counter checks
+ // below are still disabled.
+ auto read_metrics = parquet_batch_reader->GetReaderMetrics();
+ ASSERT_TRUE(read_metrics);
+ // TODO(jinli.zjw): test metrics
+ // ASSERT_TRUE(read_metrics->GetCounter(ParquetMetrics::READ_BYTES) >
0);
+ //
ASSERT_TRUE(read_metrics->GetCounter(ParquetMetrics::READ_RAW_BYTES) > 0);
+ }
+}
+
+// Projects the file onto fixture fields 4, 9, 10 and 12 (in file order) and checks
+// that the reader returns exactly those columns of the fixture array.
+TEST_F(ParquetFileBatchReaderTest, TestNextBatchWithTargetSchema) {
+    const std::string data_file =
+        paimon::test::GetDataDir() +
+        "parquet/parquet_append_table.db/parquet_append_table/bucket-0/"
+        "data-9ea62f34-1dca-49c1-bf7a-d37303d8fb76-0.parquet";
+
+    // Build the projected schema from the selected field positions.
+    arrow::FieldVector projected_fields;
+    for (int idx : {4, 9, 10, 12}) {
+        projected_fields.push_back(schema_->field(idx));
+    }
+    auto read_schema = arrow::schema(projected_fields);
+
+    auto reader =
+        PrepareParquetFileBatchReader(data_file, read_schema, /*predicate=*/nullptr,
+                                      /*selection_bitmap=*/std::nullopt, batch_size_);
+    ASSERT_OK_AND_ASSIGN(auto actual,
+                         paimon::test::ReadResultCollector::CollectResult(reader.get()));
+    reader->Close();
+
+    // The expectation is the same projection applied to the fixture array.
+    std::vector<std::shared_ptr<arrow::Array>> projected_columns;
+    for (int idx : {4, 9, 10, 12}) {
+        projected_columns.push_back(struct_array_->field(idx));
+    }
+    auto expected_struct =
+        arrow::StructArray::Make(projected_columns, read_schema->fields()).ValueOrDie();
+    auto expected = std::make_shared<arrow::ChunkedArray>(expected_struct);
+    ASSERT_TRUE(actual->Equals(expected));
+}
+
+// Same projection test as above, but the read schema lists the fields in reverse
+// file order, so the reader has to honour the requested column order.
+TEST_F(ParquetFileBatchReaderTest, TestNextBatchWithOutofOrderTargetSchema) {
+    const std::string data_file =
+        paimon::test::GetDataDir() +
+        "parquet/parquet_append_table.db/parquet_append_table/bucket-0/"
+        "data-9ea62f34-1dca-49c1-bf7a-d37303d8fb76-0.parquet";
+
+    // read with f13, f11, f10, f5
+    arrow::FieldVector projected_fields;
+    for (int idx : {12, 10, 9, 4}) {
+        projected_fields.push_back(schema_->field(idx));
+    }
+    auto read_schema = arrow::schema(projected_fields);
+
+    auto reader =
+        PrepareParquetFileBatchReader(data_file, read_schema, /*predicate=*/nullptr,
+                                      /*selection_bitmap=*/std::nullopt, batch_size_);
+    ASSERT_OK_AND_ASSIGN(auto actual,
+                         paimon::test::ReadResultCollector::CollectResult(reader.get()));
+    reader->Close();
+
+    // Expected columns follow the same (out-of-order) positions.
+    std::vector<std::shared_ptr<arrow::Array>> projected_columns;
+    for (int idx : {12, 10, 9, 4}) {
+        projected_columns.push_back(struct_array_->field(idx));
+    }
+    auto expected_struct =
+        arrow::StructArray::Make(projected_columns, read_schema->fields()).ValueOrDie();
+    auto expected = std::make_shared<arrow::ChunkedArray>(expected_struct);
+    ASSERT_TRUE(actual->Equals(expected));
+}
+
+// Round-trips nested data (list, map, struct with string/binary children) through
+// WriteArray() and the batch reader, once with dictionary encoding enabled and once
+// with it disabled. The file is written with max_row_group_length=3 and read with
+// batch_size=2, and the result must equal the source array in both cases.
+TEST_F(ParquetFileBatchReaderTest, TestNextBatchWithDictionary) {
+    auto f0 = arrow::field("f0", arrow::list(arrow::utf8()));
+    auto f1 = arrow::field("f1", arrow::map(arrow::utf8(), arrow::binary()));
+    auto f2 = arrow::field(
+        "f2", arrow::struct_({arrow::field("sub1", arrow::int64()),
+                              arrow::field("sub2", arrow::binary()),
+                              arrow::field("sub3", arrow::utf8())}));
+
+    arrow::FieldVector fields = {f0, f1, f2};
+    auto src_array = std::dynamic_pointer_cast<arrow::StructArray>(
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([
+        [["a", "a", "b"], [["a", "q"], ["b", "w"]], [10, "q", "a"]],
+        [["a", "c"], [["a", "e"], ["b", "r"], ["c", "e"]], [20, "w", "a"]],
+        [["a", "d"], [["d", "r"], ["e", "t"]], [null, "e", "b"]],
+        [["a"], [["a", "q"]], [null, "w", "c"]],
+        [null, [["a", "w"], ["f", "y"]], [50, "r", null]],
+        [["a"], null, [60, "r", "b"]],
+        [["a", "b", "e"], [["a", null], ["b", "w"]], null]
+    ])")
+            .ValueOrDie());
+    // The unused duplicate `src_schema` local was dropped; `arrow_schema` is the only
+    // schema this test needs.
+    auto arrow_schema = arrow::schema(fields);
+    auto expected_array = arrow::ChunkedArray::Make({src_array}).ValueOrDie();
+
+    // Writes the file with the requested dictionary setting, reads it back and compares.
+    auto check_result = [&](bool enable_dictionary) {
+        WriteArray(file_path_, src_array, arrow_schema, /*write_batch_size=*/1, enable_dictionary,
+                   /*max_row_group_length=*/3);
+        auto parquet_batch_reader =
+            PrepareParquetFileBatchReader(file_path_, arrow_schema, /*predicate=*/nullptr,
+                                          /*selection_bitmap=*/std::nullopt, /*batch_size=*/2);
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<arrow::ChunkedArray> result_array,
+            paimon::test::ReadResultCollector::CollectResult(parquet_batch_reader.get()));
+        ASSERT_TRUE(result_array->Equals(expected_array)) << result_array->ToString();
+    };
+    check_result(true);
+    check_result(false);
+}
+
+// Converts the file schema reported by the reader into DataFields and compares them
+// with the expected list, including the ids expected on nested fields: the list
+// item (536880129) and the two struct children (10 and 11) inside the map column.
+TEST_F(ParquetFileBatchReaderTest, TestGetFileSchemaWithFieldId) {
+ std::string file_name = paimon::test::GetDataDir() +
+
"parquet/parquet_append_table.db/parquet_append_table/bucket-0/"
+
"data-9ea62f34-1dca-49c1-bf7a-d37303d8fb76-0.parquet";
+ auto parquet_batch_reader = PrepareParquetFileBatchReader(
+ file_name, schema_, /*predicate=*/nullptr,
/*selection_bitmap=*/std::nullopt, batch_size_);
+ ASSERT_OK_AND_ASSIGN(auto c_file_schema,
parquet_batch_reader->GetFileSchema());
+ auto arrow_file_schema =
arrow::ImportSchema(c_file_schema.get()).ValueOr(nullptr);
+ ASSERT_TRUE(arrow_file_schema);
+ ASSERT_OK_AND_ASSIGN(auto data_fields,
+
DataField::ConvertArrowSchemaToDataFields(arrow_file_schema));
+
+ // Expected type of the map column f10: key = list<item>, value = struct<f0, f1>.
+ auto list_type = arrow::list(DataField::ConvertDataFieldToArrowField(
+ DataField(536880129, arrow::field("item", arrow::float32()))));
+ std::vector<DataField> struct_fields = {DataField(10, arrow::field("f0",
arrow::boolean())),
+ DataField(11, arrow::field("f1",
arrow::int64()))};
+ auto struct_type =
DataField::ConvertDataFieldsToArrowStructType(struct_fields);
+ auto map_type = arrow::map(list_type, struct_type);
+
+ // Top-level ids skip 10 and 11 because those are used by the struct children above.
+ std::vector<DataField> expected_data_fields = {
+ DataField(0, arrow::field("f1", arrow::boolean())),
+ DataField(1, arrow::field("f2", arrow::int8())),
+ DataField(2, arrow::field("f3", arrow::int16())),
+ DataField(3, arrow::field("f4", arrow::int32())),
+ DataField(4, arrow::field("f5", arrow::int64())),
+ DataField(5, arrow::field("f6", arrow::float32())),
+ DataField(6, arrow::field("f7", arrow::float64())),
+ DataField(7, arrow::field("f8", arrow::utf8())),
+ DataField(8, arrow::field("f9", arrow::binary())),
+ DataField(9, arrow::field("f10", map_type)),
+ DataField(12, arrow::field("f11",
arrow::timestamp(arrow::TimeUnit::NANO))),
+ DataField(13, arrow::field("f12", arrow::date32())),
+ DataField(14, arrow::field("f13", arrow::decimal128(2, 2))),
+ };
+ ASSERT_EQ(data_fields, expected_data_fields);
+}
+
+// With an empty options map, CreateReaderProperties() must yield reader properties
+// that have the buffered stream enabled.
+TEST_F(ParquetFileBatchReaderTest, TestCreateReaderProperties) {
+    // test default options
+    std::map<std::string, std::string> default_options;
+    ASSERT_OK_AND_ASSIGN(
+        auto props, ParquetFileBatchReader::CreateReaderProperties(pool_, default_options));
+    ASSERT_TRUE(props.is_buffered_stream_enabled());
+}
+
+// Checks CreateArrowReaderProperties() for default options and for explicit
+// PARQUET_READ_EXECUTOR_THREAD_COUNT values. The assertions on
+// arrow::GetCpuThreadPoolCapacity() read process-wide state, so the order of the
+// three cases matters.
+TEST_F(ParquetFileBatchReaderTest, TestCreateArrowReaderProperties) {
+ {
+ // test default options
+ std::map<std::string, std::string> options;
+ int32_t batch_size = 1024;
+ ASSERT_OK_AND_ASSIGN(
+ auto arrow_reader_properties,
+ ParquetFileBatchReader::CreateArrowReaderProperties(pool_,
options, batch_size));
+ ASSERT_EQ(arrow_reader_properties.pre_buffer(), true);
+ ASSERT_EQ(arrow_reader_properties.batch_size(), 1024);
+ ASSERT_EQ(arrow_reader_properties.use_threads(), true);
+ ASSERT_EQ(arrow::GetCpuThreadPoolCapacity(), 3);
+ ASSERT_EQ(arrow_reader_properties.cache_options(),
arrow::io::CacheOptions::Defaults());
+ }
+ {
+ // thread count "0": threads are expected to be disabled
+ std::map<std::string, std::string> options =
{{PARQUET_READ_EXECUTOR_THREAD_COUNT, "0"}};
+ int32_t batch_size = 1024;
+ ASSERT_OK_AND_ASSIGN(
+ auto arrow_reader_properties,
+ ParquetFileBatchReader::CreateArrowReaderProperties(pool_,
options, batch_size));
+ ASSERT_EQ(arrow_reader_properties.use_threads(), false);
+ }
+ {
+ // thread count "6": threads enabled and the CPU pool capacity is expected to be 6
+ std::map<std::string, std::string> options =
{{PARQUET_READ_EXECUTOR_THREAD_COUNT, "6"}};
+ int32_t batch_size = 1024;
+ ASSERT_OK_AND_ASSIGN(
+ auto arrow_reader_properties,
+ ParquetFileBatchReader::CreateArrowReaderProperties(pool_,
options, batch_size));
+ ASSERT_EQ(arrow_reader_properties.use_threads(), true);
+ ASSERT_EQ(arrow::GetCpuThreadPoolCapacity(), 6);
+ }
+}
+
+// Writes 12 int rows split into two row groups of 6 and reads them back with a
+// selection bitmap {3, 5}. Both selected rows live in the first row group, so the
+// expected result is exactly the first row group (rows 0..5).
+TEST_F(ParquetFileBatchReaderTest, TestBitmapPushDownWithMultiRowGroups) {
+    arrow::FieldVector fields = {arrow::field("f0", arrow::int32())};
+    auto arrow_type = arrow::struct_(fields);
+    auto src_array = std::dynamic_pointer_cast<arrow::StructArray>(
+        arrow::ipc::internal::json::ArrayFromJSON(arrow_type, R"([
+        [0],
+        [1],
+        [2],
+        [3],
+        [4],
+        [5],
+        [6],
+        [7],
+        [8],
+        [9],
+        [10],
+        [11]
+    ])")
+            .ValueOrDie());
+    std::optional<RoaringBitmap32> bitmap = RoaringBitmap32::From({3, 5});
+    // data in file rowGroup0:[0, 1, 2, 3, 4, 5] | rowGroup1:[6, 7, 8, 9, 10, 11]
+
+    // The unused duplicate `src_schema` local was dropped; `arrow_schema` is the only
+    // schema this test needs.
+    auto arrow_schema = arrow::schema(fields);
+    WriteArray(file_path_, src_array, arrow_schema, /*write_batch_size=*/12,
+               /*enable_dictionary=*/true,
+               /*max_row_group_length=*/6);
+
+    auto parquet_batch_reader = PrepareParquetFileBatchReader(
+        file_path_, arrow_schema, /*predicate=*/nullptr, bitmap, /*batch_size=*/12);
+
+    ASSERT_OK_AND_ASSIGN(
+        std::shared_ptr<arrow::ChunkedArray> result_array,
+        paimon::test::ReadResultCollector::CollectResult(parquet_batch_reader.get()));
+
+    auto expected_array = arrow::ChunkedArray::Make({src_array->Slice(0, 6)}).ValueOrDie();
+    ASSERT_TRUE(result_array->Equals(expected_array)) << result_array->ToString();
+}
+
+// Writes 1024 consecutive ints in four row groups of 256 and checks that a predicate
+// and a selection bitmap are applied together when choosing what to return.
+TEST_F(ParquetFileBatchReaderTest, TestPredicateAndBitmapPushDown) {
+    arrow::FieldVector fields = {arrow::field("f0", arrow::int32())};
+    auto arrow_type = arrow::struct_(fields);
+    arrow::StructBuilder struct_builder(arrow_type, arrow::default_memory_pool(),
+                                        {std::make_shared<arrow::Int32Builder>()});
+    auto int_builder = static_cast<arrow::Int32Builder*>(struct_builder.field_builder(0));
+    int32_t length = 1024;
+    for (int32_t i = 0; i < length; ++i) {
+        ASSERT_TRUE(struct_builder.Append().ok());
+        ASSERT_TRUE(int_builder->Append(i).ok());
+    }
+    // data file:
+    // rowGroup0: [0, 256)
+    // rowGroup1: [256, 512)
+    // rowGroup2: [512, 768)
+    // rowGroup3: [768, 1024)
+    std::shared_ptr<arrow::Array> src_array;
+    ASSERT_TRUE(struct_builder.Finish(&src_array).ok());
+    // The unused duplicate `src_schema` local was dropped; `arrow_schema` is the only
+    // schema this test needs.
+    auto arrow_schema = arrow::schema(fields);
+    WriteArray(file_path_, src_array, arrow_schema, /*write_batch_size=*/1024,
+               /*enable_dictionary=*/true,
+               /*max_row_group_length=*/256);
+    {
+        // simple case
+        // bitmap selects rows in rowGroup0/1/2; the predicate (f0 < 255 OR f0 > 600) can
+        // only match values in rowGroup0, 2 and 3, so rowGroup0 and rowGroup2 are expected.
+        std::optional<RoaringBitmap32> bitmap = RoaringBitmap32::From({100, 400, 600});
+        ASSERT_OK_AND_ASSIGN(
+            auto predicate,
+            PredicateBuilder::Or(
+                {PredicateBuilder::LessThan(/*field_index=*/0, /*field_name=*/"f0", FieldType::INT,
+                                            Literal(255)),
+                 PredicateBuilder::GreaterThan(/*field_index=*/0, /*field_name=*/"f0",
+                                               FieldType::INT, Literal(600))}));
+        auto parquet_batch_reader = PrepareParquetFileBatchReader(
+            file_path_, arrow_schema, predicate, bitmap, /*batch_size=*/length);
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<arrow::ChunkedArray> result_array,
+            paimon::test::ReadResultCollector::CollectResult(parquet_batch_reader.get()));
+
+        auto expected_array =
+            arrow::ChunkedArray::Make({src_array->Slice(0, 256), src_array->Slice(512, 256)})
+                .ValueOrDie();
+        ASSERT_TRUE(result_array->Equals(expected_array)) << result_array->ToString();
+    }
+    {
+        // test all data has been filtered out with predicate and bitmap pushdown
+        // (f0 > 800 can only match rowGroup3, which the bitmap does not select)
+        std::optional<RoaringBitmap32> bitmap = RoaringBitmap32::From({100, 400, 600});
+        auto predicate = PredicateBuilder::GreaterThan(/*field_index=*/0, /*field_name=*/"f0",
+                                                       FieldType::INT, Literal(800));
+        auto parquet_batch_reader = PrepareParquetFileBatchReader(
+            file_path_, arrow_schema, predicate, bitmap, /*batch_size=*/length);
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<arrow::ChunkedArray> result_array,
+            paimon::test::ReadResultCollector::CollectResult(parquet_batch_reader.get()));
+        ASSERT_FALSE(result_array);
+    }
+}
+
+// Reads the data file with an empty read schema and batch_size=2: three batches of
+// two field-less rows are expected, followed by an EOF batch. The first row number
+// reported for the previous batch is checked after every NextBatch() call.
+TEST_F(ParquetFileBatchReaderTest, TestReadNoField) {
+ // if only read partition fields, format reader will set empty read schema
+ std::string file_name = paimon::test::GetDataDir() +
+
"parquet/parquet_append_table.db/parquet_append_table/bucket-0/"
+
"data-9ea62f34-1dca-49c1-bf7a-d37303d8fb76-0.parquet";
+ // read no field
+ auto read_schema = arrow::schema(arrow::FieldVector());
+ auto parquet_batch_reader =
+ PrepareParquetFileBatchReader(file_name, read_schema,
/*predicate=*/nullptr,
+ /*selection_bitmap=*/std::nullopt,
/*batch_size=*/2);
+ // read 2 rows
+ ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFirstRowNumber().value(),
+ std::numeric_limits<uint64_t>::max());
+ ASSERT_OK_AND_ASSIGN(auto batch1, parquet_batch_reader->NextBatch());
+ ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFirstRowNumber().value(),
0);
+ // read 2 rows
+ ASSERT_OK_AND_ASSIGN(auto batch2, parquet_batch_reader->NextBatch());
+ ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFirstRowNumber().value(),
2);
+ // read 2 rows
+ ASSERT_OK_AND_ASSIGN(auto batch3, parquet_batch_reader->NextBatch());
+ ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFirstRowNumber().value(),
4);
+ // read rows with eof
+ ASSERT_OK_AND_ASSIGN(auto batch4, parquet_batch_reader->NextBatch());
+ ASSERT_EQ(parquet_batch_reader->GetPreviousBatchFirstRowNumber().value(),
6);
+ ASSERT_TRUE(BatchReader::IsEofBatch(batch4));
+ parquet_batch_reader->Close();
+
+ // Expected content of each non-EOF batch: two rows of an empty struct.
+ arrow::FieldVector fields;
+ auto arrow_type = arrow::struct_(fields);
+ auto expected_array = std::dynamic_pointer_cast<arrow::StructArray>(
+ arrow::ipc::internal::json::ArrayFromJSON(arrow_type, R"([
+[],
+[]
+ ])")
+ .ValueOrDie());
+
+ // Imports a C (array, schema) pair back into an arrow::Array and compares it.
+ auto check_batch = [](BatchReader::ReadBatch&& result_batch,
+ const std::shared_ptr<arrow::Array>& expected_array)
{
+ auto& [c_array, c_schema] = result_batch;
+ auto result_array = arrow::ImportArray(c_array.get(),
c_schema.get()).ValueOr(nullptr);
+ ASSERT_TRUE(result_array);
+ ASSERT_TRUE(result_array->Equals(expected_array));
+ };
+
+ check_batch(std::move(batch1), expected_array);
+ check_batch(std::move(batch2), expected_array);
+ check_batch(std::move(batch3), expected_array);
+}
+
+// Parameterized on a bool: when true the timezone columns use "Asia/Tokyo",
+// otherwise the local timezone name. Timestamps of every precision, with and without
+// timezone, are read from a file produced by Java Paimon and from a file written by
+// this C++ writer, and both results are compared with the same expected array.
+TEST_P(ParquetFileBatchReaderTest, TestTimestampType) {
+ auto enable_tz = GetParam();
+ std::string timezone_str = enable_tz ? "Asia/Tokyo" :
DateTimeUtils::GetLocalTimezoneName();
+ paimon::test::TimezoneGuard tz_guard(timezone_str);
+
+ arrow::FieldVector fields = {
+ arrow::field("ts_sec", arrow::timestamp(arrow::TimeUnit::SECOND)),
+ arrow::field("ts_milli", arrow::timestamp(arrow::TimeUnit::MILLI)),
+ arrow::field("ts_micro", arrow::timestamp(arrow::TimeUnit::MICRO)),
+ arrow::field("ts_nano", arrow::timestamp(arrow::TimeUnit::NANO)),
+ arrow::field("ts_tz_sec", arrow::timestamp(arrow::TimeUnit::SECOND,
timezone_str)),
+ arrow::field("ts_tz_milli", arrow::timestamp(arrow::TimeUnit::MILLI,
timezone_str)),
+ arrow::field("ts_tz_micro", arrow::timestamp(arrow::TimeUnit::MICRO,
timezone_str)),
+ arrow::field("ts_tz_nano", arrow::timestamp(arrow::TimeUnit::NANO,
timezone_str)),
+ };
+
+ auto array = std::dynamic_pointer_cast<arrow::StructArray>(
+ arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([
+["1970-01-01 00:00:01", "1970-01-01 00:00:00.001", "1970-01-01
00:00:00.000001", "1970-01-01 00:00:00.000000001", "1970-01-01 00:00:02",
"1970-01-01 00:00:00.002", "1970-01-01 00:00:00.000002", "1970-01-01
00:00:00.000000002"],
+["1970-01-01 00:00:03", "1970-01-01 00:00:00.003", null, "1970-01-01
00:00:00.000000003", "1970-01-01 00:00:04", "1970-01-01 00:00:00.004",
"1970-01-01 00:00:00.000004", "1970-01-01 00:00:00.000000004"],
+["1970-01-01 00:00:05", "1970-01-01 00:00:00.005", null, null, "1970-01-01
00:00:06", null, "1970-01-01 00:00:00.000006", null]
+ ])")
+ .ValueOrDie());
+ std::shared_ptr<arrow::ChunkedArray> expected_array =
+ std::make_shared<arrow::ChunkedArray>(array);
+
+ {
+ // read data generated by Java Paimon
+ std::string file_name = paimon::test::GetDataDir() +
+
"/parquet/append_with_multiple_ts_precision_and_timezone.db/"
+
"append_with_multiple_ts_precision_and_timezone/bucket-0/"
+
"data-9b8abdde-df4d-4655-bb4c-ffda164ef9d4-0.parquet";
+ auto parquet_batch_reader = PrepareParquetFileBatchReader(
+ file_name, std::make_shared<arrow::Schema>(fields),
+ /*predicate=*/nullptr, /*selection_bitmap=*/std::nullopt,
batch_size_);
+ ASSERT_OK_AND_ASSIGN(
+ std::shared_ptr<arrow::ChunkedArray> result_array,
+
paimon::test::ReadResultCollector::CollectResult(parquet_batch_reader.get()));
+ ASSERT_TRUE(result_array->Equals(*expected_array)) <<
result_array->ToString();
+ }
+ {
+ // read data generated by C++ Paimon
+ auto arrow_schema = std::make_shared<arrow::Schema>(fields);
+ WriteArray(file_path_, array, arrow_schema,
+ /*write_batch_size=*/1, /*enable_dictionary=*/true,
+ /*max_row_group_length=*/1);
+ auto parquet_batch_reader =
+ PrepareParquetFileBatchReader(file_path_, arrow_schema,
/*predicate=*/nullptr,
+ /*selection_bitmap=*/std::nullopt,
/*batch_size=*/2);
+
+ // check file schema
+ ASSERT_OK_AND_ASSIGN(auto c_file_schema,
parquet_batch_reader->GetFileSchema());
+ auto result_file_schema =
arrow::ImportSchema(c_file_schema.get()).ValueOr(nullptr);
+ ASSERT_TRUE(result_file_schema);
+
+ // The expected file schema differs from the write schema: the second-precision
+ // columns are expected as MILLI and ts_tz_nano is expected without a timezone.
+ arrow::FieldVector expected_fields = {
+ arrow::field("ts_sec", arrow::timestamp(arrow::TimeUnit::MILLI)),
+ arrow::field("ts_milli", arrow::timestamp(arrow::TimeUnit::MILLI)),
+ arrow::field("ts_micro", arrow::timestamp(arrow::TimeUnit::MICRO)),
+ arrow::field("ts_nano", arrow::timestamp(arrow::TimeUnit::NANO)),
+ arrow::field("ts_tz_sec", arrow::timestamp(arrow::TimeUnit::MILLI,
timezone_str)),
+ arrow::field("ts_tz_milli",
arrow::timestamp(arrow::TimeUnit::MILLI, timezone_str)),
+ arrow::field("ts_tz_micro",
arrow::timestamp(arrow::TimeUnit::MICRO, timezone_str)),
+ arrow::field("ts_tz_nano",
arrow::timestamp(arrow::TimeUnit::NANO)),
+ };
+ auto expected_file_schema = arrow::schema(expected_fields);
+ ASSERT_TRUE(result_file_schema->Equals(expected_file_schema))
+ << result_file_schema->ToString();
+
+ // check array
+ ASSERT_OK_AND_ASSIGN(
+ std::shared_ptr<arrow::ChunkedArray> result_array,
+
paimon::test::ReadResultCollector::CollectResult(parquet_batch_reader.get()));
+ ASSERT_TRUE(result_array->Equals(expected_array)) <<
result_array->ToString();
+ }
+}
+
+// Runs the TEST_P cases of this fixture twice, once with parameter false and once
+// with parameter true.
+INSTANTIATE_TEST_SUITE_P(TestParam, ParquetFileBatchReaderTest,
::testing::Values(false, true));
+
+} // namespace paimon::parquet::test
diff --git a/src/paimon/format/parquet/parquet_format_writer.cpp
b/src/paimon/format/parquet/parquet_format_writer.cpp
new file mode 100644
index 0000000..2007948
--- /dev/null
+++ b/src/paimon/format/parquet/parquet_format_writer.cpp
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/format/parquet/parquet_format_writer.h"
+
+#include <string>
+#include <utility>
+
+#include "arrow/c/bridge.h"
+#include "arrow/memory_pool.h"
+#include "arrow/record_batch.h"
+#include "paimon/common/metrics/metrics_impl.h"
+#include "paimon/common/utils/arrow/arrow_output_stream_adapter.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/format/parquet/parquet_format_defs.h"
+#include "parquet/arrow/writer.h"
+#include "parquet/properties.h"
+
+namespace arrow {
+class MemoryPool;
+class Schema;
+} // namespace arrow
+namespace paimon {
+class OutputStream;
+} // namespace paimon
+struct ArrowArray;
+
+namespace paimon::parquet {
+
+// Factory for ParquetFormatWriter.
+//   output_stream     - paimon stream the file is written to (wrapped in an
+//                       ArrowOutputStreamAdapter)
+//   schema            - arrow schema of the batches that will be added
+//   writer_properties - parquet writer properties passed through to FileWriter::Open
+//   max_memory_use    - threshold kept by the writer (compared against the pool's
+//                       allocated bytes in AddBatch)
+//   pool              - arrow memory pool used by the parquet writer
+// Returns the writer, or an error if the parquet FileWriter cannot be opened.
+Result<std::unique_ptr<ParquetFormatWriter>> ParquetFormatWriter::Create(
+    const std::shared_ptr<OutputStream>& output_stream,
+    const std::shared_ptr<arrow::Schema>& schema,
+    const std::shared_ptr<::parquet::WriterProperties>& writer_properties, uint64_t max_memory_use,
+    const std::shared_ptr<arrow::MemoryPool>& pool) {
+    // Adapt the paimon stream so it can be handed to the parquet writer.
+    auto arrow_sink = std::make_shared<ArrowOutputStreamAdapter>(output_stream);
+
+    // Arrow-level writer properties: timestamps are written with the deprecated INT96 type.
+    ::parquet::ArrowWriterProperties::Builder arrow_builder;
+    arrow_builder.enable_deprecated_int96_timestamps();
+    auto arrow_props = arrow_builder.build();
+
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(
+        std::unique_ptr<::parquet::arrow::FileWriter> parquet_writer,
+        ::parquet::arrow::FileWriter::Open(*schema, pool.get(), arrow_sink, writer_properties,
+                                           arrow_props));
+
+    // The constructor is private, so the object is created with `new` directly.
+    return std::unique_ptr<ParquetFormatWriter>(new ParquetFormatWriter(
+        std::move(parquet_writer), arrow_sink, schema, max_memory_use, pool));
+}
+
+// Imports `batch` (Arrow C data interface) as a record batch with the writer's schema
+// and writes it. If the pool currently has more bytes allocated than max_memory_use_,
+// a new buffered row group is started before the batch is written. On success the
+// written-record counter is advanced by the batch's row count.
+Status ParquetFormatWriter::AddBatch(ArrowArray* batch) {
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<::arrow::RecordBatch> imported,
+                                      arrow::ImportRecordBatch(batch, schema_));
+
+    const auto allocated_bytes = static_cast<uint64_t>(pool_->bytes_allocated());
+    const bool over_budget = allocated_bytes > max_memory_use_;
+    if (over_budget) {
+        PAIMON_RETURN_NOT_OK_FROM_ARROW(writer_->NewBufferedRowGroup());
+    }
+
+    PAIMON_RETURN_NOT_OK_FROM_ARROW(writer_->WriteRecordBatch(*imported));
+    total_records_written_ += imported->num_rows();
+    return Status::OK();
+}
+
+// Publishes the number of records written so far as the WRITE_RECORD_COUNT metric.
+// No call on the parquet writer or the output stream is made here. Always returns OK.
+Status ParquetFormatWriter::Flush() {
+ metrics_->SetCounter(ParquetMetrics::WRITE_RECORD_COUNT,
total_records_written_);
+ return Status::OK();
+}
+
+// Finalizes the file: refreshes the metrics through Flush(), then closes the parquet
+// FileWriter. Returns the first error encountered, or OK.
+Status ParquetFormatWriter::Finish() {
+ PAIMON_RETURN_NOT_OK(Flush());
+ PAIMON_RETURN_NOT_OK_FROM_ARROW(writer_->Close());
+ return Status::OK();
+}
+
+// Reports whether the estimated file length has reached `target_size`.
+// When `suggested_check` is false no estimate is computed and false is returned.
+// Errors from the length estimate are propagated.
+Result<bool> ParquetFormatWriter::ReachTargetSize(bool suggested_check,
+                                                  int64_t target_size) const {
+    if (!suggested_check) {
+        return false;
+    }
+    PAIMON_ASSIGN_OR_RAISE(const uint64_t estimated_length, GetEstimateLength());
+    // target_size is compared as an unsigned value, as the estimate is unsigned.
+    return estimated_length >= static_cast<uint64_t>(target_size);
+}
+
+// Estimates the current file length as the output stream position plus the size the
+// parquet writer reports as still buffered. Fails if the stream position cannot be
+// obtained.
+Result<uint64_t> ParquetFormatWriter::GetEstimateLength() const {
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(int64_t stream_position, out_->Tell());
+    const auto buffered_bytes = writer_->GetBufferedSize();
+    return buffered_bytes + stream_position;
+}
+
+// Private constructor used by Create(): stores the collaborators and creates a fresh
+// MetricsImpl for the writer metrics. Initializers are listed in member declaration
+// order.
+ParquetFormatWriter::ParquetFormatWriter(
+    std::unique_ptr<::parquet::arrow::FileWriter> writer,
+    const std::shared_ptr<ArrowOutputStreamAdapter>& out,
+    const std::shared_ptr<arrow::Schema>& schema, uint64_t max_memory_use,
+    const std::shared_ptr<arrow::MemoryPool>& pool)
+    : pool_{pool},
+      out_{out},
+      writer_{std::move(writer)},
+      schema_{schema},
+      metrics_{std::make_shared<MetricsImpl>()},
+      max_memory_use_{max_memory_use} {
+}
+
+} // namespace paimon::parquet
diff --git a/src/paimon/format/parquet/parquet_format_writer.h
b/src/paimon/format/parquet/parquet_format_writer.h
new file mode 100644
index 0000000..e5cae16
--- /dev/null
+++ b/src/paimon/format/parquet/parquet_format_writer.h
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+
+#include "paimon/format/format_writer.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/metrics.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+#include "parquet/arrow/writer.h"
+
+namespace arrow {
+class MemoryPool;
+class Schema;
+} // namespace arrow
+namespace paimon {
+class Metrics;
+class OutputStream;
+class ArrowOutputStreamAdapter;
+} // namespace paimon
+namespace parquet {
+class WriterProperties;
+} // namespace parquet
+struct ArrowArray;
+
+namespace paimon::parquet {
+
+// FormatWriter implementation that writes Arrow record batches into a Parquet file
+// through ::parquet::arrow::FileWriter. Instances are created with Create().
+class ParquetFormatWriter : public FormatWriter {
+ public:
+ // Opens a parquet FileWriter for `schema` on `output_stream` and wraps it in a
+ // ParquetFormatWriter. `max_memory_use` is the allocated-bytes threshold checked
+ // by AddBatch(). Returns an error if the FileWriter cannot be opened.
+ static Result<std::unique_ptr<ParquetFormatWriter>> Create(
+ const std::shared_ptr<OutputStream>& output_stream,
+ const std::shared_ptr<arrow::Schema>& schema,
+ const std::shared_ptr<::parquet::WriterProperties>& writer_properties,
+ uint64_t max_memory_use, const std::shared_ptr<arrow::MemoryPool>&
pool);
+
+ // Imports `batch` with the writer schema and writes it as a record batch.
+ Status AddBatch(ArrowArray* batch) override;
+
+ // Updates the WRITE_RECORD_COUNT metric with the records written so far.
+ Status Flush() override;
+
+ // Calls Flush() and then closes the underlying parquet FileWriter.
+ Status Finish() override;
+
+ // Returns whether the estimated length has reached `target_size`; returns false
+ // without estimating when `suggested_check` is false.
+ Result<bool> ReachTargetSize(bool suggested_check, int64_t target_size)
const override;
+
+ // Returns the metrics object owned by this writer.
+ std::shared_ptr<Metrics> GetWriterMetrics() const override {
+ return metrics_;
+ }
+
+ private:
+ ParquetFormatWriter(std::unique_ptr<::parquet::arrow::FileWriter> writer,
+ const std::shared_ptr<ArrowOutputStreamAdapter>& out,
+ const std::shared_ptr<arrow::Schema>& schema, uint64_t
max_memory_use,
+ const std::shared_ptr<arrow::MemoryPool>& pool);
+
+ // Output stream position plus the size still buffered in the parquet writer.
+ Result<uint64_t> GetEstimateLength() const;
+
+ // Memory pool whose bytes_allocated() is compared with max_memory_use_.
+ std::shared_ptr<arrow::MemoryPool> pool_;
+ // Adapter around the caller's output stream; also queried for the write position.
+ std::shared_ptr<ArrowOutputStreamAdapter> out_;
+ std::unique_ptr<::parquet::arrow::FileWriter> writer_;
+ // Schema used to import incoming ArrowArray batches.
+ std::shared_ptr<arrow::Schema> schema_;
+ std::shared_ptr<Metrics> metrics_;
+ // Sum of num_rows() over all batches written so far.
+ int64_t total_records_written_ = 0;
+ uint64_t max_memory_use_;
+};
+
+} // namespace paimon::parquet
diff --git a/src/paimon/format/parquet/parquet_format_writer_test.cpp
b/src/paimon/format/parquet/parquet_format_writer_test.cpp
new file mode 100644
index 0000000..ccd0868
--- /dev/null
+++ b/src/paimon/format/parquet/parquet_format_writer_test.cpp
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/format/parquet/parquet_format_writer.h"
+
+#include <map>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/api.h"
+#include "arrow/array/array_binary.h"
+#include "arrow/array/array_primitive.h"
+#include "arrow/array/builder_binary.h"
+#include "arrow/array/builder_nested.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/c/abi.h"
+#include "arrow/c/bridge.h"
+#include "arrow/io/file.h"
+#include "arrow/ipc/api.h"
+#include "arrow/memory_pool.h"
+#include "gtest/gtest.h"
+#include "paimon/common/utils/arrow/mem_utils.h"
+#include "paimon/common/utils/date_time_utils.h"
+#include "paimon/common/utils/path_util.h"
+#include "paimon/format/file_format.h"
+#include "paimon/format/file_format_factory.h"
+#include "paimon/format/parquet/parquet_field_id_converter.h"
+#include "paimon/format/parquet/parquet_format_defs.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/fs/local/local_file_system.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/metrics.h"
+#include "paimon/record_batch.h"
+#include "paimon/testing/utils/testharness.h"
+#include "parquet/arrow/reader.h"
+#include "parquet/file_reader.h"
+#include "parquet/metadata.h"
+#include "parquet/properties.h"
+#include "parquet/schema.h"
+
+namespace arrow {
+class Array;
+} // namespace arrow
+
+namespace paimon::parquet::test {
+
+/// Fixture for the ParquetFormatWriter tests.
+///
+/// SetUp() creates a unique scratch directory, a local file system and the
+/// memory pools. The helpers build a three-column schema (col1 utf8,
+/// col2 int32, col3 boolean, field ids 0..2), generate matching struct arrays
+/// and verify files written by the tests.
+class ParquetFormatWriterTest : public ::testing::Test {
+ public:
+    void SetUp() override {
+        dir_ = paimon::test::UniqueTestDirectory::Create();
+        ASSERT_TRUE(dir_);
+        fs_ = std::make_shared<LocalFileSystem>();
+        pool_ = GetDefaultPool();
+        arrow_pool_ = GetArrowPool(pool_);
+    }
+    void TearDown() override {}
+
+    /// Returns the test schema together with the struct type made of the same
+    /// fields. Every field carries its Parquet field id as key/value metadata.
+    std::pair<std::shared_ptr<arrow::Schema>, std::shared_ptr<arrow::DataType>> PrepareArrowSchema()
+        const {
+        auto string_field = arrow::field(
+            "col1", arrow::utf8(),
+            arrow::KeyValueMetadata::Make({ParquetFieldIdConverter::PARQUET_FIELD_ID}, {"0"}));
+        auto int_field = arrow::field(
+            "col2", arrow::int32(),
+            arrow::KeyValueMetadata::Make({ParquetFieldIdConverter::PARQUET_FIELD_ID}, {"1"}));
+        auto bool_field = arrow::field(
+            "col3", arrow::boolean(),
+            arrow::KeyValueMetadata::Make({ParquetFieldIdConverter::PARQUET_FIELD_ID}, {"2"}));
+        auto struct_type = arrow::struct_({string_field, int_field, bool_field});
+        return std::make_pair(
+            arrow::schema(arrow::FieldVector({string_field, int_field, bool_field})), struct_type);
+    }
+
+    /// Builds a struct array with `record_batch_size` rows for the row indices
+    /// [offset, offset + record_batch_size).
+    ///
+    /// Row i holds "str_<i>", the int i (null when i % 3 == 0) and the bool
+    /// (i % 2). With `all_null_value` every child value is null instead.
+    std::shared_ptr<arrow::Array> PrepareArray(const std::shared_ptr<arrow::DataType>& data_type,
+                                               int32_t record_batch_size, int32_t offset = 0,
+                                               bool all_null_value = false) const {
+        arrow::StructBuilder struct_builder(
+            data_type, arrow::default_memory_pool(),
+            {std::make_shared<arrow::StringBuilder>(), std::make_shared<arrow::Int32Builder>(),
+             std::make_shared<arrow::BooleanBuilder>()});
+        auto string_builder = static_cast<arrow::StringBuilder*>(struct_builder.field_builder(0));
+        auto int_builder = static_cast<arrow::Int32Builder*>(struct_builder.field_builder(1));
+        auto bool_builder = static_cast<arrow::BooleanBuilder*>(struct_builder.field_builder(2));
+        for (int32_t i = 0 + offset; i < record_batch_size + offset; ++i) {
+            EXPECT_TRUE(struct_builder.Append().ok());
+            if (all_null_value) {
+                EXPECT_TRUE(string_builder->AppendNull().ok());
+                EXPECT_TRUE(int_builder->AppendNull().ok());
+                EXPECT_TRUE(bool_builder->AppendNull().ok());
+            } else {
+                EXPECT_TRUE(string_builder->Append("str_" + std::to_string(i)).ok());
+                if (i % 3 == 0) {
+                    // test null
+                    EXPECT_TRUE(int_builder->AppendNull().ok());
+                } else {
+                    EXPECT_TRUE(int_builder->Append(i).ok());
+                }
+                EXPECT_TRUE(bool_builder->Append(static_cast<bool>(i % 2)).ok());
+            }
+        }
+        std::shared_ptr<arrow::Array> array;
+        EXPECT_TRUE(struct_builder.Finish(&array).ok());
+        return array;
+    }
+
+    /// Generates one array with PrepareArray(), exports it through the Arrow C
+    /// data interface, wraps it in a RecordBatch and adds it to `format_writer`.
+    void AddRecordBatchOnce(const std::shared_ptr<FormatWriter>& format_writer,
+                            const std::shared_ptr<arrow::DataType>& struct_type,
+                            int32_t record_batch_size, int32_t offset,
+                            bool all_null_value = false) const {
+        auto array = PrepareArray(struct_type, record_batch_size, offset, all_null_value);
+        auto arrow_array = std::make_unique<ArrowArray>();
+        ASSERT_TRUE(arrow::ExportArray(*array, arrow_array.get()).ok());
+        auto batch = std::make_shared<RecordBatch>(
+            /*partition=*/std::map<std::string, std::string>(), /*bucket=*/-1,
+            /*row_kinds=*/std::vector<RecordBatch::RowKind>(), arrow_array.get());
+        ASSERT_OK(format_writer->AddBatch(batch->GetData()));
+    }
+
+    /// Opens `file_path` with the Arrow Parquet reader and verifies row-group
+    /// count, row count, column names, field ids and the values of all rows.
+    ///
+    /// The values are expected to be the non-null pattern generated by
+    /// PrepareArray() for the row indices [0, row_count).
+    void CheckResult(const std::string& file_path, int32_t row_count,
+                     int32_t row_group_count) const {
+        auto file = arrow::io::ReadableFile::Open(file_path, arrow_pool_.get());
+        ASSERT_TRUE(file.ok());
+        std::unique_ptr<::parquet::arrow::FileReader> reader;
+        auto status = ::parquet::arrow::OpenFile(file.ValueOrDie(), arrow_pool_.get(), &reader);
+        ASSERT_TRUE(status.ok()) << status.ToString();
+        const ::parquet::FileMetaData* metadata = reader->parquet_reader()->metadata().get();
+        const ::parquet::SchemaDescriptor* schema = metadata->schema();
+        ASSERT_EQ(metadata->num_row_groups(), row_group_count);
+        ASSERT_EQ(schema->num_columns(), 3);
+        ASSERT_EQ(metadata->num_rows(), row_count);
+        ASSERT_EQ("col1", schema->Column(0)->name());
+        ASSERT_EQ("col2", schema->Column(1)->name());
+        ASSERT_EQ("col3", schema->Column(2)->name());
+        ASSERT_EQ(0, schema->Column(0)->schema_node()->field_id());
+        ASSERT_EQ(1, schema->Column(1)->schema_node()->field_id());
+        ASSERT_EQ(2, schema->Column(2)->schema_node()->field_id());
+
+        std::shared_ptr<::arrow::ChunkedArray> col0_array, col1_array, col2_array;
+        ASSERT_TRUE(reader->ReadColumn(0, &col0_array).ok());
+        ASSERT_TRUE(reader->ReadColumn(1, &col1_array).ok());
+        ASSERT_TRUE(reader->ReadColumn(2, &col2_array).ok());
+
+        // Only the first chunk of each column is inspected below; fail cleanly
+        // instead of indexing a missing chunk.
+        ASSERT_GT(col0_array->num_chunks(), 0);
+        ASSERT_GT(col1_array->num_chunks(), 0);
+        ASSERT_GT(col2_array->num_chunks(), 0);
+
+        // dynamic_pointer_cast yields nullptr on a type mismatch, which makes
+        // the following null checks meaningful.
+        const auto string_array =
+            std::dynamic_pointer_cast<arrow::StringArray>(col0_array->chunk(0));
+        ASSERT_TRUE(string_array);
+        const auto int_array = std::dynamic_pointer_cast<arrow::Int32Array>(col1_array->chunk(0));
+        ASSERT_TRUE(int_array);
+        const auto bool_array =
+            std::dynamic_pointer_cast<arrow::BooleanArray>(col2_array->chunk(0));
+        ASSERT_TRUE(bool_array);
+
+        // The per-row loop reads indices [0, row_count) from the first chunk,
+        // so that chunk has to contain every row.
+        ASSERT_EQ(string_array->length(), row_count);
+        ASSERT_EQ(int_array->length(), row_count);
+        ASSERT_EQ(bool_array->length(), row_count);
+
+        ASSERT_EQ(string_array->null_count(), 0);
+        // Rows with i % 3 == 0 are null: ceil(row_count / 3), also valid for 0 rows.
+        ASSERT_EQ(int_array->null_count(), (row_count + 2) / 3);
+        ASSERT_EQ(bool_array->null_count(), 0);
+
+        for (int32_t i = 0; i < row_count; i++) {
+            ASSERT_EQ("str_" + std::to_string(i), string_array->GetString(i));
+            if (i % 3 == 0) {
+                ASSERT_TRUE(int_array->IsNull(i));
+            } else {
+                ASSERT_FALSE(int_array->IsNull(i));
+                ASSERT_EQ(i, int_array->Value(i));
+            }
+            ASSERT_EQ(i % 2 != 0, bool_array->Value(i));
+        }
+    }
+
+    // protected rather than private: TEST_F bodies are members of a class
+    // derived from this fixture and access these fields directly.
+ protected:
+    std::unique_ptr<paimon::test::UniqueTestDirectory> dir_;
+    std::shared_ptr<FileSystem> fs_;
+    std::shared_ptr<MemoryPool> pool_;
+    std::shared_ptr<arrow::MemoryPool> arrow_pool_;
+};
+
+// For every combination of Arrow batch length and Parquet write_batch_size,
+// writes a single batch into a fresh file and verifies the file content.
+TEST_F(ParquetFormatWriterTest, TestWriteWithVariousBatchSize) {
+    auto schema_pair = PrepareArrowSchema();
+    const auto& arrow_schema = schema_pair.first;
+    const auto& struct_type = schema_pair.second;
+    for (auto record_batch_size : {1, 2, 3, 5, 20}) {
+        for (auto batch_capacity : {1, 2, 3, 5, 20}) {
+            // One file per (batch size, capacity) pair, created with overwrite=false.
+            std::string file_name =
+                std::to_string(record_batch_size) + "_" + std::to_string(batch_capacity);
+            std::string file_path = PathUtil::JoinPath(dir_->Str(), file_name);
+            ASSERT_OK_AND_ASSIGN(std::shared_ptr<OutputStream> out,
+                                 fs_->Create(file_path, /*overwrite=*/false));
+            ::parquet::WriterProperties::Builder builder;
+            builder.write_batch_size(batch_capacity);
+            auto writer_properties = builder.build();
+            ASSERT_OK_AND_ASSIGN(
+                auto format_writer,
+                ParquetFormatWriter::Create(out, arrow_schema, writer_properties,
+                                            DEFAULT_PARQUET_WRITER_MAX_MEMORY_USE, arrow_pool_));
+            auto array = PrepareArray(struct_type, record_batch_size);
+            auto arrow_array = std::make_unique<ArrowArray>();
+            ASSERT_TRUE(arrow::ExportArray(*array, arrow_array.get()).ok());
+
+            auto batch = std::make_shared<RecordBatch>(
+                /*partition=*/std::map<std::string, std::string>(), /*bucket=*/-1,
+                /*row_kinds=*/std::vector<RecordBatch::RowKind>(), arrow_array.get());
+            ASSERT_OK(format_writer->AddBatch(batch->GetData()));
+            ASSERT_OK(format_writer->Flush());
+            ASSERT_OK(format_writer->Finish());
+            ASSERT_OK(out->Flush());
+            ASSERT_OK(out->Close());
+            CheckResult(file_path, record_batch_size, /*row_group_count=*/1);
+        }
+    }
+}
+
+// Writes one 10-row batch with the writer configured for Parquet format
+// version 1.0 and verifies the resulting file.
+TEST_F(ParquetFormatWriterTest, TestWriteWithV1Version) {
+    auto schema_pair = PrepareArrowSchema();
+    const auto& arrow_schema = schema_pair.first;
+    const auto& struct_type = schema_pair.second;
+    auto record_batch_size = 10;
+    auto batch_capacity = 5;
+    std::string file_name = "test.parquet";
+    std::string file_path = PathUtil::JoinPath(dir_->Str(), file_name);
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<OutputStream> out,
+                         fs_->Create(file_path, /*overwrite=*/false));
+    ::parquet::WriterProperties::Builder builder;
+    builder.write_batch_size(batch_capacity);
+    builder.version(::parquet::ParquetVersion::type::PARQUET_1_0);
+    auto writer_properties = builder.build();
+    ASSERT_OK_AND_ASSIGN(
+        auto format_writer,
+        ParquetFormatWriter::Create(out, arrow_schema, writer_properties,
+                                    DEFAULT_PARQUET_WRITER_MAX_MEMORY_USE, arrow_pool_));
+    auto array = PrepareArray(struct_type, record_batch_size);
+    auto arrow_array = std::make_unique<ArrowArray>();
+    ASSERT_TRUE(arrow::ExportArray(*array, arrow_array.get()).ok());
+
+    auto batch = std::make_shared<RecordBatch>(
+        /*partition=*/std::map<std::string, std::string>(), /*bucket=*/-1,
+        /*row_kinds=*/std::vector<RecordBatch::RowKind>(), arrow_array.get());
+    ASSERT_OK(format_writer->AddBatch(batch->GetData()));
+    ASSERT_OK(format_writer->Flush());
+    ASSERT_OK(format_writer->Finish());
+    ASSERT_OK(out->Flush());
+    ASSERT_OK(out->Close());
+    CheckResult(file_path, record_batch_size, /*row_group_count=*/1);
+}
+
+// Adds four batches (6 + 10 + 15 + 6 = 37 rows) to one writer configured with
+// a Parquet write_batch_size of 10. The length estimate is expected to stay
+// unchanged across the first three batches; the finished file must hold all
+// 37 rows in one row group and the record counter metric must report 37.
+TEST_F(ParquetFormatWriterTest, TestWriteMultipleTimes) {
+    const auto prepared = PrepareArrowSchema();
+    const std::shared_ptr<arrow::Schema>& schema = prepared.first;
+    const std::shared_ptr<arrow::DataType>& row_type = prepared.second;
+
+    const std::string target_path = PathUtil::JoinPath(dir_->Str(), "write_multiple_times");
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<OutputStream> out,
+                         fs_->Create(target_path, /*overwrite=*/false));
+    ::parquet::WriterProperties::Builder props_builder;
+    props_builder.write_batch_size(10);
+    auto props = props_builder.build();
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<ParquetFormatWriter> format_writer,
+                         ParquetFormatWriter::Create(out, schema, props,
+                                                     DEFAULT_PARQUET_WRITER_MAX_MEMORY_USE,
+                                                     arrow_pool_));
+
+    // batch #1: rows [0, 6)
+    AddRecordBatchOnce(format_writer, row_type, 6, 0);
+    ASSERT_OK_AND_ASSIGN(uint64_t len_after_first, format_writer->GetEstimateLength());
+    ASSERT_GT(len_after_first, 0);
+
+    // batch #2: rows [6, 16), estimate unchanged
+    AddRecordBatchOnce(format_writer, row_type, 10, 6);
+    ASSERT_OK_AND_ASSIGN(uint64_t len_after_second, format_writer->GetEstimateLength());
+    ASSERT_EQ(len_after_second, len_after_first);
+
+    // batch #3: rows [16, 31), estimate still unchanged
+    AddRecordBatchOnce(format_writer, row_type, 15, 16);
+    ASSERT_OK_AND_ASSIGN(uint64_t len_after_third, format_writer->GetEstimateLength());
+    ASSERT_EQ(len_after_third, len_after_second);
+
+    // batch #4: rows [31, 37)
+    AddRecordBatchOnce(format_writer, row_type, 6, 31);
+
+    ASSERT_OK(format_writer->Flush());
+    ASSERT_OK(format_writer->Finish());
+    ASSERT_OK(out->Flush());
+    ASSERT_OK(out->Close());
+    CheckResult(target_path, /*row_count=*/37, /*row_group_count=*/1);
+
+    // The writer metrics expose the number of records written.
+    auto writer_metrics = format_writer->GetWriterMetrics();
+    ASSERT_OK_AND_ASSIGN(uint64_t written_records,
+                         writer_metrics->GetCounter(ParquetMetrics::WRITE_RECORD_COUNT));
+    ASSERT_EQ(37, written_records);
+}
+
+// Checks how GetEstimateLength() evolves while batches are added: it is
+// positive after the first row, unchanged after 9998 further rows and larger
+// after another 100000 rows.
+TEST_F(ParquetFormatWriterTest, TestGetEstimateLength) {
+    auto schema_pair = PrepareArrowSchema();
+    const auto& arrow_schema = schema_pair.first;
+    const auto& struct_type = schema_pair.second;
+
+    std::string file_path = PathUtil::JoinPath(dir_->Str(), "get_estimate_length");
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<OutputStream> out,
+                         fs_->Create(file_path, /*overwrite=*/false));
+    ::parquet::WriterProperties::Builder builder;
+    auto writer_properties = builder.build();
+    ASSERT_OK_AND_ASSIGN(
+        std::shared_ptr<ParquetFormatWriter> format_writer,
+        ParquetFormatWriter::Create(out, arrow_schema, writer_properties,
+                                    DEFAULT_PARQUET_WRITER_MAX_MEMORY_USE, arrow_pool_));
+
+    // add batch first time, 1 row
+    AddRecordBatchOnce(format_writer, struct_type, 1, 0);
+    ASSERT_OK_AND_ASSIGN(uint64_t estimate_len1, format_writer->GetEstimateLength());
+    ASSERT_GT(estimate_len1, 0);
+
+    // add batch second time, 9998 rows
+    AddRecordBatchOnce(format_writer, struct_type, 9998, 1);
+    ASSERT_OK_AND_ASSIGN(uint64_t estimate_len2, format_writer->GetEstimateLength());
+    ASSERT_EQ(estimate_len2, estimate_len1);
+
+    // add batch third time, 100000 rows
+    AddRecordBatchOnce(format_writer, struct_type, 100000, 9999);
+    ASSERT_OK_AND_ASSIGN(uint64_t estimate_len3, format_writer->GetEstimateLength());
+    ASSERT_GT(estimate_len3, estimate_len2);
+    ASSERT_OK(format_writer->Finish());
+    // Release the output stream explicitly, as the other tests in this file do.
+    ASSERT_OK(out->Flush());
+    ASSERT_OK(out->Close());
+}
+
+// Writes 2000 batches of 4096 rows through a writer configured with
+// "parquet.writer.max.memory.use" and checks that the peak usage reported by
+// the memory pool exceeds the configured limit by less than 50%.
+TEST_F(ParquetFormatWriterTest, TestMemoryControl) {
+    auto run = [&](bool all_null_value, uint64_t max_memory_use) {
+        ASSERT_OK_AND_ASSIGN(
+            std::unique_ptr<FileFormat> file_format,
+            FileFormatFactory::Get(
+                "parquet", {{Options::FILE_FORMAT, "parquet"},
+                            {Options::MANIFEST_FORMAT, "parquet"},
+                            {"parquet.writer.max.memory.use", std::to_string(max_memory_use)}}));
+
+        // A dedicated pool per run, so MaxMemoryUsage() reflects this run only.
+        std::shared_ptr<MemoryPool> pool = GetMemoryPool();
+        auto schema_pair = PrepareArrowSchema();
+        const auto& arrow_schema = schema_pair.first;
+        const auto& struct_type = schema_pair.second;
+        int32_t batch_size = 4096;
+
+        auto c_schema = std::make_unique<::ArrowSchema>();
+        ASSERT_TRUE(arrow::ExportSchema(*arrow_schema, c_schema.get()).ok());
+        ASSERT_OK_AND_ASSIGN(auto writer_builder,
+                             file_format->CreateWriterBuilder(c_schema.get(), batch_size));
+        ASSERT_OK_AND_ASSIGN(
+            std::shared_ptr<OutputStream> out,
+            fs_->Create(
+                PathUtil::JoinPath(dir_->Str(), std::to_string(all_null_value) +
+                                                    std::to_string(max_memory_use) + ".parquet"),
+                /*overwrite=*/false));
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<FormatWriter> writer,
+                             writer_builder->WithMemoryPool(pool)->Build(out, "uncompressed"));
+
+        // The same Arrow array is exported again for every iteration.
+        auto array = PrepareArray(struct_type, batch_size, /*offset=*/0, all_null_value);
+        for (int32_t i = 0; i < 2000; ++i) {
+            auto arrow_array = std::make_unique<ArrowArray>();
+            ASSERT_TRUE(arrow::ExportArray(*array, arrow_array.get()).ok());
+            auto batch = std::make_shared<RecordBatch>(
+                /*partition=*/std::map<std::string, std::string>(), /*bucket=*/-1,
+                /*row_kinds=*/std::vector<RecordBatch::RowKind>(), arrow_array.get());
+            ASSERT_OK(writer->AddBatch(batch->GetData()));
+            ASSERT_OK(writer->Flush());
+        }
+
+        ASSERT_OK(writer->Flush());
+        ASSERT_OK(writer->Finish());
+        ASSERT_OK(out->Flush());
+        ASSERT_OK(out->Close());
+        uint64_t actual_max_mem = pool->MaxMemoryUsage();
+        ASSERT_GT(actual_max_mem, max_memory_use);
+        ASSERT_LT(actual_max_mem, max_memory_use * 1.5);  // allow 50% overhead
+    };
+    // A fatal assertion inside the lambda only returns from the lambda;
+    // ASSERT_NO_FATAL_FAILURE stops the test at the first failing run.
+    ASSERT_NO_FATAL_FAILURE(
+        run(/*all_null_value=*/true, /*max_memory_use=*/20 * 1024 * 1024));  // 20MB
+    ASSERT_NO_FATAL_FAILURE(
+        run(/*all_null_value=*/true, /*max_memory_use=*/40 * 1024 * 1024));  // 40MB
+    ASSERT_NO_FATAL_FAILURE(
+        run(/*all_null_value=*/false, /*max_memory_use=*/20 * 1024 * 1024));  // 20MB
+    ASSERT_NO_FATAL_FAILURE(
+        run(/*all_null_value=*/false, /*max_memory_use=*/40 * 1024 * 1024));  // 40MB
+}
+
+// With "parquet.writer.max.memory.use" set to "1", adds `write_times` batches
+// of 10 rows each and expects the file to contain one row group per batch.
+TEST_F(ParquetFormatWriterTest, TestMemoryControlForCheckRowGroupCount) {
+    auto run = [&](int32_t write_times) {
+        ASSERT_OK_AND_ASSIGN(
+            std::unique_ptr<FileFormat> file_format,
+            FileFormatFactory::Get("parquet", {{Options::FILE_FORMAT, "parquet"},
+                                               {Options::MANIFEST_FORMAT, "parquet"},
+                                               {"parquet.writer.max.memory.use", "1"}}));
+
+        auto schema_pair = PrepareArrowSchema();
+        const auto& arrow_schema = schema_pair.first;
+        const auto& struct_type = schema_pair.second;
+        int32_t batch_size = 4096;
+        std::string file_path =
+            PathUtil::JoinPath(dir_->Str(), std::to_string(write_times) + ".parquet");
+
+        auto c_schema = std::make_unique<::ArrowSchema>();
+        ASSERT_TRUE(arrow::ExportSchema(*arrow_schema, c_schema.get()).ok());
+        ASSERT_OK_AND_ASSIGN(auto writer_builder,
+                             file_format->CreateWriterBuilder(c_schema.get(), batch_size));
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<OutputStream> out,
+                             fs_->Create(file_path, /*overwrite=*/false));
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<FormatWriter> writer,
+                             writer_builder->Build(out, "uncompressed"));
+
+        // Consecutive offsets keep the row pattern contiguous for CheckResult().
+        for (int32_t i = 0; i < write_times; ++i) {
+            AddRecordBatchOnce(writer, struct_type, 10, i * 10);
+        }
+
+        ASSERT_OK(writer->Flush());
+        ASSERT_OK(writer->Finish());
+        ASSERT_OK(out->Flush());
+        ASSERT_OK(out->Close());
+        CheckResult(file_path, /*row_count=*/write_times * 10,
+                    /*row_group_count=*/write_times);
+    };
+    // Fatal assertions raised inside the lambda (or inside CheckResult) only
+    // return from that function; ASSERT_NO_FATAL_FAILURE aborts the test at
+    // the first failing run.
+    ASSERT_NO_FATAL_FAILURE(run(/*write_times=*/1));
+    ASSERT_NO_FATAL_FAILURE(run(/*write_times=*/2));
+    ASSERT_NO_FATAL_FAILURE(run(/*write_times=*/5));
+}
+
+// Writes a two-row batch of timestamp columns covering all four time units,
+// with and without a time zone, and expects every writer call to succeed.
+TEST_F(ParquetFormatWriterTest, TestTimestampType) {
+    // Named local_timezone to avoid shadowing the POSIX global `timezone`.
+    auto local_timezone = DateTimeUtils::GetLocalTimezoneName();
+    arrow::FieldVector fields = {
+        arrow::field("ts_sec", arrow::timestamp(arrow::TimeUnit::SECOND)),
+        arrow::field("ts_milli", arrow::timestamp(arrow::TimeUnit::MILLI)),
+        arrow::field("ts_micro", arrow::timestamp(arrow::TimeUnit::MICRO)),
+        arrow::field("ts_nano", arrow::timestamp(arrow::TimeUnit::NANO)),
+        arrow::field("ts_utc1", arrow::timestamp(arrow::TimeUnit::SECOND, local_timezone)),
+        arrow::field("ts_utc2", arrow::timestamp(arrow::TimeUnit::MICRO, local_timezone))};
+
+    std::string file_path = PathUtil::JoinPath(dir_->Str(), "timezone.parquet");
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<OutputStream> out,
+                         fs_->Create(file_path, /*overwrite=*/true));
+    ::parquet::WriterProperties::Builder builder;
+    auto writer_properties = builder.build();
+    ASSERT_OK_AND_ASSIGN(
+        std::shared_ptr<ParquetFormatWriter> format_writer,
+        ParquetFormatWriter::Create(out, std::make_shared<arrow::Schema>(fields), writer_properties,
+                                    DEFAULT_PARQUET_WRITER_MAX_MEMORY_USE, arrow_pool_));
+
+    // Each timestamp value must stay on a single line: a line break inside a
+    // value would become part of the JSON string.
+    auto array = std::dynamic_pointer_cast<arrow::StructArray>(
+        arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([
+["1970-01-01 00:00:01", "1970-01-01 00:00:00.001", "1970-01-01 00:00:00.000001", "1970-01-01 00:00:00.000000001",
+"1970-01-01 00:00:02", "1970-01-01 00:00:00.002"],
+["1970-01-01 00:00:01", null, "1970-01-01 00:00:00.000001", null,"1970-01-01 00:00:02", null]
+    ])")
+            .ValueOrDie());
+    // dynamic_pointer_cast returns nullptr on a type mismatch; check before dereferencing.
+    ASSERT_TRUE(array);
+
+    ArrowArray c_array;
+    ASSERT_TRUE(arrow::ExportArray(*array, &c_array).ok());
+    ASSERT_OK(format_writer->AddBatch(&c_array));
+    ASSERT_OK(format_writer->Flush());
+    ASSERT_OK(format_writer->Finish());
+    ASSERT_OK(out->Flush());
+    ASSERT_OK(out->Close());
+}
+
+} // namespace paimon::parquet::test