This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-1.2-lts in repository https://gitbox.apache.org/repos/asf/doris.git
commit 7d90d8a8fc7063d987c952dba9c1ee5debbdd41f Author: Jibing-Li <[email protected]> AuthorDate: Tue Feb 14 14:32:27 2023 +0800 [Fix](multi catalog)Support read hive1.x orc file. (#16677) Hive 1.x may write ORC files with internal column names (_col0, _col1, _col2...). This causes query results to be NULL because the column names in the ORC file don't match the column names in the Doris table schema. This PR adds support for querying Hive ORC files with internal column names. For now, we haven't seen any problems with Parquet files; we will send a new PR to fix Parquet if any problem shows up in the future. --- be/src/vec/exec/format/orc/vorc_reader.cpp | 25 +++++++++++++++++++--- be/src/vec/exec/format/orc/vorc_reader.h | 6 ++++++ .../planner/external/ExternalFileScanNode.java | 22 +++++++++++++++++++ gensrc/thrift/PlanNodes.thrift | 2 ++ 4 files changed, 52 insertions(+), 3 deletions(-) diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index f313cb60f0..5d9ae9d63c 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -76,7 +76,8 @@ OrcReader::OrcReader(RuntimeProfile* profile, const TFileScanRangeParams& params _range_start_offset(range.start_offset), _range_size(range.size), _ctz(ctz), - _column_names(column_names) { + _column_names(column_names), + _is_hive(params.__isset.slot_name_to_schema_pos) { TimezoneUtils::find_cctz_time_zone(ctz, _time_zone); _init_profile(); } @@ -87,7 +88,8 @@ OrcReader::OrcReader(const TFileScanRangeParams& params, const TFileRangeDesc& r _scan_params(params), _scan_range(range), _ctz(ctz), - _column_names(column_names) {} + _column_names(column_names), + _is_hive(params.__isset.slot_name_to_schema_pos) {} OrcReader::~OrcReader() { close(); @@ -170,7 +172,13 @@ Status OrcReader::init_reader( auto& selected_type = _row_reader->getSelectedType(); _col_orc_type.resize(selected_type.getSubtypeCount()); for (int i = 0; i < selected_type.getSubtypeCount(); ++i) { - 
_colname_to_idx[_get_field_name_lower_case(&selected_type, i)] = i; + auto name = _get_field_name_lower_case(&selected_type, i); + // For hive engine, translate the column name in orc file to schema column name. + // This is for Hive 1.x which use internal column name _col0, _col1... + if (_is_hive) { + name = _file_col_to_schema_col[name]; + } + _colname_to_idx[name] = i; _col_orc_type[i] = selected_type.getSubtype(i); } return Status::OK(); @@ -219,6 +227,12 @@ Status OrcReader::_init_read_columns() { orc_cols_lower_case.emplace_back(_get_field_name_lower_case(&root_type, i)); } for (auto& col_name : _column_names) { + if (_is_hive) { + auto iter = _scan_params.slot_name_to_schema_pos.find(col_name); + DCHECK(iter != _scan_params.slot_name_to_schema_pos.end()); + int pos = iter->second; + orc_cols_lower_case[pos] = iter->first; + } auto iter = std::find(orc_cols_lower_case.begin(), orc_cols_lower_case.end(), col_name); if (iter == orc_cols_lower_case.end()) { _missing_cols.emplace_back(col_name); @@ -226,6 +240,11 @@ Status OrcReader::_init_read_columns() { int pos = std::distance(orc_cols_lower_case.begin(), iter); _read_cols.emplace_back(orc_cols[pos]); _read_cols_lower_case.emplace_back(col_name); + // For hive engine, store the orc column name to schema column name map. + // This is for Hive 1.x orc file with internal column name _col0, _col1... + if (_is_hive) { + _file_col_to_schema_col[orc_cols[pos]] = col_name; + } } } return Status::OK(); diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h index aa3efa8d2c..53f4a91880 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.h +++ b/be/src/vec/exec/format/orc/vorc_reader.h @@ -274,6 +274,12 @@ private: std::list<std::string> _read_cols_lower_case; std::list<std::string> _missing_cols; std::unordered_map<std::string, int> _colname_to_idx; + // Column name in Orc file to column name to schema. + // This is used for Hive 1.x which use internal column name in Orc file. 
+ // _col0, _col1... + std::unordered_map<std::string, std::string> _file_col_to_schema_col; + // Flag for hive engine. True if the external table engine is Hive. + bool _is_hive = false; std::vector<const orc::Type*> _col_orc_type; ORCFileInputStream* _file_reader = nullptr; Statistics _statistics; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java index 741e11362b..b0a0c162e6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java @@ -338,6 +338,12 @@ public class ExternalFileScanNode extends ExternalScanNode { createScanRangeLocations(context, scanProvider); this.inputSplitsNum += scanProvider.getInputSplitNum(); this.totalFileSize += scanProvider.getInputFileSize(); + TableIf table = desc.getTable(); + if (table instanceof HMSExternalTable) { + if (((HMSExternalTable) table).getDlaType().equals(HMSExternalTable.DLAType.HIVE)) { + genSlotToSchemaIdMap(context); + } + } if (scanProvider instanceof HiveScanProvider) { this.totalPartitionNum = ((HiveScanProvider) scanProvider).getTotalPartitionNum(); this.readPartitionNum = ((HiveScanProvider) scanProvider).getReadPartitionNum(); @@ -552,6 +558,22 @@ public class ExternalFileScanNode extends ExternalScanNode { scanProvider.createScanRangeLocations(context, backendPolicy, scanRangeLocations); } + private void genSlotToSchemaIdMap(ParamCreateContext context) { + List<Column> baseSchema = desc.getTable().getBaseSchema(); + Map<String, Integer> columnNameToPosition = Maps.newHashMap(); + for (SlotDescriptor slot : desc.getSlots()) { + int idx = 0; + for (Column col : baseSchema) { + if (col.getName().equals(slot.getColumn().getName())) { + columnNameToPosition.put(col.getName(), idx); + break; + } + idx += 1; + } + } + 
context.params.setSlotNameToSchemaPos(columnNameToPosition); + } + @Override public int getNumInstances() { return scanRangeLocations.size(); diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index f01b0f6144..9c76eacee9 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -321,6 +321,8 @@ struct TFileScanRangeParams { 17: optional TTableFormatFileDesc table_format_params // For csv query task, same the column index in file, order by dest_tuple 18: optional list<i32> column_idxs + // Map of slot to its position in table schema. Only for Hive external table. + 19: optional map<string, i32> slot_name_to_schema_pos } struct TFileRangeDesc { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
