This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 7d90d8a8fc7063d987c952dba9c1ee5debbdd41f
Author: Jibing-Li <[email protected]>
AuthorDate: Tue Feb 14 14:32:27 2023 +0800

    [Fix](multi catalog)Support read hive1.x orc file. (#16677)
    
    Hive 1.x may write orc files with internal column names (_col0, _col1, 
_col2...).
    This will cause query results to be NULL because the column names in the 
orc file don't match the column names in the Doris table schema. This PR adds 
support for querying Hive orc files with internal column names.
    
    For now, we haven't seen any problems with Parquet files; we will send a 
new PR to fix parquet if any problems show up in the future.
---
 be/src/vec/exec/format/orc/vorc_reader.cpp         | 25 +++++++++++++++++++---
 be/src/vec/exec/format/orc/vorc_reader.h           |  6 ++++++
 .../planner/external/ExternalFileScanNode.java     | 22 +++++++++++++++++++
 gensrc/thrift/PlanNodes.thrift                     |  2 ++
 4 files changed, 52 insertions(+), 3 deletions(-)

diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp 
b/be/src/vec/exec/format/orc/vorc_reader.cpp
index f313cb60f0..5d9ae9d63c 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -76,7 +76,8 @@ OrcReader::OrcReader(RuntimeProfile* profile, const 
TFileScanRangeParams& params
           _range_start_offset(range.start_offset),
           _range_size(range.size),
           _ctz(ctz),
-          _column_names(column_names) {
+          _column_names(column_names),
+          _is_hive(params.__isset.slot_name_to_schema_pos) {
     TimezoneUtils::find_cctz_time_zone(ctz, _time_zone);
     _init_profile();
 }
@@ -87,7 +88,8 @@ OrcReader::OrcReader(const TFileScanRangeParams& params, 
const TFileRangeDesc& r
           _scan_params(params),
           _scan_range(range),
           _ctz(ctz),
-          _column_names(column_names) {}
+          _column_names(column_names),
+          _is_hive(params.__isset.slot_name_to_schema_pos) {}
 
 OrcReader::~OrcReader() {
     close();
@@ -170,7 +172,13 @@ Status OrcReader::init_reader(
     auto& selected_type = _row_reader->getSelectedType();
     _col_orc_type.resize(selected_type.getSubtypeCount());
     for (int i = 0; i < selected_type.getSubtypeCount(); ++i) {
-        _colname_to_idx[_get_field_name_lower_case(&selected_type, i)] = i;
+        auto name = _get_field_name_lower_case(&selected_type, i);
+        // For hive engine, translate the column name in orc file to schema 
column name.
+        // This is for Hive 1.x which use internal column name _col0, _col1...
+        if (_is_hive) {
+            name = _file_col_to_schema_col[name];
+        }
+        _colname_to_idx[name] = i;
         _col_orc_type[i] = selected_type.getSubtype(i);
     }
     return Status::OK();
@@ -219,6 +227,12 @@ Status OrcReader::_init_read_columns() {
         
orc_cols_lower_case.emplace_back(_get_field_name_lower_case(&root_type, i));
     }
     for (auto& col_name : _column_names) {
+        if (_is_hive) {
+            auto iter = _scan_params.slot_name_to_schema_pos.find(col_name);
+            DCHECK(iter != _scan_params.slot_name_to_schema_pos.end());
+            int pos = iter->second;
+            orc_cols_lower_case[pos] = iter->first;
+        }
         auto iter = std::find(orc_cols_lower_case.begin(), 
orc_cols_lower_case.end(), col_name);
         if (iter == orc_cols_lower_case.end()) {
             _missing_cols.emplace_back(col_name);
@@ -226,6 +240,11 @@ Status OrcReader::_init_read_columns() {
             int pos = std::distance(orc_cols_lower_case.begin(), iter);
             _read_cols.emplace_back(orc_cols[pos]);
             _read_cols_lower_case.emplace_back(col_name);
+            // For hive engine, store the orc column name to schema column 
name map.
+            // This is for Hive 1.x orc file with internal column name _col0, 
_col1...
+            if (_is_hive) {
+                _file_col_to_schema_col[orc_cols[pos]] = col_name;
+            }
         }
     }
     return Status::OK();
diff --git a/be/src/vec/exec/format/orc/vorc_reader.h 
b/be/src/vec/exec/format/orc/vorc_reader.h
index aa3efa8d2c..53f4a91880 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.h
+++ b/be/src/vec/exec/format/orc/vorc_reader.h
@@ -274,6 +274,12 @@ private:
     std::list<std::string> _read_cols_lower_case;
     std::list<std::string> _missing_cols;
     std::unordered_map<std::string, int> _colname_to_idx;
+    // Column name in Orc file to column name in schema.
+    // This is used for Hive 1.x which use internal column name in Orc file.
+    // _col0, _col1...
+    std::unordered_map<std::string, std::string> _file_col_to_schema_col;
+    // Flag for hive engine. True if the external table engine is Hive.
+    bool _is_hive = false;
     std::vector<const orc::Type*> _col_orc_type;
     ORCFileInputStream* _file_reader = nullptr;
     Statistics _statistics;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
index 741e11362b..b0a0c162e6 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
@@ -338,6 +338,12 @@ public class ExternalFileScanNode extends ExternalScanNode 
{
             createScanRangeLocations(context, scanProvider);
             this.inputSplitsNum += scanProvider.getInputSplitNum();
             this.totalFileSize += scanProvider.getInputFileSize();
+            TableIf table = desc.getTable();
+            if (table instanceof HMSExternalTable) {
+                if (((HMSExternalTable) 
table).getDlaType().equals(HMSExternalTable.DLAType.HIVE)) {
+                    genSlotToSchemaIdMap(context);
+                }
+            }
             if (scanProvider instanceof HiveScanProvider) {
                 this.totalPartitionNum = ((HiveScanProvider) 
scanProvider).getTotalPartitionNum();
                 this.readPartitionNum = ((HiveScanProvider) 
scanProvider).getReadPartitionNum();
@@ -552,6 +558,22 @@ public class ExternalFileScanNode extends ExternalScanNode 
{
         scanProvider.createScanRangeLocations(context, backendPolicy, 
scanRangeLocations);
     }
 
+    private void genSlotToSchemaIdMap(ParamCreateContext context) {
+        List<Column> baseSchema = desc.getTable().getBaseSchema();
+        Map<String, Integer> columnNameToPosition = Maps.newHashMap();
+        for (SlotDescriptor slot : desc.getSlots()) {
+            int idx = 0;
+            for (Column col : baseSchema) {
+                if (col.getName().equals(slot.getColumn().getName())) {
+                    columnNameToPosition.put(col.getName(), idx);
+                    break;
+                }
+                idx += 1;
+            }
+        }
+        context.params.setSlotNameToSchemaPos(columnNameToPosition);
+    }
+
     @Override
     public int getNumInstances() {
         return scanRangeLocations.size();
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index f01b0f6144..9c76eacee9 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -321,6 +321,8 @@ struct TFileScanRangeParams {
     17: optional TTableFormatFileDesc table_format_params
     // For csv query task, same the column index in file, order by dest_tuple
     18: optional list<i32> column_idxs
+    // Map of slot to its position in table schema. Only for Hive external 
table.
+    19: optional map<string, i32> slot_name_to_schema_pos
 }
 
 struct TFileRangeDesc {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to