This is an automated email from the ASF dual-hosted git repository.
Gabriel39 pushed a commit to branch refact_reader_branch
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/refact_reader_branch by this
push:
new 4fe7254976e Refactor 0527 (#63712)
4fe7254976e is described below
commit 4fe7254976e50e96edd2514bbe371fd8cfbc8b6d
Author: Gabriel <[email protected]>
AuthorDate: Wed May 27 10:33:29 2026 +0800
Refactor 0527 (#63712)
---
be/test/format/reader/table_reader_test.cpp | 303 ++++++++++++++++++++++++++++
1 file changed, 303 insertions(+)
diff --git a/be/test/format/reader/table_reader_test.cpp b/be/test/format/reader/table_reader_test.cpp
new file mode 100644
index 00000000000..84c5700fc4c
--- /dev/null
+++ b/be/test/format/reader/table_reader_test.cpp
@@ -0,0 +1,303 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "format/reader/table_reader.h"
+
+#include <arrow/api.h>
+#include <arrow/io/api.h>
+#include <gtest/gtest.h>
+#include <parquet/arrow/writer.h>
+
+#include <filesystem>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "core/assert_cast.h"
+#include "core/block/block.h"
+#include "core/column/column_string.h"
+#include "core/column/column_vector.h"
+#include "core/data_type/data_type_number.h"
+#include "core/data_type/data_type_string.h"
+#include "gen_cpp/PlanNodes_types.h"
+#include "runtime/runtime_state.h"
+
+namespace doris::reader {
+namespace {
+
+// Finalizes `builder` and returns the resulting Arrow array.
+//
+// A failing Finish() is recorded as a non-fatal gtest failure that includes the Arrow status
+// text; the (possibly null) array pointer is still returned to the caller.
+std::shared_ptr<arrow::Array> finish_array(arrow::ArrayBuilder* builder) {
+    std::shared_ptr<arrow::Array> array;
+    const arrow::Status status = builder->Finish(&array);
+    // Stream the status so a failure explains itself, matching the FileOutputStream check in
+    // write_parquet_file().
+    EXPECT_TRUE(status.ok()) << status;
+    return array;
+}
+
+// Builds an Arrow int32 array containing `values` in order (no nulls are appended).
+//
+// Append or finish errors are reported as non-fatal gtest failures.
+std::shared_ptr<arrow::Array> build_int32_array(const std::vector<int32_t>& values) {
+    arrow::Int32Builder builder;
+    // A single bulk append replaces the per-element loop and yields one status to report.
+    const arrow::Status status = builder.AppendValues(values);
+    EXPECT_TRUE(status.ok()) << status;
+    return finish_array(&builder);
+}
+
+// Builds an Arrow UTF-8 string array containing `values` in order (no nulls are appended).
+//
+// Append or finish errors are reported as non-fatal gtest failures.
+std::shared_ptr<arrow::Array> build_string_array(const std::vector<std::string>& values) {
+    arrow::StringBuilder builder;
+    // A single bulk append replaces the per-element loop and yields one status to report.
+    const arrow::Status status = builder.AppendValues(values);
+    EXPECT_TRUE(status.ok()) << status;
+    return finish_array(&builder);
+}
+
+// Writes a single-row Parquet file to `file_path`.
+//
+// The file has two non-nullable fields, "id" (int32) and "value" (utf8), holding `id` and
+// `value`. It is written uncompressed with format version 2.6 and V2 data pages.
+//
+// Every failure is reported through a fatal gtest assertion. Such assertions only return from
+// this helper, so callers that must stop on failure should wrap the call in
+// ASSERT_NO_FATAL_FAILURE.
+void write_parquet_file(const std::string& file_path, int32_t id, const std::string& value) {
+    auto schema = arrow::schema({
+            arrow::field("id", arrow::int32(), false),
+            arrow::field("value", arrow::utf8(), false),
+    });
+    auto table =
+            arrow::Table::Make(schema, {build_int32_array({id}), build_string_array({value})});
+
+    auto file_result = arrow::io::FileOutputStream::Open(file_path);
+    ASSERT_TRUE(file_result.ok()) << file_result.status();
+    std::shared_ptr<arrow::io::FileOutputStream> out = *file_result;
+
+    ::parquet::WriterProperties::Builder builder;
+    builder.version(::parquet::ParquetVersion::PARQUET_2_6);
+    builder.data_page_version(::parquet::ParquetDataPageVersion::V2);
+    builder.compression(::parquet::Compression::UNCOMPRESSED);
+
+    // Report a write error as a test failure with its status text instead of throwing, so the
+    // helper uses one error-reporting style throughout.
+    const arrow::Status write_status = ::parquet::arrow::WriteTable(
+            *table, arrow::default_memory_pool(), out, 1, builder.build());
+    ASSERT_TRUE(write_status.ok()) << write_status;
+
+    // Close explicitly so a flush/close error is surfaced here rather than being left to the
+    // stream's destructor, which has no way to report it.
+    const arrow::Status close_status = out->Close();
+    ASSERT_TRUE(close_status.ok()) << close_status;
+}
+
+// Assembles the Block handed to TableReader::get_block(): one entry per element of `columns`,
+// in the same order, each made of a column obtained from the type's create_column(), the type
+// itself, and the column name.
+Block build_table_block(const std::vector<TableColumn>& columns) {
+    Block result;
+    for (size_t idx = 0; idx < columns.size(); ++idx) {
+        const TableColumn& spec = columns[idx];
+        result.insert({spec.type->create_column(), spec.type, spec.name});
+    }
+    return result;
+}
+
+// Describes the file at `file_path` as a split: the returned options carry the path and the
+// file's current on-disk size in `current_range`. The file must already exist, because its size
+// is queried through std::filesystem::file_size().
+SplitReadOptions build_split_options(const std::string& file_path) {
+    const auto size_on_disk = static_cast<int64_t>(std::filesystem::file_size(file_path));
+
+    SplitReadOptions split;
+    split.current_range.__set_path(file_path);
+    split.current_range.__set_file_size(size_on_disk);
+    return split;
+}
+
+// Checks that a single TableReader can be reused for several splits: after init(), each
+// prepare_split() -> get_block() -> close() cycle must read the row of its own Parquet file.
+TEST(TableReaderTest, ReopenSplitAfterClose) {
+    const auto test_dir = std::filesystem::temp_directory_path() / "doris_table_reader_test";
+    std::filesystem::remove_all(test_dir);
+    std::filesystem::create_directories(test_dir);
+
+    const std::vector<std::string> file_paths = {
+            (test_dir / "split_1.parquet").string(),
+            (test_dir / "split_2.parquet").string(),
+            (test_dir / "split_3.parquet").string(),
+    };
+    // The helper reports errors with fatal assertions that only return from the helper itself;
+    // ASSERT_NO_FATAL_FAILURE stops the test here instead of continuing with a bad fixture.
+    ASSERT_NO_FATAL_FAILURE(write_parquet_file(file_paths[0], 1, "one"));
+    ASSERT_NO_FATAL_FAILURE(write_parquet_file(file_paths[1], 2, "two"));
+    ASSERT_NO_FATAL_FAILURE(write_parquet_file(file_paths[2], 3, "three"));
+
+    std::vector<TableColumn> projected_columns;
+    projected_columns.push_back({.id = 0, .name = "id", .type = std::make_shared<DataTypeInt32>()});
+    projected_columns.push_back(
+            {.id = 1, .name = "value", .type = std::make_shared<DataTypeString>()});
+
+    RuntimeState state {TQueryOptions(), TQueryGlobals()};
+    TableReader reader;
+    const auto init_status = reader.init({
+            .projected_columns = projected_columns,
+            .conjuncts = VExprContext(nullptr),
+            .format = FileFormat::PARQUET,
+            .scan_params = nullptr,
+            .io_ctx = nullptr,
+            .runtime_state = &state,
+            .scanner_profile = nullptr,
+    });
+    ASSERT_TRUE(init_status.ok()) << init_status.to_string();
+
+    // Simulate the scanner lifecycle for three different splits:
+    // init() once, then repeat prepare_split() -> get_block() -> close().
+    // This verifies TableReader::close() fully releases the previous low-level reader and task
+    // state, so a later prepare_split() can open and read a new split on the same TableReader.
+    std::vector<int32_t> ids;
+    std::vector<std::string> values;
+    for (const auto& file_path : file_paths) {
+        auto split_options = build_split_options(file_path);
+        const auto prepare_status = reader.prepare_split(split_options);
+        ASSERT_TRUE(prepare_status.ok()) << prepare_status.to_string();
+
+        Block block = build_table_block(projected_columns);
+        bool eos = false;
+        const auto read_status = reader.get_block(&block, &eos);
+        ASSERT_TRUE(read_status.ok()) << read_status.to_string();
+        ASSERT_FALSE(eos);
+
+        const auto& id_column = assert_cast<const ColumnInt32&>(*block.get_by_position(0).column);
+        const auto& value_column =
+                assert_cast<const ColumnString&>(*block.get_by_position(1).column);
+        ASSERT_EQ(id_column.size(), 1);
+        ASSERT_EQ(value_column.size(), 1);
+        ids.push_back(id_column.get_element(0));
+        values.push_back(value_column.get_data_at(0).to_string());
+
+        const auto close_status = reader.close();
+        ASSERT_TRUE(close_status.ok()) << close_status.to_string();
+    }
+
+    EXPECT_EQ(ids, std::vector<int32_t>({1, 2, 3}));
+    EXPECT_EQ(values, std::vector<std::string>({"one", "two", "three"}));
+
+    std::filesystem::remove_all(test_dir);
+}
+
+// Checks that projecting a table column whose field id and name are both absent from the
+// Parquet file makes get_block() fail with a "no matching file column" error.
+TEST(TableReaderTest, ProjectedColumnsRejectParquetSchemaMismatch) {
+    const auto test_dir =
+            std::filesystem::temp_directory_path() / "doris_table_reader_schema_mismatch_test";
+    std::filesystem::remove_all(test_dir);
+    std::filesystem::create_directories(test_dir);
+
+    const auto file_path = (test_dir / "split.parquet").string();
+    // The helper reports errors with fatal assertions that only return from the helper itself;
+    // ASSERT_NO_FATAL_FAILURE stops the test here instead of continuing with a bad fixture.
+    ASSERT_NO_FATAL_FAILURE(write_parquet_file(file_path, 1, "one"));
+
+    std::vector<TableColumn> projected_columns;
+    projected_columns.push_back(
+            {.id = 99, .name = "missing_value", .type = std::make_shared<DataTypeString>()});
+
+    RuntimeState state {TQueryOptions(), TQueryGlobals()};
+    TableReader reader;
+    const auto init_status = reader.init({
+            .projected_columns = projected_columns,
+            .conjuncts = VExprContext(nullptr),
+            .format = FileFormat::PARQUET,
+            .scan_params = nullptr,
+            .io_ctx = nullptr,
+            .runtime_state = &state,
+            .scanner_profile = nullptr,
+    });
+    ASSERT_TRUE(init_status.ok()) << init_status.to_string();
+
+    const auto prepare_status = reader.prepare_split(build_split_options(file_path));
+    ASSERT_TRUE(prepare_status.ok()) << prepare_status.to_string();
+
+    // The table projection asks for field id 99, but the ParquetReader exposes only file-local
+    // fields 0 and 1. get_block() opens the split lazily, so this is where TableReader must
+    // reject the mismatch between TableReadOptions::projected_columns and the Parquet file
+    // schema.
+    Block block = build_table_block(projected_columns);
+    bool eos = false;
+    const auto status = reader.get_block(&block, &eos);
+    ASSERT_FALSE(status.ok());
+    EXPECT_NE(status.to_string().find("does not have a matching file column"), std::string::npos)
+            << status.to_string();
+
+    const auto close_status = reader.close();
+    ASSERT_TRUE(close_status.ok()) << close_status.to_string();
+    std::filesystem::remove_all(test_dir);
+}
+
+// Checks that a projected column which shares its name with a Parquet field but carries a
+// different field id is still rejected by get_block().
+TEST(TableReaderTest, ProjectedColumnsRejectSameNameDifferentIdParquetSchemaMismatch) {
+    const auto test_dir =
+            std::filesystem::temp_directory_path() / "doris_table_reader_same_name_diff_id_test";
+    std::filesystem::remove_all(test_dir);
+    std::filesystem::create_directories(test_dir);
+
+    const auto file_path = (test_dir / "split.parquet").string();
+    // The helper reports errors with fatal assertions that only return from the helper itself;
+    // ASSERT_NO_FATAL_FAILURE stops the test here instead of continuing with a bad fixture.
+    ASSERT_NO_FATAL_FAILURE(write_parquet_file(file_path, 1, "one"));
+
+    std::vector<TableColumn> projected_columns;
+    projected_columns.push_back(
+            {.id = 99, .name = "id", .type = std::make_shared<DataTypeInt32>()});
+
+    RuntimeState state {TQueryOptions(), TQueryGlobals()};
+    TableReader reader;
+    const auto init_status = reader.init({
+            .projected_columns = projected_columns,
+            .conjuncts = VExprContext(nullptr),
+            .format = FileFormat::PARQUET,
+            .scan_params = nullptr,
+            .io_ctx = nullptr,
+            .runtime_state = &state,
+            .scanner_profile = nullptr,
+    });
+    ASSERT_TRUE(init_status.ok()) << init_status.to_string();
+
+    const auto prepare_status = reader.prepare_split(build_split_options(file_path));
+    ASSERT_TRUE(prepare_status.ok()) << prepare_status.to_string();
+
+    // The table column has the same name as the Parquet field, but a different field id.
+    // TableReader configures ColumnMapper in BY_FIELD_ID mode, so the name match must not hide
+    // the id mismatch.
+    Block block = build_table_block(projected_columns);
+    bool eos = false;
+    const auto status = reader.get_block(&block, &eos);
+    ASSERT_FALSE(status.ok());
+    EXPECT_NE(status.to_string().find("does not have a matching file column"), std::string::npos)
+            << status.to_string();
+
+    const auto close_status = reader.close();
+    ASSERT_TRUE(close_status.ok()) << close_status.to_string();
+    std::filesystem::remove_all(test_dir);
+}
+
+// Checks that columns matched by field id are materialized in table schema order even when the
+// table's name and/or type differ from the Parquet file: the int file column is read as BIGINT
+// and both columns come back under their table names.
+TEST(TableReaderTest, ProjectedColumnsUseMapperExpressionsForParquetSchemaMismatch) {
+    const auto test_dir =
+            std::filesystem::temp_directory_path() / "doris_table_reader_mapper_expr_test";
+    std::filesystem::remove_all(test_dir);
+    std::filesystem::create_directories(test_dir);
+
+    const auto file_path = (test_dir / "split.parquet").string();
+    // The helper reports errors with fatal assertions that only return from the helper itself;
+    // ASSERT_NO_FATAL_FAILURE stops the test here instead of continuing with a bad fixture.
+    ASSERT_NO_FATAL_FAILURE(write_parquet_file(file_path, 7, "seven"));
+
+    std::vector<TableColumn> projected_columns;
+    projected_columns.push_back(
+            {.id = 0, .name = "table_id", .type = std::make_shared<DataTypeInt64>()});
+    projected_columns.push_back(
+            {.id = 1, .name = "table_value", .type = std::make_shared<DataTypeString>()});
+
+    RuntimeState state {TQueryOptions(), TQueryGlobals()};
+    TableReader reader;
+    const auto init_status = reader.init({
+            .projected_columns = projected_columns,
+            .conjuncts = VExprContext(nullptr),
+            .format = FileFormat::PARQUET,
+            .scan_params = nullptr,
+            .io_ctx = nullptr,
+            .runtime_state = &state,
+            .scanner_profile = nullptr,
+    });
+    ASSERT_TRUE(init_status.ok()) << init_status.to_string();
+
+    const auto prepare_status = reader.prepare_split(build_split_options(file_path));
+    ASSERT_TRUE(prepare_status.ok()) << prepare_status.to_string();
+
+    // The table projection is intentionally different from the Parquet schema:
+    // field id 0 is requested as BIGINT instead of the file INT, so ColumnMapper should build a
+    // Cast expression; field id 1 has a different table name but the same type, so it should
+    // build a SlotRef projection. Both columns should still materialize in table schema order.
+    Block block = build_table_block(projected_columns);
+    bool eos = false;
+    const auto read_status = reader.get_block(&block, &eos);
+    ASSERT_TRUE(read_status.ok()) << read_status.to_string();
+    ASSERT_FALSE(eos);
+
+    ASSERT_EQ(block.get_by_position(0).name, "table_id");
+    ASSERT_EQ(block.get_by_position(1).name, "table_value");
+    const auto& id_column = assert_cast<const ColumnInt64&>(*block.get_by_position(0).column);
+    const auto& value_column = assert_cast<const ColumnString&>(*block.get_by_position(1).column);
+    ASSERT_EQ(id_column.size(), 1);
+    ASSERT_EQ(value_column.size(), 1);
+    EXPECT_EQ(id_column.get_element(0), 7);
+    EXPECT_EQ(value_column.get_data_at(0).to_string(), "seven");
+
+    const auto close_status = reader.close();
+    ASSERT_TRUE(close_status.ok()) << close_status.to_string();
+    std::filesystem::remove_all(test_dir);
+}
+
+} // namespace
+} // namespace doris::reader
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]