This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 2c6640c02ff [External](refactor) Do some refactor in orc reader to
remove useless code (#56717)
2c6640c02ff is described below
commit 2c6640c02ff6836eda0d3e0b0932e5d96f8176ee
Author: HappenLee <[email protected]>
AuthorDate: Fri Oct 10 14:07:23 2025 +0800
[External](refactor) Do some refactor in orc reader to remove useless code
(#56717)
---
be/src/vec/exec/format/orc/vorc_reader.cpp | 204 +++++++++++------------------
be/src/vec/exec/format/orc/vorc_reader.h | 50 ++++---
2 files changed, 101 insertions(+), 153 deletions(-)
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp
b/be/src/vec/exec/format/orc/vorc_reader.cpp
index 7da93423dca..ce650534cd1 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -215,12 +215,6 @@ OrcReader::OrcReader(const TFileScanRangeParams& params,
const TFileRangeDesc& r
_init_file_description();
}
-OrcReader::~OrcReader() {
- if (_obj_pool && _obj_pool.get()) {
- _obj_pool->clear();
- }
-}
-
void OrcReader::_collect_profile_before_close() {
if (_profile != nullptr) {
COUNTER_UPDATE(_orc_profile.column_read_time,
_statistics.column_read_time);
@@ -376,7 +370,7 @@ Status OrcReader::init_reader(
not_single_slot_filter_conjuncts->end());
}
_slot_id_to_filter_conjuncts = slot_id_to_filter_conjuncts;
- _obj_pool = std::make_shared<ObjectPool>();
+ _obj_pool = std::make_unique<ObjectPool>();
if (_state != nullptr) {
_orc_tiny_stripe_threshold_bytes =
_state->query_options().orc_tiny_stripe_threshold_bytes;
@@ -616,29 +610,41 @@ std::tuple<bool, orc::Literal>
convert_to_orc_literal(const orc::Type* type,
}
}
-std::tuple<bool, orc::Literal, orc::PredicateDataType>
OrcReader::_make_orc_literal(
- const VSlotRef* slot_ref, const VLiteral* literal) {
+std::pair<bool, orc::PredicateDataType> OrcReader::_get_orc_predicate_type(
+ const VSlotRef* slot_ref) {
DCHECK(_table_info_node_ptr->children_column_exists(slot_ref->expr_name()));
auto file_col_name =
_table_info_node_ptr->children_file_column_name(slot_ref->expr_name());
if (!_type_map.contains(file_col_name)) {
LOG(WARNING) << "Column " << slot_ref->expr_name() << "in file name"
<< file_col_name
<< " not found in _type_map";
- return std::make_tuple(false, orc::Literal(false),
orc::PredicateDataType::LONG);
+ return {false, orc::PredicateDataType::LONG};
}
DCHECK(_type_map.contains(file_col_name));
const auto* orc_type = _type_map[file_col_name];
if (!TYPEKIND_TO_PREDICATE_TYPE.contains(orc_type->getKind())) {
LOG(WARNING) << "Unsupported Push Down Orc Type [TypeKind=" <<
orc_type->getKind() << "]";
- return std::make_tuple(false, orc::Literal(false),
orc::PredicateDataType::LONG);
+ return {false, orc::PredicateDataType::LONG};
}
const auto predicate_type =
TYPEKIND_TO_PREDICATE_TYPE[orc_type->getKind()];
- if (literal == nullptr) {
- // only get the predicate_type
- return std::make_tuple(true, orc::Literal(true), predicate_type);
+ return {true, predicate_type};
+}
+
+std::pair<bool, orc::Literal> OrcReader::_make_orc_literal(const VSlotRef*
slot_ref,
+ const VLiteral*
literal) {
+ // Get predicate type using new function
+ auto [valid_pred_type, predicate_type] = _get_orc_predicate_type(slot_ref);
+ if (!valid_pred_type) {
+ return {false, orc::Literal(false)};
}
+
+ // Get the orc_type again here as it's needed for convert_to_orc_literal
+ auto file_col_name =
_table_info_node_ptr->children_file_column_name(slot_ref->expr_name());
+ const auto* orc_type = _type_map[file_col_name];
+
+ DCHECK(literal != nullptr);
// this only happens when the literals of in_predicate contains null
value, like in (1, null)
if (literal->get_column_ptr()->is_null_at(0)) {
- return std::make_tuple(false, orc::Literal(false), predicate_type);
+ return {false, orc::Literal(false)};
}
auto literal_data = literal->get_column_ptr()->get_data_at(0);
auto* slot = _tuple_descriptor->slots()[slot_ref->column_id()];
@@ -649,14 +655,14 @@ std::tuple<bool, orc::Literal, orc::PredicateDataType>
OrcReader::_make_orc_lite
if (src_type != primitive_type && !is_string_type(src_type) &&
is_string_type(primitive_type)) {
LOG(WARNING) << "Unsupported Push Down Schema Changed Column " <<
primitive_type << " to "
<< src_type;
- return std::make_tuple(false, orc::Literal(false),
orc::PredicateDataType::LONG);
+ return {false, orc::Literal(false)};
}
switch (primitive_type) {
#define M(NAME)
\
case TYPE_##NAME: {
\
auto [valid, orc_literal] = convert_to_orc_literal<TYPE_##NAME>(
\
orc_type, literal_data, slot_type->get_precision(),
slot_type->get_scale()); \
- return std::make_tuple(valid, orc_literal, predicate_type);
\
+ return {valid, orc_literal};
\
}
#define APPLY_FOR_PRIMITIVE_TYPE(M) \
M(TINYINT) \
@@ -683,7 +689,7 @@ std::tuple<bool, orc::Literal, orc::PredicateDataType>
OrcReader::_make_orc_lite
#undef M
default: {
VLOG_CRITICAL << "Unsupported Convert Orc Literal [ColName=" <<
slot->col_name() << "]";
- return std::make_tuple(false, orc::Literal(false), predicate_type);
+ return {false, orc::Literal(false)};
}
}
}
@@ -700,7 +706,8 @@ bool OrcReader::_check_slot_can_push_down(const VExprSPtr&
expr) {
return false;
}
- auto [valid, _, predicate_type] = _make_orc_literal(slot_ref, nullptr);
+ // Directly use _get_orc_predicate_type since we only need the type
+ auto [valid, predicate_type] = _get_orc_predicate_type(slot_ref);
if (valid) {
_vslot_ref_to_orc_predicate_data_type[slot_ref] = predicate_type;
}
@@ -715,7 +722,7 @@ bool OrcReader::_check_literal_can_push_down(const
VExprSPtr& expr, size_t child
// the slot has been checked in _check_slot_can_push_down before calling
this function
const auto* slot_ref = static_cast<const
VSlotRef*>(expr->children()[0].get());
const auto* literal = static_cast<const
VLiteral*>(expr->children()[child_id].get());
- auto [valid, orc_literal, _] = _make_orc_literal(slot_ref, literal);
+ auto [valid, orc_literal] = _make_orc_literal(slot_ref, literal);
if (valid) {
_vliteral_to_orc_literal.insert(std::make_pair(literal, orc_literal));
}
@@ -1016,20 +1023,19 @@ Status OrcReader::set_fill_columns(
if (expr->is_rf_wrapper()) {
// REF: src/runtime_filter/runtime_filter_consumer.cpp
- VRuntimeFilterWrapper* runtime_filter =
assert_cast<VRuntimeFilterWrapper*>(expr.get());
+ auto* runtime_filter =
static_cast<VRuntimeFilterWrapper*>(expr.get());
auto filter_impl = runtime_filter->get_impl();
visit_slot(filter_impl.get());
- // only support push down for filter row group : MAX_FILTER,
MAX_FILTER, MINMAX_FILTER, IN_FILTER
+ // only support push down for filter row group : MAX_FILTER,
MAX_FILTER, MINMAX_FILTER, IN_FILTER
if ((runtime_filter->node_type() == TExprNodeType::BINARY_PRED) &&
(runtime_filter->op() == TExprOpcode::GE ||
runtime_filter->op() == TExprOpcode::LE)) {
expr = filter_impl;
} else if (runtime_filter->node_type() == TExprNodeType::IN_PRED &&
runtime_filter->op() == TExprOpcode::FILTER_IN) {
- VDirectInPredicate* direct_in_predicate =
- assert_cast<VDirectInPredicate*>(filter_impl.get());
+ auto* direct_in_predicate =
static_cast<VDirectInPredicate*>(filter_impl.get());
int max_in_size =
_state->query_options().__isset.max_pushdown_conditions_per_column
@@ -1281,12 +1287,11 @@ Status OrcReader::_fill_partition_columns(
auto doris_column = block->get_by_name(kv.first).column;
auto* col_ptr = const_cast<IColumn*>(doris_column.get());
const auto& [value, slot_desc] = kv.second;
- auto _text_serde = slot_desc->get_data_type_ptr()->get_serde();
+ auto text_serde = slot_desc->get_data_type_ptr()->get_serde();
Slice slice(value.data(), value.size());
uint64_t num_deserialized = 0;
- if (_text_serde->deserialize_column_from_fixed_json(*col_ptr, slice,
rows,
- &num_deserialized,
-
_text_formatOptions) != Status::OK()) {
+ if (text_serde->deserialize_column_from_fixed_json(*col_ptr, slice,
rows, &num_deserialized,
+
_text_formatOptions) != Status::OK()) {
return Status::InternalError("Failed to fill partition column:
{}={}",
slot_desc->col_name(), value);
}
@@ -1483,17 +1488,16 @@ Status OrcReader::_decode_string_column(const
std::string& col_name,
"Wrong data type for column '{}', expected
EncodedStringVectorBatch", col_name);
}
if (data->isEncoded) {
- return _decode_string_dict_encoded_column<is_filter>(col_name,
data_column, type_kind, data,
+ return _decode_string_dict_encoded_column<is_filter>(data_column,
type_kind, data,
num_values);
} else {
- return _decode_string_non_dict_encoded_column<is_filter>(col_name,
data_column, type_kind,
- data,
num_values);
+ return _decode_string_non_dict_encoded_column<is_filter>(data_column,
type_kind, data,
+ num_values);
}
}
template <bool is_filter>
-Status OrcReader::_decode_string_non_dict_encoded_column(const std::string&
col_name,
- const
MutableColumnPtr& data_column,
+Status OrcReader::_decode_string_non_dict_encoded_column(const
MutableColumnPtr& data_column,
const orc::TypeKind&
type_kind,
const
orc::EncodedStringVectorBatch* cvb,
size_t num_values) {
@@ -1547,105 +1551,72 @@ Status
OrcReader::_decode_string_non_dict_encoded_column(const std::string& col_
}
template <bool is_filter>
-Status OrcReader::_decode_string_dict_encoded_column(const std::string&
col_name,
- const MutableColumnPtr&
data_column,
+Status OrcReader::_decode_string_dict_encoded_column(const MutableColumnPtr&
data_column,
const orc::TypeKind&
type_kind,
const
orc::EncodedStringVectorBatch* cvb,
size_t num_values) {
std::vector<StringRef> string_values;
size_t max_value_length = 0;
string_values.reserve(num_values);
- UInt8* __restrict filter_data;
+
+ UInt8* __restrict filter_data = nullptr;
if constexpr (is_filter) {
filter_data = _filter->data();
}
+
+ auto process_one = [&]<bool is_char>(int i) {
+ if constexpr (is_filter) {
+ if (!filter_data[i]) {
+ string_values.emplace_back(EMPTY_STRING_FOR_OVERFLOW, 0);
+ return;
+ }
+ }
+
+ char* val_ptr;
+ int64_t length;
+ cvb->dictionary->getValueByIndex(cvb->index.data()[i], val_ptr,
length);
+
+ if constexpr (is_char) {
+ length = trim_right(val_ptr, length);
+ }
+
+ if (length > max_value_length) {
+ max_value_length = length;
+ }
+
+ string_values.emplace_back((length > 0) ? val_ptr :
EMPTY_STRING_FOR_OVERFLOW, length);
+ };
+
if (type_kind == orc::TypeKind::CHAR) {
- // Possibly there are some zero padding characters in CHAR type, we
have to strip them off.
if (cvb->hasNulls) {
for (int i = 0; i < num_values; ++i) {
if (cvb->notNull[i]) {
- if constexpr (is_filter) {
- if (!filter_data[i]) {
-
string_values.emplace_back(EMPTY_STRING_FOR_OVERFLOW, 0);
- continue;
- }
- }
- char* val_ptr;
- int64_t length;
- cvb->dictionary->getValueByIndex(cvb->index.data()[i],
val_ptr, length);
- length = trim_right(val_ptr, length);
- if (length > max_value_length) {
- max_value_length = length;
- }
- string_values.emplace_back((length > 0) ? val_ptr :
EMPTY_STRING_FOR_OVERFLOW,
- length);
+ process_one.template operator()<true>(i);
} else {
- // Orc doesn't fill null values in new batch, but the
former batch has been release.
- // Other types like int/long/timestamp... are flat types
without pointer in them,
- // so other types do not need to be handled separately
like string.
string_values.emplace_back(EMPTY_STRING_FOR_OVERFLOW, 0);
}
}
} else {
for (int i = 0; i < num_values; ++i) {
- if constexpr (is_filter) {
- if (!filter_data[i]) {
- string_values.emplace_back(EMPTY_STRING_FOR_OVERFLOW,
0);
- continue;
- }
- }
- char* val_ptr;
- int64_t length;
- cvb->dictionary->getValueByIndex(cvb->index.data()[i],
val_ptr, length);
- length = trim_right(val_ptr, length);
- if (length > max_value_length) {
- max_value_length = length;
- }
- string_values.emplace_back((length > 0) ? val_ptr :
EMPTY_STRING_FOR_OVERFLOW,
- length);
+ process_one.template operator()<true>(i);
}
}
} else {
if (cvb->hasNulls) {
for (int i = 0; i < num_values; ++i) {
if (cvb->notNull[i]) {
- if constexpr (is_filter) {
- if (!filter_data[i]) {
-
string_values.emplace_back(EMPTY_STRING_FOR_OVERFLOW, 0);
- continue;
- }
- }
- char* val_ptr;
- int64_t length;
- cvb->dictionary->getValueByIndex(cvb->index.data()[i],
val_ptr, length);
- if (length > max_value_length) {
- max_value_length = length;
- }
- string_values.emplace_back((length > 0) ? val_ptr :
EMPTY_STRING_FOR_OVERFLOW,
- length);
+ process_one.template operator()<false>(i);
} else {
string_values.emplace_back(EMPTY_STRING_FOR_OVERFLOW, 0);
}
}
} else {
for (int i = 0; i < num_values; ++i) {
- if constexpr (is_filter) {
- if (!filter_data[i]) {
- string_values.emplace_back(EMPTY_STRING_FOR_OVERFLOW,
0);
- continue;
- }
- }
- char* val_ptr;
- int64_t length;
- cvb->dictionary->getValueByIndex(cvb->index.data()[i],
val_ptr, length);
- if (length > max_value_length) {
- max_value_length = length;
- }
- string_values.emplace_back((length > 0) ? val_ptr :
EMPTY_STRING_FOR_OVERFLOW,
- length);
+ process_one.template operator()<false>(i);
}
}
}
+
if (!string_values.empty()) {
data_column->insert_many_strings_overflow(string_values.data(),
string_values.size(),
max_value_length);
@@ -2789,14 +2760,7 @@ void ORCFileInputStream::beforeReadStripe(
if (_is_all_tiny_stripes) {
return;
}
- if (_file_reader != nullptr) {
- _file_reader->collect_profile_before_close();
- }
- for (const auto& stripe_stream : _stripe_streams) {
- if (stripe_stream != nullptr) {
- stripe_stream->collect_profile_before_close();
- }
- }
+ _collect_profile_before_close_file_stripe();
_stripe_streams.clear();
uint64_t offset = current_strip_information->getOffset();
@@ -2842,16 +2806,6 @@ void ORCFileInputStream::_build_input_stripe_streams(
void ORCFileInputStream::_build_small_ranges_input_stripe_streams(
const std::unordered_map<orc::StreamId, io::PrefetchRange>& ranges,
std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>&
streams) {
- std::vector<io::PrefetchRange> all_ranges;
- all_ranges.reserve(ranges.size());
- std::transform(ranges.begin(), ranges.end(),
std::back_inserter(all_ranges),
- [](const auto& pair) { return pair.second; });
- std::sort(all_ranges.begin(), all_ranges.end(),
- [](const auto& a, const auto& b) { return a.start_offset <
b.start_offset; });
-
- auto merged_ranges = io::PrefetchRange::merge_adjacent_seq_ranges(
- all_ranges, _orc_max_merge_distance_bytes,
_orc_once_max_read_bytes);
-
// Sort ranges by start_offset for efficient searching
std::vector<std::pair<orc::StreamId, io::PrefetchRange>>
sorted_ranges(ranges.begin(),
ranges.end());
@@ -2859,6 +2813,13 @@ void
ORCFileInputStream::_build_small_ranges_input_stripe_streams(
return a.second.start_offset < b.second.start_offset;
});
+ std::vector<io::PrefetchRange> all_ranges;
+ all_ranges.reserve(ranges.size());
+ std::transform(sorted_ranges.begin(), sorted_ranges.end(),
std::back_inserter(all_ranges),
+ [](const auto& pair) { return pair.second; });
+ auto merged_ranges = io::PrefetchRange::merge_adjacent_seq_ranges(
+ all_ranges, _orc_max_merge_distance_bytes,
_orc_once_max_read_bytes);
+
for (const auto& merged_range : merged_ranges) {
auto merge_range_file_reader =
std::make_shared<OrcMergeRangeFileReader>(_profile,
_file_reader, merged_range);
@@ -2906,17 +2867,6 @@ void
ORCFileInputStream::_build_large_ranges_input_stripe_streams(
}
}
-void ORCFileInputStream::_collect_profile_before_close() {
- if (_file_reader != nullptr) {
- _file_reader->collect_profile_before_close();
- }
- for (const auto& stripe_stream : _stripe_streams) {
- if (stripe_stream != nullptr) {
- stripe_stream->collect_profile_before_close();
- }
- }
-}
-
void OrcReader::_execute_filter_position_delete_rowids(IColumn::Filter&
filter) {
if (_position_delete_ordered_rowids == nullptr) {
return;
diff --git a/be/src/vec/exec/format/orc/vorc_reader.h
b/be/src/vec/exec/format/orc/vorc_reader.h
index 31555514005..9067323215e 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.h
+++ b/be/src/vec/exec/format/orc/vorc_reader.h
@@ -148,7 +148,7 @@ public:
const std::string& ctz, io::IOContext* io_ctx, FileMetaCache*
meta_cache = nullptr,
bool enable_lazy_mat = true);
- ~OrcReader() override;
+ ~OrcReader() override = default;
//If you want to read the file by index instead of column name, set
hive_use_column_names to false.
Status init_reader(
const std::vector<std::string>* column_names,
@@ -299,8 +299,12 @@ private:
static bool _check_acid_schema(const orc::Type& type);
// functions for building search argument until _init_search_argument
- std::tuple<bool, orc::Literal, orc::PredicateDataType> _make_orc_literal(
- const VSlotRef* slot_ref, const VLiteral* literal);
+ // Get predicate type from slot reference
+ std::pair<bool, orc::PredicateDataType> _get_orc_predicate_type(const
VSlotRef* slot_ref);
+
+ // Make ORC literal from Doris literal
+ std::pair<bool, orc::Literal> _make_orc_literal(const VSlotRef* slot_ref,
+ const VLiteral* literal);
bool _check_slot_can_push_down(const VExprSPtr& expr);
bool _check_literal_can_push_down(const VExprSPtr& expr, size_t child_id);
bool _check_rest_children_can_push_down(const VExprSPtr& expr);
@@ -550,15 +554,13 @@ private:
size_t num_values);
template <bool is_filter>
- Status _decode_string_non_dict_encoded_column(const std::string& col_name,
- const MutableColumnPtr&
data_column,
+ Status _decode_string_non_dict_encoded_column(const MutableColumnPtr&
data_column,
const orc::TypeKind&
type_kind,
const
orc::EncodedStringVectorBatch* cvb,
size_t num_values);
template <bool is_filter>
- Status _decode_string_dict_encoded_column(const std::string& col_name,
- const MutableColumnPtr&
data_column,
+ Status _decode_string_dict_encoded_column(const MutableColumnPtr&
data_column,
const orc::TypeKind& type_kind,
const
orc::EncodedStringVectorBatch* cvb,
size_t num_values);
@@ -688,7 +690,7 @@ private:
bool _disable_dict_filter = false;
// std::pair<col_name, slot_id>
std::vector<std::pair<std::string, int>> _dict_filter_cols;
- std::shared_ptr<ObjectPool> _obj_pool;
+ std::unique_ptr<ObjectPool> _obj_pool;
std::unique_ptr<StringDictFilterImpl> _string_dict_filter;
bool _dict_cols_has_converted = false;
@@ -721,7 +723,7 @@ public:
StripeStreamInputStream(const std::string& file_name, io::FileReaderSPtr
inner_reader,
const io::IOContext* io_ctx, RuntimeProfile*
profile)
: _file_name(file_name),
- _inner_reader(inner_reader),
+ _inner_reader(std::move(inner_reader)),
_io_ctx(io_ctx),
_profile(profile) {}
@@ -741,11 +743,6 @@ public:
RuntimeProfile* profile() const { return _profile; }
- void beforeReadStripe(
- std::unique_ptr<orc::StripeInformation> current_strip_information,
- const std::vector<bool>& selected_columns,
- std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>&
streams) override {}
-
protected:
void _collect_profile_at_runtime() override {};
void _collect_profile_before_close() override {
@@ -778,17 +775,7 @@ public:
_io_ctx(io_ctx),
_profile(profile) {}
- ~ORCFileInputStream() override {
- if (_tracing_file_reader != nullptr) {
- _tracing_file_reader->collect_profile_before_close();
- }
- for (const auto& stripe_stream : _stripe_streams) {
- if (stripe_stream != nullptr) {
- stripe_stream->collect_profile_before_close();
- }
- }
- _stripe_streams.clear();
- }
+ ~ORCFileInputStream() override {
_collect_profile_before_close_file_stripe(); }
uint64_t getLength() const override { return _tracing_file_reader->size();
}
@@ -813,7 +800,18 @@ public:
protected:
void _collect_profile_at_runtime() override {};
- void _collect_profile_before_close() override;
+ void _collect_profile_before_close() override {
_collect_profile_before_close_file_stripe(); }
+
+ void _collect_profile_before_close_file_stripe() {
+ if (_file_reader != nullptr) {
+ _file_reader->collect_profile_before_close();
+ }
+ for (const auto& stripe_stream : _stripe_streams) {
+ if (stripe_stream != nullptr) {
+ stripe_stream->collect_profile_before_close();
+ }
+ }
+ }
private:
void _build_input_stripe_streams(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]