This is an automated email from the ASF dual-hosted git repository. huajianlan pushed a commit to branch nested_column_prune in repository https://gitbox.apache.org/repos/asf/doris.git
commit 4000802e5fb7ae4f89f77548f4557c9db4f1302f Author: kakachen <[email protected]> AuthorDate: Wed Oct 29 22:08:27 2025 +0800 fix topN late materialzation access_path. --- be/src/runtime/descriptors.cpp | 53 ++++++++++++++++++++++- be/src/runtime/descriptors.h | 4 +- be/src/vec/exec/format/table/hive_reader.cpp | 56 +++++++------------------ be/src/vec/exec/format/table/iceberg_reader.cpp | 28 ++++--------- gensrc/proto/descriptors.proto | 45 ++++++++++++++++++++ 5 files changed, 123 insertions(+), 63 deletions(-) diff --git a/be/src/runtime/descriptors.cpp b/be/src/runtime/descriptors.cpp index a17f0926752..bc27c6a41a0 100644 --- a/be/src/runtime/descriptors.cpp +++ b/be/src/runtime/descriptors.cpp @@ -103,7 +103,31 @@ SlotDescriptor::SlotDescriptor(const PSlotDescriptor& pdesc) _is_materialized(pdesc.is_materialized()), _is_key(pdesc.is_key()), _column_paths(pdesc.column_paths().begin(), pdesc.column_paths().end()), - _is_auto_increment(pdesc.is_auto_increment()) {} + _is_auto_increment(pdesc.is_auto_increment()) { + auto convert_to_thrift_column_access_path = [](const PColumnAccessPath& pb_path) { + TColumnAccessPath thrift_path; + thrift_path.type = (TAccessPathType::type)pb_path.type(); + if (pb_path.has_data_access_path()) { + thrift_path.__isset.data_access_path = true; + for (int i = 0; i < pb_path.data_access_path().path_size(); ++i) { + thrift_path.data_access_path.path.push_back(pb_path.data_access_path().path(i)); + } + } + if (pb_path.has_meta_access_path()) { + thrift_path.__isset.meta_access_path = true; + for (int i = 0; i < pb_path.meta_access_path().path_size(); ++i) { + thrift_path.meta_access_path.path.push_back(pb_path.meta_access_path().path(i)); + } + } + return thrift_path; + }; + for (const auto& pb_path : pdesc.all_access_paths()) { + _all_access_paths.push_back(convert_to_thrift_column_access_path(pb_path)); + } + for (const auto& pb_path : pdesc.predicate_access_paths()) { + 
_predicate_access_paths.push_back(convert_to_thrift_column_access_path(pb_path)); + } +} #ifdef BE_TEST SlotDescriptor::SlotDescriptor() @@ -137,6 +161,33 @@ void SlotDescriptor::to_protobuf(PSlotDescriptor* pslot) const { for (const std::string& path : _column_paths) { pslot->add_column_paths(path); } + auto convert_to_protobuf_column_access_path = [](const TColumnAccessPath& thrift_path, + doris::PColumnAccessPath* pb_path) { + pb_path->Clear(); + pb_path->set_type((PAccessPathType)thrift_path.type); // C-style enum cast; assumes TAccessPathType uses the same numeric values as PAccessPathType (DATA = 1, META = 2) — TODO confirm + if (thrift_path.__isset.data_access_path) { + auto* pb_data = pb_path->mutable_data_access_path(); + pb_data->Clear(); + for (const auto& s : thrift_path.data_access_path.path) { + pb_data->add_path(s); + } + } + if (thrift_path.__isset.meta_access_path) { + auto* pb_meta = pb_path->mutable_meta_access_path(); + pb_meta->Clear(); + for (const auto& s : thrift_path.meta_access_path.path) { + pb_meta->add_path(s); + } + } + }; + for (const auto& path : _all_access_paths) { + auto* pb_path = pslot->add_all_access_paths(); + convert_to_protobuf_column_access_path(path, pb_path); + } + for (const auto& path : _predicate_access_paths) { + auto* pb_path = pslot->add_predicate_access_paths(); + convert_to_protobuf_column_access_path(path, pb_path); + } } vectorized::DataTypePtr SlotDescriptor::get_data_type_ptr() const { diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h index b34e54622b5..1446ce4c3d9 100644 --- a/be/src/runtime/descriptors.h +++ b/be/src/runtime/descriptors.h @@ -137,8 +137,8 @@ private: const bool _is_key; const std::vector<std::string> _column_paths; - const TColumnAccessPaths _all_access_paths; - const TColumnAccessPaths _predicate_access_paths; + TColumnAccessPaths _all_access_paths; + TColumnAccessPaths _predicate_access_paths; const bool _is_auto_increment; const std::string _col_default_value; diff --git 
a/be/src/vec/exec/format/table/hive_reader.cpp b/be/src/vec/exec/format/table/hive_reader.cpp
index eea7a126517..81c8cf47f42 100644 --- a/be/src/vec/exec/format/table/hive_reader.cpp +++ b/be/src/vec/exec/format/table/hive_reader.cpp @@ -120,8 +120,7 @@ ColumnIdResult HiveOrcReader::_create_column_ids(const orc::Type* orc_type, auto process_access_paths = [](const orc::Type* orc_field, const std::vector<TColumnAccessPath>& access_paths, std::set<uint64_t>& out_ids) { - if (!orc_field) return; - if (access_paths.empty()) return; + bool access_paths_empty = access_paths.empty(); std::vector<std::vector<std::string>> paths; bool has_top_level_only = false; @@ -134,7 +133,6 @@ ColumnIdResult HiveOrcReader::_create_column_ids(const orc::Type* orc_type, } else { continue; } - DCHECK(path.size() >= 1); std::vector<std::string> remaining_path; if (path.size() > 1) { remaining_path.assign(path.begin() + 1, path.end()); @@ -148,7 +146,7 @@ ColumnIdResult HiveOrcReader::_create_column_ids(const orc::Type* orc_type, paths.push_back(std::move(remaining_path)); } - if (has_top_level_only) { + if (has_top_level_only || access_paths_empty) { uint64_t start_id = orc_field->getColumnId(); uint64_t max_column_id = orc_field->getMaximumColumnId(); for (uint64_t id = start_id; id <= max_column_id; ++id) { @@ -183,15 +181,11 @@ ColumnIdResult HiveOrcReader::_create_column_ids(const orc::Type* orc_type, // complex types: // collect and process all_access_paths -> column_ids - if (!all_access_paths.empty()) { - process_access_paths(orc_field, all_access_paths, column_ids); - } + process_access_paths(orc_field, all_access_paths, column_ids); // collect and process predicate_access_paths -> filter_column_ids const auto& predicate_access_paths = slot->predicate_access_paths(); - if (!predicate_access_paths.empty()) { - process_access_paths(orc_field, predicate_access_paths, filter_column_ids); - } + process_access_paths(orc_field, predicate_access_paths, filter_column_ids); } return ColumnIdResult(std::move(column_ids), std::move(filter_column_ids)); @@ -221,8 +215,7 @@ 
ColumnIdResult HiveOrcReader::_create_column_ids_by_top_level_col_index( auto process_access_paths = [](const orc::Type* orc_field, const std::vector<TColumnAccessPath>& access_paths, std::set<uint64_t>& out_ids) { - if (!orc_field) return; - if (access_paths.empty()) return; + bool access_paths_empty = access_paths.empty(); std::vector<std::vector<std::string>> paths; bool has_top_level_only = false; @@ -235,7 +228,6 @@ ColumnIdResult HiveOrcReader::_create_column_ids_by_top_level_col_index( } else { continue; } - DCHECK(path.size() >= 1); std::vector<std::string> remaining_path; if (path.size() > 1) { remaining_path.assign(path.begin() + 1, path.end()); @@ -249,7 +241,7 @@ ColumnIdResult HiveOrcReader::_create_column_ids_by_top_level_col_index( paths.push_back(std::move(remaining_path)); } - if (has_top_level_only) { + if (has_top_level_only || access_paths_empty) { uint64_t start_id = orc_field->getColumnId(); uint64_t max_column_id = orc_field->getMaximumColumnId(); for (uint64_t id = start_id; id <= max_column_id; ++id) { @@ -284,15 +276,11 @@ ColumnIdResult HiveOrcReader::_create_column_ids_by_top_level_col_index( // complex types // collect and process all_access_paths -> column_ids - if (!all_access_paths.empty()) { - process_access_paths(orc_field, all_access_paths, column_ids); - } + process_access_paths(orc_field, all_access_paths, column_ids); // collect and process predicate_access_paths -> filter_column_ids const auto& predicate_access_paths = slot->predicate_access_paths(); - if (!predicate_access_paths.empty()) { - process_access_paths(orc_field, predicate_access_paths, filter_column_ids); - } + process_access_paths(orc_field, predicate_access_paths, filter_column_ids); } return ColumnIdResult(std::move(column_ids), std::move(filter_column_ids)); @@ -403,8 +391,7 @@ ColumnIdResult HiveParquetReader::_create_column_ids(const FieldDescriptor* fiel auto process_access_paths = [](const FieldSchema* parquet_field, const std::vector<TColumnAccessPath>& 
access_paths, std::set<uint64_t>& out_ids) { - if (!parquet_field) return; - if (access_paths.empty()) return; + bool access_paths_empty = access_paths.empty(); std::vector<std::vector<std::string>> paths; bool has_top_level_only = false; @@ -417,7 +404,6 @@ ColumnIdResult HiveParquetReader::_create_column_ids(const FieldDescriptor* fiel } else { continue; } - DCHECK(path.size() >= 1); std::vector<std::string> remaining_path; if (path.size() > 1) { remaining_path.assign(path.begin() + 1, path.end()); @@ -431,7 +417,7 @@ ColumnIdResult HiveParquetReader::_create_column_ids(const FieldDescriptor* fiel paths.push_back(std::move(remaining_path)); } - if (has_top_level_only) { + if (has_top_level_only || access_paths_empty) { uint64_t start_id = parquet_field->get_column_id(); uint64_t max_column_id = parquet_field->get_max_column_id(); for (uint64_t id = start_id; id <= max_column_id; ++id) { @@ -468,15 +454,11 @@ ColumnIdResult HiveParquetReader::_create_column_ids(const FieldDescriptor* fiel // complex types: // collect and process all_access_paths -> column_ids - if (!all_access_paths.empty()) { - process_access_paths(field_schema, all_access_paths, column_ids); - } + process_access_paths(field_schema, all_access_paths, column_ids); // collect and process predicate_access_paths -> filter_column_ids const auto& predicate_access_paths = slot->predicate_access_paths(); - if (!predicate_access_paths.empty()) { - process_access_paths(field_schema, predicate_access_paths, filter_column_ids); - } + process_access_paths(field_schema, predicate_access_paths, filter_column_ids); } return ColumnIdResult(std::move(column_ids), std::move(filter_column_ids)); @@ -509,8 +491,7 @@ ColumnIdResult HiveParquetReader::_create_column_ids_by_top_level_col_index( auto process_access_paths = [](const FieldSchema* parquet_field, const std::vector<TColumnAccessPath>& access_paths, std::set<uint64_t>& out_ids) { - if (!parquet_field) return; - if (access_paths.empty()) return; + bool 
access_paths_empty = access_paths.empty(); std::vector<std::vector<std::string>> paths; bool has_top_level_only = false; @@ -523,7 +504,6 @@ ColumnIdResult HiveParquetReader::_create_column_ids_by_top_level_col_index( } else { continue; } - DCHECK(path.size() >= 1); std::vector<std::string> remaining_path; if (path.size() > 1) { remaining_path.assign(path.begin() + 1, path.end()); @@ -537,7 +517,7 @@ ColumnIdResult HiveParquetReader::_create_column_ids_by_top_level_col_index( paths.push_back(std::move(remaining_path)); } - if (has_top_level_only) { + if (has_top_level_only || access_paths_empty) { uint64_t start_id = parquet_field->get_column_id(); uint64_t max_column_id = parquet_field->get_max_column_id(); for (uint64_t id = start_id; id <= max_column_id; ++id) { @@ -572,15 +552,11 @@ ColumnIdResult HiveParquetReader::_create_column_ids_by_top_level_col_index( } // collect and process all_access_paths -> column_ids - if (!all_access_paths.empty()) { - process_access_paths(field_schema, all_access_paths, column_ids); - } + process_access_paths(field_schema, all_access_paths, column_ids); // collect and process predicate_access_paths -> filter_column_ids const auto& predicate_access_paths = slot->predicate_access_paths(); - if (!predicate_access_paths.empty()) { - process_access_paths(field_schema, predicate_access_paths, filter_column_ids); - } + process_access_paths(field_schema, predicate_access_paths, filter_column_ids); } return ColumnIdResult(std::move(column_ids), std::move(filter_column_ids)); diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp index 9a9e984a9d9..b08229165ea 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.cpp +++ b/be/src/vec/exec/format/table/iceberg_reader.cpp @@ -497,8 +497,7 @@ ColumnIdResult IcebergParquetReader::_create_column_ids(const FieldDescriptor* f auto process_access_paths = [](const FieldSchema* parquet_field, const std::vector<TColumnAccessPath>& 
access_paths, std::set<uint64_t>& out_ids) { - if (!parquet_field) return; - if (access_paths.empty()) return; + bool access_paths_empty = access_paths.empty(); std::vector<std::vector<std::string>> paths; bool has_top_level_only = false; @@ -511,7 +510,6 @@ ColumnIdResult IcebergParquetReader::_create_column_ids(const FieldDescriptor* f } else { continue; } - DCHECK(path.size() >= 1); std::vector<std::string> remaining_path; if (path.size() > 1) { remaining_path.assign(path.begin() + 1, path.end()); @@ -525,7 +523,7 @@ ColumnIdResult IcebergParquetReader::_create_column_ids(const FieldDescriptor* f paths.push_back(std::move(remaining_path)); } - if (has_top_level_only) { + if (has_top_level_only || access_paths_empty) { uint64_t start_id = parquet_field->get_column_id(); uint64_t max_column_id = parquet_field->get_max_column_id(); for (uint64_t id = start_id; id <= max_column_id; ++id) { @@ -562,15 +560,11 @@ ColumnIdResult IcebergParquetReader::_create_column_ids(const FieldDescriptor* f // complex types: // collect and process all_access_paths -> column_ids - if (!all_access_paths.empty()) { - process_access_paths(field_schema, all_access_paths, column_ids); - } + process_access_paths(field_schema, all_access_paths, column_ids); // collect and process predicate_access_paths -> filter_column_ids const auto& predicate_access_paths = slot->predicate_access_paths(); - if (!predicate_access_paths.empty()) { - process_access_paths(field_schema, predicate_access_paths, filter_column_ids); - } + process_access_paths(field_schema, predicate_access_paths, filter_column_ids); } return ColumnIdResult(std::move(column_ids), std::move(filter_column_ids)); } @@ -697,8 +691,7 @@ ColumnIdResult IcebergOrcReader::_create_column_ids(const orc::Type* orc_type, auto process_access_paths = [](const orc::Type* orc_field, const std::vector<TColumnAccessPath>& access_paths, std::set<uint64_t>& out_ids) { - if (!orc_field) return; - if (access_paths.empty()) return; + bool 
access_paths_empty = access_paths.empty(); std::vector<std::vector<std::string>> paths; bool has_top_level_only = false; @@ -711,7 +704,6 @@ ColumnIdResult IcebergOrcReader::_create_column_ids(const orc::Type* orc_type, } else { continue; } - DCHECK(path.size() >= 1); std::vector<std::string> remaining_path; if (path.size() > 1) { remaining_path.assign(path.begin() + 1, path.end()); @@ -725,7 +717,7 @@ ColumnIdResult IcebergOrcReader::_create_column_ids(const orc::Type* orc_type, paths.push_back(std::move(remaining_path)); } - if (has_top_level_only) { + if (has_top_level_only || access_paths_empty) { uint64_t start_id = orc_field->getColumnId(); uint64_t max_column_id = orc_field->getMaximumColumnId(); for (uint64_t id = start_id; id <= max_column_id; ++id) { @@ -760,15 +752,11 @@ ColumnIdResult IcebergOrcReader::_create_column_ids(const orc::Type* orc_type, // nested types: // collect and process all_access_paths -> column_ids - if (!all_access_paths.empty()) { - process_access_paths(orc_field, all_access_paths, column_ids); - } + process_access_paths(orc_field, all_access_paths, column_ids); // collect and process predicate_access_paths -> filter_column_ids const auto& predicate_access_paths = slot->predicate_access_paths(); - if (!predicate_access_paths.empty()) { - process_access_paths(orc_field, predicate_access_paths, filter_column_ids); - } + process_access_paths(orc_field, predicate_access_paths, filter_column_ids); } return ColumnIdResult(std::move(column_ids), std::move(filter_column_ids)); diff --git a/gensrc/proto/descriptors.proto b/gensrc/proto/descriptors.proto index 31008c90725..f764e90c771 100644 --- a/gensrc/proto/descriptors.proto +++ b/gensrc/proto/descriptors.proto @@ -23,6 +23,49 @@ option java_package = "org.apache.doris.proto"; import "types.proto"; import "olap_file.proto"; +message PDataAccessPath { + // the specification of special path: + // <empty>: access the whole complex column + // *: + // 1. 
access every item when the type is array + // 2. access key and value when the type is map + // KEYS: only access the keys of map + // VALUES: only access the values of map + // + // example: + // s: struct< + // data: array< + // map< + // int, + // struct< + // a: int + // b: double + // > + // > + // > + // > + // if we want to access `map_keys(s.data[0])`, the path will be: ['s', 'data', '*', 'KEYS'], + // if we want to access `map_values(s.data[0])[0].b`, the path will be: ['s', 'data', '*', 'VALUES', 'b'], + // if we want to access `s.data[0]['k'].b`, the path will be ['s', 'data', '*', '*', 'b'] + // if we want to access the whole struct of s, the path will be: ['s'], + repeated string path = 1; +} + +message PMetaAccessPath { + repeated string path = 1; +} + +enum PAccessPathType { + DATA = 1; + META = 2; // used to prune `where s.data is not null` by only scanning the meta of s.data +} + +message PColumnAccessPath { + required PAccessPathType type = 1; + optional PDataAccessPath data_access_path = 2; + optional PMetaAccessPath meta_access_path = 3; +} + message PSlotDescriptor { required int32 id = 1; required int32 parent = 2; // tuple id which this slot is belong to @@ -39,6 +82,8 @@ message PSlotDescriptor { optional bool is_auto_increment = 13; optional int32 col_type = 14 [default = 0]; repeated string column_paths = 15; + repeated PColumnAccessPath all_access_paths = 16; + repeated PColumnAccessPath predicate_access_paths = 17; }; message PTupleDescriptor { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
