This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 2adb0fcc50b [opt](hive) support orc generated from hive 1.x for all
file scan node (#28806)
2adb0fcc50b is described below
commit 2adb0fcc50b9bc7fdccb9685a34abbc438f8640a
Author: Mingyu Chen <[email protected]>
AuthorDate: Sat Jan 6 17:33:16 2024 +0800
[opt](hive) support orc generated from hive 1.x for all file scan node
(#28806)
---
be/src/vec/exec/format/orc/vorc_reader.cpp | 24 ++++++++++++++--------
be/src/vec/exec/format/orc/vorc_reader.h | 24 +++++++++++++++++++---
.../doris/planner/external/FileQueryScanNode.java | 22 ++++++++++++++++++++
.../doris/planner/external/HiveScanNode.java | 23 ---------------------
4 files changed, 59 insertions(+), 34 deletions(-)
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp
b/be/src/vec/exec/format/orc/vorc_reader.cpp
index 943b9d1e4d6..17a3425f4af 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -146,7 +146,6 @@ OrcReader::OrcReader(RuntimeProfile* profile, RuntimeState*
state,
_range_start_offset(range.start_offset),
_range_size(range.size),
_ctz(ctz),
- _is_hive(params.__isset.slot_name_to_schema_pos),
_io_ctx(io_ctx),
_enable_lazy_mat(enable_lazy_mat),
_is_dict_cols_converted(false) {
@@ -165,7 +164,6 @@ OrcReader::OrcReader(const TFileScanRangeParams& params,
const TFileRangeDesc& r
_scan_params(params),
_scan_range(range),
_ctz(ctz),
- _is_hive(params.__isset.slot_name_to_schema_pos),
_file_system(nullptr),
_io_ctx(io_ctx),
_enable_lazy_mat(enable_lazy_mat),
@@ -307,11 +305,15 @@ Status OrcReader::_init_read_columns() {
auto& root_type = _reader->getType();
std::vector<std::string> orc_cols;
std::vector<std::string> orc_cols_lower_case;
- _init_orc_cols(root_type, orc_cols, orc_cols_lower_case, _type_map);
+ bool is_hive1_orc = false;
+ _init_orc_cols(root_type, orc_cols, orc_cols_lower_case, _type_map,
&is_hive1_orc);
+ // In old version slot_name_to_schema_pos may not be set in _scan_params
+ // TODO, should be removed in 2.2 or later
+ _is_hive1_orc = is_hive1_orc &&
_scan_params.__isset.slot_name_to_schema_pos;
for (size_t i = 0; i < _column_names->size(); ++i) {
auto& col_name = (*_column_names)[i];
- if (_is_hive) {
+ if (_is_hive1_orc) {
auto iter = _scan_params.slot_name_to_schema_pos.find(col_name);
if (iter != _scan_params.slot_name_to_schema_pos.end()) {
int pos = iter->second;
@@ -346,7 +348,7 @@ Status OrcReader::_init_read_columns() {
_read_cols_lower_case.emplace_back(col_name);
// For hive engine, store the orc column name to schema column
name map.
// This is for Hive 1.x orc file with internal column name _col0,
_col1...
- if (_is_hive) {
+ if (_is_hive1_orc) {
_removed_acid_file_col_name_to_schema_col[orc_cols[pos]] =
col_name;
}
_col_name_to_file_col_name[col_name] = read_col;
@@ -357,20 +359,26 @@ Status OrcReader::_init_read_columns() {
void OrcReader::_init_orc_cols(const orc::Type& type,
std::vector<std::string>& orc_cols,
std::vector<std::string>& orc_cols_lower_case,
- std::unordered_map<std::string, const
orc::Type*>& type_map) {
+ std::unordered_map<std::string, const
orc::Type*>& type_map,
+ bool* is_hive1_orc) {
+ bool hive1_orc = true;
for (int i = 0; i < type.getSubtypeCount(); ++i) {
orc_cols.emplace_back(type.getFieldName(i));
auto filed_name_lower_case = _get_field_name_lower_case(&type, i);
+ if (hive1_orc) {
+ hive1_orc = _is_hive1_col_name(filed_name_lower_case);
+ }
auto filed_name_lower_case_copy = filed_name_lower_case;
orc_cols_lower_case.emplace_back(std::move(filed_name_lower_case));
type_map.emplace(std::move(filed_name_lower_case_copy),
type.getSubtype(i));
if (_is_acid) {
const orc::Type* sub_type = type.getSubtype(i);
if (sub_type->getKind() == orc::TypeKind::STRUCT) {
- _init_orc_cols(*sub_type, orc_cols, orc_cols_lower_case,
type_map);
+ _init_orc_cols(*sub_type, orc_cols, orc_cols_lower_case,
type_map, is_hive1_orc);
}
}
}
+ *is_hive1_orc = hive1_orc;
}
bool OrcReader::_check_acid_schema(const orc::Type& type) {
@@ -845,7 +853,7 @@ Status OrcReader::_init_select_types(const orc::Type& type,
int idx) {
std::string name;
// For hive engine, translate the column name in orc file to schema
column name.
// This is for Hive 1.x which use internal column name _col0, _col1...
- if (_is_hive) {
+ if (_is_hive1_orc) {
name =
_removed_acid_file_col_name_to_schema_col[type.getFieldName(i)];
} else {
name = _get_field_name_lower_case(&type, i);
diff --git a/be/src/vec/exec/format/orc/vorc_reader.h
b/be/src/vec/exec/format/orc/vorc_reader.h
index 366231deaee..b8bc05387b9 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.h
+++ b/be/src/vec/exec/format/orc/vorc_reader.h
@@ -247,7 +247,8 @@ private:
Status _init_read_columns();
void _init_orc_cols(const orc::Type& type, std::vector<std::string>&
orc_cols,
std::vector<std::string>& orc_cols_lower_case,
- std::unordered_map<std::string, const orc::Type*>&
type_map);
+ std::unordered_map<std::string, const orc::Type*>&
type_map,
+ bool* is_hive1_orc);
static bool _check_acid_schema(const orc::Type& type);
static const orc::Type& _remove_acid(const orc::Type& type);
TypeDescriptor _convert_to_doris_type(const orc::Type* orc_type);
@@ -483,6 +484,22 @@ private:
int64_t get_remaining_rows() { return _remaining_rows; }
void set_remaining_rows(int64_t rows) { _remaining_rows = rows; }
+ // check if the given name is like _col0, _col1, ...
+ static bool inline _is_hive1_col_name(const std::string& name) {
+ if (name.size() <= 4) {
+ return false;
+ }
+ if (name.substr(0, 4) != "_col") {
+ return false;
+ }
+ for (size_t i = 4; i < name.size(); ++i) {
+ if (!isdigit(name[i])) {
+ return false;
+ }
+ }
+ return true;
+ }
+
private:
// This is only for count(*) short circuit read.
// save the total number of rows in range
@@ -509,8 +526,9 @@ private:
// This is used for Hive 1.x which use internal column name in Orc file.
// _col0, _col1...
std::unordered_map<std::string, std::string>
_removed_acid_file_col_name_to_schema_col;
- // Flag for hive engine. True if the external table engine is Hive.
- bool _is_hive = false;
+ // Flag for hive engine. True if the external table engine is Hive 1.x with
orc col name
+ // as _col0, _col1, ...
+ bool _is_hive1_orc = false;
std::unordered_map<std::string, std::string> _col_name_to_file_col_name;
std::unordered_map<std::string, const orc::Type*> _type_map;
std::vector<const orc::Type*> _col_orc_type;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
index b54b05f47e1..71c731498a6 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
@@ -275,6 +275,9 @@ public abstract class FileQueryScanNode extends
FileScanNode {
return;
}
TFileFormatType fileFormatType = getFileFormatType();
+ if (fileFormatType == TFileFormatType.FORMAT_ORC) {
+ genSlotToSchemaIdMapForOrc();
+ }
params.setFormatType(fileFormatType);
boolean isCsvOrJson = Util.isCsvFormat(fileFormatType) ||
fileFormatType == TFileFormatType.FORMAT_JSON;
boolean isWal = fileFormatType == TFileFormatType.FORMAT_WAL;
@@ -463,6 +466,25 @@ public abstract class FileQueryScanNode extends
FileScanNode {
return rangeDesc;
}
+ // To Support Hive 1.x orc internal column name like (_col0, _col1,
_col2...)
+ // We need to save mapping from slot name to schema position
+ protected void genSlotToSchemaIdMapForOrc() {
+ Preconditions.checkNotNull(params);
+ List<Column> baseSchema = desc.getTable().getBaseSchema();
+ Map<String, Integer> columnNameToPosition = Maps.newHashMap();
+ for (SlotDescriptor slot : desc.getSlots()) {
+ int idx = 0;
+ for (Column col : baseSchema) {
+ if (col.getName().equals(slot.getColumn().getName())) {
+ columnNameToPosition.put(col.getName(), idx);
+ break;
+ }
+ idx += 1;
+ }
+ }
+ params.setSlotNameToSchemaPos(columnNameToPosition);
+ }
+
protected abstract TFileType getLocationType() throws UserException;
protected abstract TFileType getLocationType(String location) throws
UserException;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
index f26db2b9fb7..b540cd67c56 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
@@ -18,7 +18,6 @@
package org.apache.doris.planner.external;
import org.apache.doris.analysis.FunctionCallExpr;
-import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
@@ -39,7 +38,6 @@ import org.apache.doris.datasource.hive.HiveMetaStoreCache;
import org.apache.doris.datasource.hive.HiveMetaStoreCache.FileCacheValue;
import org.apache.doris.datasource.hive.HivePartition;
import org.apache.doris.datasource.hive.HiveTransaction;
-import org.apache.doris.datasource.hive.HiveVersionUtil;
import
org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
import org.apache.doris.planner.ListPartitionPrunerV2;
import org.apache.doris.planner.PlanNodeId;
@@ -55,7 +53,6 @@ import org.apache.doris.thrift.TFileType;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import lombok.Setter;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -117,9 +114,6 @@ public class HiveScanNode extends FileQueryScanNode {
@Override
protected void doInitialize() throws UserException {
super.doInitialize();
- if (HiveVersionUtil.isHive1(hmsTable.getHiveVersion())) {
- genSlotToSchemaIdMap();
- }
if (hmsTable.isHiveTransactionalTable()) {
this.hiveTransaction = new
HiveTransaction(DebugUtil.printId(ConnectContext.get().queryId()),
@@ -396,23 +390,6 @@ public class HiveScanNode extends FileQueryScanNode {
return fileAttributes;
}
- // To Support Hive 1.x orc internal column name like (_col0, _col1,
_col2...)
- private void genSlotToSchemaIdMap() {
- List<Column> baseSchema = desc.getTable().getBaseSchema();
- Map<String, Integer> columnNameToPosition = Maps.newHashMap();
- for (SlotDescriptor slot : desc.getSlots()) {
- int idx = 0;
- for (Column col : baseSchema) {
- if (col.getName().equals(slot.getColumn().getName())) {
- columnNameToPosition.put(col.getName(), idx);
- break;
- }
- idx += 1;
- }
- }
- params.setSlotNameToSchemaPos(columnNameToPosition);
- }
-
@Override
public boolean pushDownAggNoGrouping(FunctionCallExpr aggExpr) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]