Gabriel39 commented on code in PR #64854:
URL: https://github.com/apache/doris/pull/64854#discussion_r3503779615
##########
be/src/format/orc/vorc_reader.cpp:
##########
@@ -1719,458 +1710,70 @@ static inline size_t trim_right(const char* s, size_t size) {
return size;
}
-template <bool is_filter>
-Status OrcReader::_decode_string_column(const std::string& col_name,
- const MutableColumnPtr& data_column,
- const orc::TypeKind& type_kind,
- const orc::ColumnVectorBatch* cvb,
size_t num_values) {
- SCOPED_RAW_TIMER(&_statistics.decode_value_time);
- const auto* data = dynamic_cast<const orc::EncodedStringVectorBatch*>(cvb);
- if (data == nullptr) {
- return Status::InternalError(
- "Wrong data type for column '{}', expected
EncodedStringVectorBatch", col_name);
- }
- if (data->isEncoded) {
- return _decode_string_dict_encoded_column<is_filter>(data_column,
type_kind, data,
- num_values);
- } else {
- return _decode_string_non_dict_encoded_column<is_filter>(data_column,
type_kind, data,
- num_values);
- }
-}
-
-template <bool is_filter>
-Status OrcReader::_decode_string_non_dict_encoded_column(const
MutableColumnPtr& data_column,
- const orc::TypeKind&
type_kind,
- const
orc::EncodedStringVectorBatch* cvb,
- size_t num_values) {
- const static std::string empty_string;
- std::vector<StringRef> string_values;
- string_values.reserve(num_values);
- if (type_kind == orc::TypeKind::CHAR) {
- // Possibly there are some zero padding characters in CHAR type, we
have to strip them off.
- if (cvb->hasNulls) {
- for (int i = 0; i < num_values; ++i) {
- if (cvb->notNull[i]) {
- size_t length = trim_right(cvb->data[i], cvb->length[i]);
- string_values.emplace_back((length > 0) ? cvb->data[i] :
empty_string.data(),
- length);
- } else {
- // Orc doesn't fill null values in new batch, but the
former batch has been release.
- // Other types like int/long/timestamp... are flat types
without pointer in them,
- // so other types do not need to be handled separately
like string.
- string_values.emplace_back(empty_string.data(), 0);
- }
- }
- } else {
- for (int i = 0; i < num_values; ++i) {
- size_t length = trim_right(cvb->data[i], cvb->length[i]);
- string_values.emplace_back((length > 0) ? cvb->data[i] :
empty_string.data(),
- length);
- }
- }
- } else {
- if (cvb->hasNulls) {
- for (int i = 0; i < num_values; ++i) {
- if (cvb->notNull[i]) {
- string_values.emplace_back(
- (cvb->length[i] > 0) ? cvb->data[i] :
empty_string.data(),
- cvb->length[i]);
- } else {
- string_values.emplace_back(empty_string.data(), 0);
- }
- }
- } else {
- for (int i = 0; i < num_values; ++i) {
- string_values.emplace_back(
- (cvb->length[i] > 0) ? cvb->data[i] :
empty_string.data(), cvb->length[i]);
- }
- }
- }
- if (!string_values.empty()) {
- data_column->insert_many_strings(string_values.data(), num_values);
- }
- return Status::OK();
-}
-
-template <bool is_filter>
-Status OrcReader::_decode_string_dict_encoded_column(const MutableColumnPtr&
data_column,
- const orc::TypeKind&
type_kind,
- const
orc::EncodedStringVectorBatch* cvb,
- size_t num_values) {
- std::vector<StringRef> string_values;
- size_t max_value_length = 0;
- string_values.reserve(num_values);
-
- UInt8* __restrict filter_data = nullptr;
- if constexpr (is_filter) {
- filter_data = _filter->data();
- }
-
- auto process_one = [&]<bool is_char>(int i) {
- if constexpr (is_filter) {
- if (!filter_data[i]) {
- string_values.emplace_back(EMPTY_STRING_FOR_OVERFLOW, 0);
- return;
- }
- }
-
- char* val_ptr;
- int64_t length;
- cvb->dictionary->getValueByIndex(cvb->index.data()[i], val_ptr,
length);
-
- if constexpr (is_char) {
- length = trim_right(val_ptr, length);
- }
-
- if (length > max_value_length) {
- max_value_length = length;
- }
-
- string_values.emplace_back((length > 0) ? val_ptr :
EMPTY_STRING_FOR_OVERFLOW, length);
- };
-
- if (type_kind == orc::TypeKind::CHAR) {
- if (cvb->hasNulls) {
- for (int i = 0; i < num_values; ++i) {
- if (cvb->notNull[i]) {
- process_one.template operator()<true>(i);
- } else {
- string_values.emplace_back(EMPTY_STRING_FOR_OVERFLOW, 0);
- }
- }
- } else {
- for (int i = 0; i < num_values; ++i) {
- process_one.template operator()<true>(i);
- }
- }
- } else {
- if (cvb->hasNulls) {
- for (int i = 0; i < num_values; ++i) {
- if (cvb->notNull[i]) {
- process_one.template operator()<false>(i);
- } else {
- string_values.emplace_back(EMPTY_STRING_FOR_OVERFLOW, 0);
- }
- }
- } else {
- for (int i = 0; i < num_values; ++i) {
- process_one.template operator()<false>(i);
- }
- }
- }
-
- if (!string_values.empty()) {
- data_column->insert_many_strings_overflow(string_values.data(),
string_values.size(),
- max_value_length);
- }
- return Status::OK();
-}
-
-template <bool is_filter>
-Status OrcReader::_decode_int32_column(const std::string& col_name,
- const MutableColumnPtr& data_column,
- const orc::ColumnVectorBatch* cvb,
size_t num_values) {
- SCOPED_RAW_TIMER(&_statistics.decode_value_time);
- if (dynamic_cast<const orc::LongVectorBatch*>(cvb) != nullptr) {
- return _decode_flat_column<TYPE_INT, orc::LongVectorBatch>(col_name,
data_column, cvb,
- num_values);
- } else if (dynamic_cast<const orc::EncodedStringVectorBatch*>(cvb) !=
nullptr) {
- const auto* data = static_cast<const
orc::EncodedStringVectorBatch*>(cvb);
- const auto* cvb_data = data->index.data();
- auto& column_data = static_cast<ColumnInt32&>(*data_column).get_data();
- auto origin_size = column_data.size();
- column_data.resize(origin_size + num_values);
- for (int i = 0; i < num_values; ++i) {
- column_data[origin_size + i] = (Int32)cvb_data[i];
- }
- return Status::OK();
- } else {
- DCHECK(false) << "Bad ColumnVectorBatch type.";
- return Status::InternalError("Bad ColumnVectorBatch type.");
- }
-}
-
-Status OrcReader::_fill_doris_array_offsets(const std::string& col_name,
- ColumnArray::Offsets64&
doris_offsets,
- const orc::DataBuffer<int64_t>&
orc_offsets,
- size_t num_values, size_t*
element_size) {
- SCOPED_RAW_TIMER(&_statistics.decode_value_time);
- if (num_values > 0) {
- // The const variable uses a non-const method from a third-party
dependency
- // without modification, so const_cast can be used.
- if (const_cast<orc::DataBuffer<int64_t>&>(orc_offsets).size() <
num_values + 1) {
- return Status::InternalError("Wrong array offsets in orc file for
column '{}'",
- col_name);
- }
- auto prev_offset = doris_offsets.back();
- auto base_offset = orc_offsets[0];
- for (int i = 1; i < num_values + 1; ++i) {
- doris_offsets.emplace_back(prev_offset + orc_offsets[i] -
base_offset);
- }
- *element_size = orc_offsets[num_values] - base_offset;
- } else {
- *element_size = 0;
- }
- return Status::OK();
-}
-
template <bool is_filter>
Status OrcReader::_fill_doris_data_column(const std::string& col_name,
MutableColumnPtr& data_column,
const DataTypePtr& data_type,
std::shared_ptr<TableSchemaChangeHelper::Node> root_node,
const orc::Type* orc_column_type,
const orc::ColumnVectorBatch* cvb,
size_t num_values) {
- auto logical_type = data_type->get_primitive_type();
-
- switch (logical_type) {
-#define DISPATCH(FlatType, CppType, OrcColumnType) \
- case FlatType: \
- return _decode_flat_column<FlatType, OrcColumnType>(col_name,
data_column, cvb, num_values);
- FOR_FLAT_ORC_COLUMNS(DISPATCH)
-#undef DISPATCH
- case PrimitiveType::TYPE_INT:
- return _decode_int32_column<is_filter>(col_name, data_column, cvb,
num_values);
- case PrimitiveType::TYPE_DECIMAL32:
- return _decode_decimal_column<TYPE_DECIMAL32, is_filter>(col_name,
data_column, data_type,
- cvb,
num_values);
- case PrimitiveType::TYPE_DECIMAL64:
- return _decode_decimal_column<TYPE_DECIMAL64, is_filter>(col_name,
data_column, data_type,
- cvb,
num_values);
- case PrimitiveType::TYPE_DECIMALV2:
- return _decode_decimal_column<TYPE_DECIMALV2, is_filter>(col_name,
data_column, data_type,
- cvb,
num_values);
- case PrimitiveType::TYPE_DECIMAL128I:
- return _decode_decimal_column<TYPE_DECIMAL128I, is_filter>(col_name,
data_column, data_type,
- cvb,
num_values);
- case PrimitiveType::TYPE_DATEV2:
- return _decode_time_column<DateV2Value<DateV2ValueType>, TYPE_DATEV2,
orc::LongVectorBatch,
- is_filter>(col_name, data_column, cvb,
num_values);
- case PrimitiveType::TYPE_DATETIMEV2:
- return _decode_time_column<DateV2Value<DateTimeV2ValueType>,
TYPE_DATETIMEV2,
- orc::TimestampVectorBatch,
is_filter>(col_name, data_column, cvb,
-
num_values);
- case PrimitiveType::TYPE_STRING:
- case PrimitiveType::TYPE_VARCHAR:
- case PrimitiveType::TYPE_CHAR:
- return _decode_string_column<is_filter>(col_name, data_column,
orc_column_type->getKind(),
- cvb, num_values);
- case PrimitiveType::TYPE_VARBINARY:
- // case BINARY: binary type still use StringVectorBatch, so here we
just call _decode_string_column
- // return encoded ?
std::make_unique<EncodedStringVectorBatch>(capacity, memoryPool)
- // : std::make_unique<StringVectorBatch>(capacity,
memoryPool);
- return _decode_string_column<is_filter>(col_name, data_column,
orc_column_type->getKind(),
- cvb, num_values);
- case PrimitiveType::TYPE_TIMESTAMPTZ:
- return _decode_timestamp_tz_column<is_filter>(col_name, data_column,
cvb, num_values);
- case PrimitiveType::TYPE_ARRAY: {
- if (orc_column_type->getKind() != orc::TypeKind::LIST) {
- return Status::InternalError(
- "Wrong data type for column '{}', expected list, actual
{}", col_name,
- orc_column_type->getKind());
- }
- const auto* orc_list = dynamic_cast<const orc::ListVectorBatch*>(cvb);
- auto& doris_offsets =
static_cast<ColumnArray&>(*data_column).get_offsets();
- const auto& orc_offsets = orc_list->offsets;
- size_t element_size = 0;
- RETURN_IF_ERROR(_fill_doris_array_offsets(col_name, doris_offsets,
orc_offsets, num_values,
- &element_size));
- const DataTypePtr& nested_type =
- reinterpret_cast<const
DataTypeArray*>(remove_nullable(data_type).get())
- ->get_nested_type();
- const orc::Type* nested_orc_type = orc_column_type->getSubtype(0);
- std::string element_name = col_name + ".element";
- return _orc_column_to_doris_column<false>(
- element_name,
static_cast<ColumnArray&>(*data_column).get_data_ptr(), nested_type,
- root_node->get_element_node(), nested_orc_type,
orc_list->elements.get(),
- element_size);
- }
- case PrimitiveType::TYPE_MAP: {
- if (orc_column_type->getKind() != orc::TypeKind::MAP) {
- return Status::InternalError("Wrong data type for column '{}',
expected map, actual {}",
- col_name, orc_column_type->getKind());
- }
- const auto* orc_map = dynamic_cast<const orc::MapVectorBatch*>(cvb);
- auto& doris_map = static_cast<ColumnMap&>(*data_column);
- size_t element_size = 0;
- RETURN_IF_ERROR(_fill_doris_array_offsets(col_name,
doris_map.get_offsets(),
- orc_map->offsets,
num_values, &element_size));
- const DataTypePtr& doris_key_type =
- reinterpret_cast<const
DataTypeMap*>(remove_nullable(data_type).get())
- ->get_key_type();
- const DataTypePtr& doris_value_type =
- reinterpret_cast<const
DataTypeMap*>(remove_nullable(data_type).get())
- ->get_value_type();
-
- // Get ORC key and value types with null checks
- const orc::Type* orc_key_type = orc_column_type->getSubtype(0);
- const orc::Type* orc_value_type = orc_column_type->getSubtype(1);
-
- VLOG_DEBUG << "[OrcReader] MAP column '" << col_name
- << "': orc_key_type=" << (orc_key_type != nullptr ? "not
null" : "NULL")
- << ", orc_value_type=" << (orc_value_type != nullptr ? "not
null" : "NULL")
- << ", element_size=" << element_size;
-
- // Handle incomplete MAP type - if key or value type is nullptr, try
to recover from mapping
- bool key_is_missing = (orc_key_type == nullptr);
- bool value_is_missing = (orc_value_type == nullptr);
-
- if (key_is_missing || value_is_missing) {
- VLOG_DEBUG << "[OrcReader] Detected incomplete MAP subtypes for
column '" << col_name
- << "', attempting to recover from mapping...";
-
- uint64_t column_id = orc_column_type->getColumnId();
- auto it = _column_id_to_file_type.find(column_id);
- if (it != _column_id_to_file_type.end() && it->second != nullptr) {
- const orc::Type* complete_map_type = it->second;
- if (complete_map_type->getKind() == orc::TypeKind::MAP &&
- complete_map_type->getSubtypeCount() == 2) {
- if (key_is_missing) {
- orc_key_type = complete_map_type->getSubtype(0);
- if (orc_key_type != nullptr) {
- // key_is_missing = false;
- VLOG_DEBUG << "[OrcReader] Recovered key type from
mapping for column '"
- << col_name << "'";
- }
- }
- if (value_is_missing) {
- orc_value_type = complete_map_type->getSubtype(1);
- if (orc_value_type != nullptr) {
- // value_is_missing = false;
- VLOG_DEBUG
- << "[OrcReader] Recovered value type from
mapping for column '"
- << col_name << "'";
- }
+ OrcSerDeReadContext context;
+ context.timezone = _ctz;
+ context.decode_value_time = &_statistics.decode_value_time;
+ if constexpr (is_filter) {
+ context.filter = _filter->data();
+ }
+ context.schema_node = std::static_pointer_cast<OrcSerDeReadContext::SchemaNode>(root_node);
+ context.resolve_file_type = [this](uint64_t column_id) -> const orc::Type* {
+ auto it = _column_id_to_file_type.find(column_id);
+ return it == _column_id_to_file_type.end() ? nullptr : it->second;
+ };
+ context.get_element_node = [](const std::shared_ptr<OrcSerDeReadContext::SchemaNode>& node) {
+ return std::static_pointer_cast<OrcSerDeReadContext::SchemaNode>(
+ std::static_pointer_cast<TableSchemaChangeHelper::Node>(node)->get_element_node());
+ };
+ context.get_key_node = [](const std::shared_ptr<OrcSerDeReadContext::SchemaNode>& node) {
+ return std::static_pointer_cast<OrcSerDeReadContext::SchemaNode>(
+ std::static_pointer_cast<TableSchemaChangeHelper::Node>(node)->get_key_node());
+ };
+ context.get_value_node = [](const std::shared_ptr<OrcSerDeReadContext::SchemaNode>& node) {
+ return std::static_pointer_cast<OrcSerDeReadContext::SchemaNode>(
+ std::static_pointer_cast<TableSchemaChangeHelper::Node>(node)->get_value_node());
+ };
+ context.get_child_node = [](const std::shared_ptr<OrcSerDeReadContext::SchemaNode>& node,
+ const std::string& child_name) {
+ return std::static_pointer_cast<OrcSerDeReadContext::SchemaNode>(
+ std::static_pointer_cast<TableSchemaChangeHelper::Node>(node)->get_children_node(
+ child_name));
+ };
+ context.get_child_file_name = [](const std::shared_ptr<OrcSerDeReadContext::SchemaNode>& node,
+ const std::string& child_name) {
+ return std::static_pointer_cast<TableSchemaChangeHelper::Node>(node)
+ ->children_file_column_name(child_name);
+ };
+ context.child_exists = [](const std::shared_ptr<OrcSerDeReadContext::SchemaNode>& node,
+ const std::string& child_name) {
+ return std::static_pointer_cast<TableSchemaChangeHelper::Node>(node)
+ ->children_column_exists(child_name);
+ };
+ context.read_nested_column =
+ [this](const std::string& name, ColumnPtr& column, const DataTypePtr& type,
+ const std::shared_ptr<OrcSerDeReadContext::SchemaNode>& node,
+ const orc::Type* orc_type, const orc::ColumnVectorBatch* batch, size_t rows,
+ bool inherit_filter) {
+ auto schema_node = std::static_pointer_cast<TableSchemaChangeHelper::Node>(node);
+ if (inherit_filter) {
+ if constexpr (is_filter) {
+ return _orc_column_to_doris_column<true>(name, column, type, schema_node,
Review Comment:
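这里为什么是_orc_column_to_doris_column而不是直接调用子列的datatype的serde?
(English: Why does this call `_orc_column_to_doris_column` here instead of directly invoking the SerDe of the child column's data type?)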
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]