This is an automated email from the ASF dual-hosted git repository.
suxiaogang223 pushed a commit to branch refact_reader_branch
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/refact_reader_branch by this
push:
new 6f4dac32501 [feature](be) Add basic parquet list reader
6f4dac32501 is described below
commit 6f4dac325011df9b4c4df11f5087c0aa5e5db1c3
Author: Socrates <[email protected]>
AuthorDate: Thu May 28 11:02:39 2026 +0800
[feature](be) Add basic parquet list reader
### What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
Problem Summary: Take the next step toward complex type support in the new
parquet reader: normalize the standard three-level LIST schema (LIST group ->
repeated group -> element) to Array(element), allow RecordReader usage for
nested leaf columns, and read non-empty LIST<required primitive> columns by
deriving the array offsets from repetition levels.
### Release note
None
### Check List (For Author)
- Test: Manual test
- Ran clang-format dry-run and git diff --check for modified files.
- Ran BUILD_TYPE=DEBUG ./build.sh --be on Fedora successfully with the
patch applied.
- Attempted ParquetColumnReaderTest on Fedora, but stopped the ASAN_UT
build because it triggered a fresh full UT build; no test binary execution
result was produced.
- Behavior changed: Yes. The new parquet reader can now read one limited LIST
shape (a non-nullable list with no empty rows and required primitive
elements) and reports NotSupported for all other list shapes instead of
rejecting all LIST columns.
- Does this need documentation: No
---
be/src/format/new_parquet/column_reader.cpp | 235 ++++++++++++++++++---
be/src/format/new_parquet/column_reader.h | 7 +
.../format/new_parquet/parquet_column_schema.cpp | 24 ++-
be/src/format/new_parquet/parquet_type.cpp | 4 -
.../new_parquet/parquet_column_reader_test.cpp | 39 ++++
5 files changed, 274 insertions(+), 35 deletions(-)
diff --git a/be/src/format/new_parquet/column_reader.cpp
b/be/src/format/new_parquet/column_reader.cpp
index f1674b767b0..9952016832c 100644
--- a/be/src/format/new_parquet/column_reader.cpp
+++ b/be/src/format/new_parquet/column_reader.cpp
@@ -31,7 +31,9 @@
#include <vector>
#include "core/column/column.h"
+#include "core/column/column_array.h"
#include "core/column/column_struct.h"
+#include "core/data_type/data_type_array.h"
#include "core/data_type/data_type_nullable.h"
#include "core/data_type/data_type_struct.h"
#include "core/data_type_serde/decoded_column_view.h"
@@ -66,6 +68,7 @@ public:
const std::shared_ptr<::parquet::internal::RecordReader>& record_reader()
const {
return _record_reader;
}
+ const ParquetTypeDescriptor& type_descriptor() const { return
_type_descriptor; }
private:
int _file_column_id = -1;
@@ -101,6 +104,32 @@ private:
std::vector<std::unique_ptr<ParquetColumnReader>> _children;
};
+class ListColumnReader final : public ParquetColumnReader {
+public:
+ ListColumnReader(const ParquetColumnSchema& schema, DataTypePtr type,
+ std::unique_ptr<ParquetColumnReader> element_reader)
+ : _field_id(schema.top_level_field_id),
+ _repeated_repetition_level(schema.repeated_repetition_level),
+ _type(std::move(type)),
+ _name(schema.name),
+ _element_reader(std::move(element_reader)) {}
+
+ int file_column_id() const override { return _field_id; }
+ int parquet_leaf_column_id() const override { return -1; }
+ const DataTypePtr& type() const override { return _type; }
+ const std::string& name() const override { return _name; }
+
+ Status read(int64_t rows, MutableColumnPtr& column, int64_t* rows_read)
override;
+ Status skip(int64_t rows) override;
+
+private:
+ int _field_id = -1;
+ int16_t _repeated_repetition_level = 0;
+ DataTypePtr _type;
+ std::string _name;
+ std::unique_ptr<ParquetColumnReader> _element_reader;
+};
+
Status read_records(ScalarColumnReader& column_reader, int64_t batch_rows,
::parquet::internal::RecordReader** record_reader,
int64_t* rows_read) {
auto reader = column_reader.record_reader();
@@ -275,6 +304,34 @@ Status build_binary_values(const ScalarColumnReader&
column_reader,
return Status::OK();
}
+Status append_scalar_values(const ScalarColumnReader& column_reader,
+ ::parquet::internal::RecordReader& record_reader,
int64_t row_count,
+ const NullMap* null_map, MutableColumnPtr& column)
{
+ std::vector<StringRef> binary_values;
+ std::vector<std::shared_ptr<::arrow::Array>> binary_chunks;
+ DecodedColumnView view;
+ view.value_kind = decoded_value_kind(column_reader.type_descriptor());
+ view.time_unit =
decoded_time_unit(column_reader.type_descriptor().time_unit);
+ view.row_count = row_count;
+ view.decimal_precision = column_reader.type_descriptor().decimal_precision;
+ view.decimal_scale = column_reader.type_descriptor().decimal_scale;
+ view.fixed_length = column_reader.type_descriptor().fixed_length;
+ view.null_map = null_map == nullptr || null_map->empty() ? nullptr :
null_map->data();
+ if (view.value_kind == DecodedValueKind::BINARY ||
+ view.value_kind == DecodedValueKind::FIXED_BINARY) {
+ RETURN_IF_ERROR(get_binary_chunks(column_reader, record_reader,
&binary_chunks));
+ RETURN_IF_ERROR(
+ build_binary_values(column_reader, binary_chunks, row_count,
&binary_values));
+ view.binary_values = &binary_values;
+ } else {
+ view.values = record_reader.values();
+ }
+
+ RETURN_IF_ERROR(
+
column_reader.type()->get_serde()->read_column_from_decoded_values(*column,
view));
+ return Status::OK();
+}
+
} // namespace
Status ScalarColumnReader::read(int64_t rows, MutableColumnPtr& column,
int64_t* rows_read) {
@@ -297,26 +354,7 @@ Status ScalarColumnReader::read(int64_t rows,
MutableColumnPtr& column, int64_t*
NullMap null_map;
RETURN_IF_ERROR(build_null_map(*this, *record_reader, *rows_read,
&null_map));
- std::vector<StringRef> binary_values;
- std::vector<std::shared_ptr<::arrow::Array>> binary_chunks;
- DecodedColumnView view;
- view.value_kind = decoded_value_kind(_type_descriptor);
- view.time_unit = decoded_time_unit(_type_descriptor.time_unit);
- view.row_count = *rows_read;
- view.decimal_precision = _type_descriptor.decimal_precision;
- view.decimal_scale = _type_descriptor.decimal_scale;
- view.fixed_length = _type_descriptor.fixed_length;
- view.null_map = null_map.empty() ? nullptr : null_map.data();
- if (view.value_kind == DecodedValueKind::BINARY ||
- view.value_kind == DecodedValueKind::FIXED_BINARY) {
- RETURN_IF_ERROR(get_binary_chunks(*this, *record_reader,
&binary_chunks));
- RETURN_IF_ERROR(build_binary_values(*this, binary_chunks, *rows_read,
&binary_values));
- view.binary_values = &binary_values;
- } else {
- view.values = record_reader->values();
- }
-
-
RETURN_IF_ERROR(_type->get_serde()->read_column_from_decoded_values(*column,
view));
+ RETURN_IF_ERROR(append_scalar_values(*this, *record_reader, *rows_read,
&null_map, column));
return Status::OK();
}
@@ -395,6 +433,92 @@ Status StructColumnReader::skip(int64_t rows) {
return Status::OK();
}
+Status ListColumnReader::read(int64_t rows, MutableColumnPtr& column, int64_t*
rows_read) {
+ if (column.get() == nullptr || rows_read == nullptr) {
+ return Status::InvalidArgument("Invalid parquet list read result
pointer for column {}",
+ _name);
+ }
+ if (_element_reader == nullptr) {
+ return Status::InternalError("Parquet list element reader is not
initialized for column {}",
+ _name);
+ }
+ auto* element_reader =
dynamic_cast<ScalarColumnReader*>(_element_reader.get());
+ if (element_reader == nullptr) {
+ return Status::NotSupported(
+ "Current parquet LIST reader only supports scalar elements for
column {}", _name);
+ }
+ if (element_reader->descriptor()->max_definition_level() != 1) {
+ return Status::NotSupported(
+ "Current parquet LIST reader only supports required elements
for column {}", _name);
+ }
+
+ ::parquet::internal::RecordReader* record_reader = nullptr;
+ int64_t records_read = 0;
+ RETURN_IF_ERROR(read_records(*element_reader, rows, &record_reader,
&records_read));
+ const int64_t levels_written = record_reader->levels_written();
+ if (records_read != rows || levels_written < records_read) {
+ return Status::Corruption(
+ "Invalid parquet LIST read result for column {}: rows={},
levels={}", _name,
+ records_read, levels_written);
+ }
+ if (record_reader->values_written() != levels_written) {
+ return Status::NotSupported(
+ "Current parquet LIST reader only supports non-empty lists
with required "
+ "elements for column {}",
+ _name);
+ }
+ const int16_t max_definition_level =
element_reader->descriptor()->max_definition_level();
+ if (auto* def_levels = record_reader->def_levels(); def_levels != nullptr)
{
+ for (int64_t level_idx = 0; level_idx < levels_written; ++level_idx) {
+ if (def_levels[level_idx] != max_definition_level) {
+ return Status::NotSupported(
+ "Current parquet LIST reader only supports non-empty
lists with required "
+ "elements for column {}",
+ _name);
+ }
+ }
+ }
+
+ auto& array_column = assert_cast<ColumnArray&>(*column);
+ auto nested_column = array_column.get_data_ptr()->assume_mutable();
+ RETURN_IF_ERROR(append_scalar_values(*element_reader, *record_reader,
levels_written, nullptr,
+ nested_column));
+ array_column.get_data_ptr() = std::move(nested_column);
+
+ auto* rep_levels = record_reader->rep_levels();
+ if (rep_levels == nullptr && levels_written > 0) {
+ return Status::Corruption(
+ "Parquet LIST reader returned null repetition levels for
column {}", _name);
+ }
+ auto& offsets = array_column.get_offsets();
+ offsets.reserve(offsets.size() + static_cast<size_t>(records_read));
+ size_t current_offset = offsets.empty() ? 0 : offsets.back();
+ int64_t current_record = 0;
+ for (int64_t level_idx = 0; level_idx < levels_written; ++level_idx) {
+ if (level_idx == 0 || rep_levels[level_idx] <
_repeated_repetition_level) {
+ if (level_idx != 0) {
+ offsets.push_back(current_offset);
+ current_record++;
+ }
+ }
+ current_offset++;
+ }
+ while (current_record < records_read) {
+ offsets.push_back(current_offset);
+ current_record++;
+ }
+ *rows_read = records_read;
+ return Status::OK();
+}
+
+Status ListColumnReader::skip(int64_t rows) {
+ if (rows <= 0) {
+ return Status::OK();
+ }
+ DORIS_CHECK(_element_reader != nullptr);
+ return _element_reader->skip(rows);
+}
+
Status ParquetColumnReader::skip(int64_t rows) {
return Status::NotSupported("Parquet column skip is not implemented,
rows={}", rows);
}
@@ -469,6 +593,42 @@ Status
ParquetColumnReaderFactory::create_scalar_column_reader(
"column {} is not supported",
column_schema.name);
}
+ if (column_schema.descriptor == nullptr ||
+ column_schema.descriptor->max_repetition_level() != 0 ||
+ column_schema.descriptor->max_definition_level() > 1) {
+ return Status::NotSupported(
+ "Current parquet scalar reader only supports flat primitive
columns; column {} is "
+ "not supported",
+ column_schema.name);
+ }
+ std::shared_ptr<::parquet::internal::RecordReader> record_reader;
+ RETURN_IF_ERROR(get_record_reader(column_schema.leaf_column_id,
column_schema.descriptor,
+ column_schema.name, &record_reader));
+ return create_scalar_reader(column_schema.leaf_column_id,
column_schema.type_descriptor,
+ column_schema.descriptor, column_schema.type,
column_schema.name,
+ std::move(record_reader), reader);
+}
+
+Status ParquetColumnReaderFactory::create_nested_scalar_column_reader(
+ const ParquetColumnSchema& column_schema,
+ std::unique_ptr<ParquetColumnReader>* reader) const {
+ if (reader == nullptr) {
+ return Status::InvalidArgument("reader is null");
+ }
+ if (column_schema.kind != ParquetColumnSchemaKind::PRIMITIVE) {
+ return Status::InvalidArgument("Parquet nested scalar reader requires
primitive column {}",
+ column_schema.name);
+ }
+ if (column_schema.leaf_column_id < 0 ||
+ column_schema.leaf_column_id >=
static_cast<int>(_record_readers.size())) {
+ return Status::InvalidArgument("Invalid parquet leaf column id {} for
column {}",
+ column_schema.leaf_column_id,
column_schema.name);
+ }
+ if (!supports_record_reader(column_schema.type_descriptor)) {
+ return Status::NotSupported(
+ "Current parquet nested scalar reader does not support column
{}",
+ column_schema.name);
+ }
std::shared_ptr<::parquet::internal::RecordReader> record_reader;
RETURN_IF_ERROR(get_record_reader(column_schema.leaf_column_id,
column_schema.descriptor,
column_schema.name, &record_reader));
@@ -494,12 +654,6 @@ Status ParquetColumnReaderFactory::get_record_reader(
if (descriptor == nullptr) {
return Status::InvalidArgument("Parquet column descriptor is null for
column {}", name);
}
- if (descriptor->max_repetition_level() != 0 ||
descriptor->max_definition_level() > 1) {
- return Status::NotSupported(
- "Current parquet reader only supports RecordReader-backed
columns; column {} is "
- "not supported",
- name);
- }
if (_record_readers[leaf_column_id] == nullptr) {
try {
_record_readers[leaf_column_id] =
@@ -569,6 +723,32 @@ Status
ParquetColumnReaderFactory::create_struct_column_reader(
return Status::OK();
}
+Status ParquetColumnReaderFactory::create_list_column_reader(
+ const ParquetColumnSchema& column_schema, const
reader::FieldProjection* projection,
+ std::unique_ptr<ParquetColumnReader>* reader) const {
+ if (reader == nullptr) {
+ return Status::InvalidArgument("reader is null");
+ }
+ if (projection != nullptr && !projection->project_all_children) {
+ return Status::NotSupported("Parquet LIST projection is not
implemented for column {}",
+ column_schema.name);
+ }
+ if (column_schema.type != nullptr && column_schema.type->is_nullable()) {
+ return Status::NotSupported("Nullable parquet LIST reader is not
implemented for column {}",
+ column_schema.name);
+ }
+ if (column_schema.children.size() != 1) {
+ return Status::NotSupported("Unsupported parquet LIST layout for
column {}",
+ column_schema.name);
+ }
+ std::unique_ptr<ParquetColumnReader> element_reader;
+ RETURN_IF_ERROR(
+ create_nested_scalar_column_reader(*column_schema.children[0],
&element_reader));
+ *reader = std::make_unique<ListColumnReader>(column_schema,
column_schema.type,
+ std::move(element_reader));
+ return Status::OK();
+}
+
Status ParquetColumnReaderFactory::create(const ParquetColumnSchema&
column_schema,
const reader::FieldProjection*
projection,
std::unique_ptr<ParquetColumnReader>* reader) const {
@@ -581,8 +761,7 @@ Status ParquetColumnReaderFactory::create(const
ParquetColumnSchema& column_sche
case ParquetColumnSchemaKind::STRUCT:
return create_struct_column_reader(column_schema, projection, reader);
case ParquetColumnSchemaKind::LIST:
- return Status::NotSupported("Parquet LIST reader is not implemented
for column {}",
- column_schema.name);
+ return create_list_column_reader(column_schema, projection, reader);
case ParquetColumnSchemaKind::MAP:
return Status::NotSupported("Parquet MAP reader is not implemented for
column {}",
column_schema.name);
diff --git a/be/src/format/new_parquet/column_reader.h
b/be/src/format/new_parquet/column_reader.h
index 93881ac8c48..ec691a9743e 100644
--- a/be/src/format/new_parquet/column_reader.h
+++ b/be/src/format/new_parquet/column_reader.h
@@ -104,10 +104,17 @@ private:
Status create_scalar_column_reader(const ParquetColumnSchema&
column_schema,
std::unique_ptr<ParquetColumnReader>*
reader) const;
+ Status create_nested_scalar_column_reader(const ParquetColumnSchema&
column_schema,
+
std::unique_ptr<ParquetColumnReader>* reader) const;
+
Status create_struct_column_reader(const ParquetColumnSchema&
column_schema,
const reader::FieldProjection*
projection,
std::unique_ptr<ParquetColumnReader>*
reader) const;
+ Status create_list_column_reader(const ParquetColumnSchema& column_schema,
+ const reader::FieldProjection* projection,
+ std::unique_ptr<ParquetColumnReader>*
reader) const;
+
Status get_record_reader(int leaf_column_id, const
::parquet::ColumnDescriptor* descriptor,
const std::string& name,
std::shared_ptr<::parquet::internal::RecordReader>* reader) const;
diff --git a/be/src/format/new_parquet/parquet_column_schema.cpp
b/be/src/format/new_parquet/parquet_column_schema.cpp
index 3235ea38a06..8541769c1d2 100644
--- a/be/src/format/new_parquet/parquet_column_schema.cpp
+++ b/be/src/format/new_parquet/parquet_column_schema.cpp
@@ -140,6 +140,9 @@ Status build_node_schema(const ::parquet::SchemaDescriptor&
schema,
return Status::NotSupported("Unsupported parquet column type for
column {}",
node.name());
}
+ column_schema->type = node.is_optional()
+ ?
make_nullable(remove_nullable(column_schema->type))
+ : remove_nullable(column_schema->type);
*result = std::move(column_schema);
return Status::OK();
}
@@ -151,10 +154,25 @@ Status build_node_schema(const
::parquet::SchemaDescriptor& schema,
return Status::NotSupported("Unsupported parquet LIST encoding for
column {}",
node.name());
}
+ const auto& repeated_node = *group.field(0);
+ if (!repeated_node.is_repeated() || repeated_node.is_primitive()) {
+ return Status::NotSupported("Unsupported parquet LIST encoding for
column {}",
+ node.name());
+ }
+ const auto& repeated_group =
+ static_cast<const
::parquet::schema::GroupNode&>(repeated_node);
+ if (repeated_group.field_count() != 1) {
+ return Status::NotSupported("Unsupported parquet LIST element
layout for column {}",
+ node.name());
+ }
+ auto repeated_context =
+ child_context(context, repeated_node, 0,
column_schema->schema_node_id);
+ column_schema->repeated_repetition_level =
repeated_context.repeated_repetition_level;
std::unique_ptr<ParquetColumnSchema> child;
- RETURN_IF_ERROR(build_node_schema(
- schema, *group.field(0),
- child_context(context, *group.field(0), 0,
column_schema->schema_node_id), &child));
+ RETURN_IF_ERROR(build_node_schema(schema, *repeated_group.field(0),
+ child_context(repeated_context,
*repeated_group.field(0),
+ 0,
column_schema->schema_node_id),
+ &child));
column_schema->type =
nullable_if_needed(std::make_shared<DataTypeArray>(child->type), node);
column_schema->children.push_back(std::move(child));
diff --git a/be/src/format/new_parquet/parquet_type.cpp
b/be/src/format/new_parquet/parquet_type.cpp
index 53c7b4f2ed9..4079c989f7d 100644
--- a/be/src/format/new_parquet/parquet_type.cpp
+++ b/be/src/format/new_parquet/parquet_type.cpp
@@ -323,10 +323,6 @@ ParquetTypeDescriptor resolve_parquet_type(const
::parquet::ColumnDescriptor* co
!result.is_decimal && (result.physical_type ==
::parquet::Type::BYTE_ARRAY ||
result.physical_type ==
::parquet::Type::FIXED_LEN_BYTE_ARRAY);
- if (column->max_repetition_level() != 0 || column->max_definition_level()
> 1) {
- result.supports_record_reader = false;
- return result;
- }
if (!record_reader_physical_type_supported(result.physical_type)) {
result.supports_record_reader = false;
return result;
diff --git a/be/test/format/new_parquet/parquet_column_reader_test.cpp
b/be/test/format/new_parquet/parquet_column_reader_test.cpp
index 97773a5bada..b85bbb80a6a 100644
--- a/be/test/format/new_parquet/parquet_column_reader_test.cpp
+++ b/be/test/format/new_parquet/parquet_column_reader_test.cpp
@@ -29,12 +29,14 @@
#include <vector>
#include "core/assert_cast.h"
+#include "core/column/column_array.h"
#include "core/column/column_decimal.h"
#include "core/column/column_nullable.h"
#include "core/column/column_string.h"
#include "core/column/column_struct.h"
#include "core/column/column_vector.h"
#include "core/data_type/data_type.h"
+#include "core/data_type/data_type_array.h"
#include "core/data_type/data_type_nullable.h"
#include "core/data_type/data_type_struct.h"
#include "core/types.h"
@@ -144,6 +146,21 @@ protected:
return finish_array(&builder);
}
+ std::shared_ptr<arrow::Array> build_required_int_list_array() {
+ auto value_builder = std::make_shared<arrow::Int32Builder>();
+ arrow::ListBuilder builder(arrow::default_memory_pool(),
value_builder);
+ const std::vector<std::vector<int32_t>> values = {
+ {1, 2}, {3}, {4, 5, 6}, {7}, {8, 9},
+ };
+ for (const auto& row : values) {
+ EXPECT_TRUE(builder.Append().ok());
+ for (const auto value : row) {
+ EXPECT_TRUE(value_builder->Append(value).ok());
+ }
+ }
+ return finish_array(&builder);
+ }
+
std::shared_ptr<arrow::Array> build_time32_array(const
std::shared_ptr<arrow::DataType>& type,
const
std::vector<int32_t>& values) {
arrow::Time32Builder builder(type, arrow::default_memory_pool());
@@ -365,6 +382,28 @@ protected:
EXPECT_EQ(b_values.get_data_at(1).to_string(), "sb");
EXPECT_EQ(b_values.get_data_at(4).to_string(), "se");
});
+ add_field(arrow::field("list_int_col",
+ arrow::list(arrow::field("element",
arrow::int32(), false)), false),
+ build_required_int_list_array(),
+ [](const ParquetColumnSchema& schema, const IColumn& column)
{
+
EXPECT_EQ(remove_nullable(schema.type)->get_primitive_type(), TYPE_ARRAY);
+ const auto* array_type =
+ assert_cast<const
DataTypeArray*>(remove_nullable(schema.type).get());
+ EXPECT_EQ(
+
remove_nullable(array_type->get_nested_type())->get_primitive_type(),
+ TYPE_INT);
+ const auto& array_column = assert_cast<const
ColumnArray&>(column);
+ ASSERT_EQ(array_column.size(), ROW_COUNT);
+ EXPECT_EQ(array_column.size_at(0), 2);
+ EXPECT_EQ(array_column.size_at(1), 1);
+ EXPECT_EQ(array_column.size_at(2), 3);
+ EXPECT_EQ(array_column.size_at(4), 2);
+ const auto& values = assert_cast<const
ColumnInt32&>(array_column.get_data());
+ ASSERT_EQ(values.size(), 9);
+ EXPECT_EQ(values.get_element(0), 1);
+ EXPECT_EQ(values.get_element(5), 6);
+ EXPECT_EQ(values.get_element(8), 9);
+ });
auto schema = arrow::schema(_arrow_fields);
auto table = arrow::Table::Make(schema, _arrays);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]