This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-seq_rc_file
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-seq_rc_file by this 
push:
     new 69802005dcd [Feature](tvf) Support using tvf to read 
sequence_file/rc_file in local/hdfs/s3 (#41080)
69802005dcd is described below

commit 69802005dcded40dbbab77d071c69055149c9c0d
Author: 星が降らない街 <[email protected]>
AuthorDate: Mon Sep 23 16:13:32 2024 +0800

    [Feature](tvf) Support using tvf to read sequence_file/rc_file in 
local/hdfs/s3 (#41080)
    
    ## Proposed changes
    
    Issue Number: #30669
    
    <!--Describe your changes.-->
    
    This change supports reading the contents of external file tables from
    rcbinary, rctext, and sequence files via the JNI connector.
    
    todo-lists:
    - [x] Support read rc_binary files using local tvf
    - [x] Support read rc_text/sequence files using local tvf
    - [x] Support using s3/hdfs tvf
    
    Example:
    
    **sequence file:**
    input:
    ``` mysql
    select * from local( "file_path" = "test/test.seq", "format" = "sequence", 
"backend_id" = "10011", 
"hive_schema"="k1:tinyint;k2:smallint;k3:int;k4:bigint;k5:float;k6:double;k7:decimal(10,2);k8:string;k9:char(10);k10:varchar(20);k11:boolean;k12:timestamp;k13:date;k14:array<string>;k15:map<string,int>;k16:struct<name:string,age:int>");
    ```
    output:
    ```
    
+------+------+------+-------------+------+-------+-------+-------+------------+---------+------+---------------------+------------+-----------------+----------------------+---------------------------+
    | k1   | k2   | k3   | k4          | k5   | k6    | k7    | k8    | k9      
   | k10     | k11  | k12                 | k13        | k14             | k15  
                | k16                       |
    
+------+------+------+-------------+------+-------+-------+-------+------------+---------+------+---------------------+------------+-----------------+----------------------+---------------------------+
    |    7 |   13 |   74 | 13000000000 | 6.15 | 4.376 | 57.30 | world | Char    
   | Varchar |    1 | 2022-01-01 10:00:00 | 2022-01-01 | ["A", "B", "C"] | 
{"key2":2, "key1":1} | {"name":"John", "age":30} |
    
+------+------+------+-------------+------+-------+-------+-------+------------+---------+------+---------------------+------------+-----------------+----------------------+---------------------------+
    1 row in set (0.07 sec)
    ```
    
    **rc_binary file:**
    input:
    ```mysql
    select * from local( "file_path" = "test/test.rcbinary", "format" = 
"rc_binary", "backend_id" = "10011", 
"hive_schema"="k1:tinyint;k2:smallint;k3:int;k4:bigint;k5:float;k6:double;k7:decimal(10,2);k8:string;k9:char(10);k10:varchar(20);k11:boolean;k12:timestamp;k13:date;k14:array<string>;k15:m
    ap<string,int>;k16:struct<name:string,age:int>");
    ```
    output:
    ```
    
+------+------+------+-------------+------+------+--------+------+------------+-----------+------+---------------------+------------+-----------------+------------------+-------------------------------+
    | k1   | k2   | k3   | k4          | k5   | k6   | k7     | k8   | k9       
  | k10       | k11  | k12                 | k13        | k14             | k15 
             | k16                           |
    
+------+------+------+-------------+------+------+--------+------+------------+-----------+------+---------------------+------------+-----------------+------------------+-------------------------------+
    |    1 |    2 |    3 | 10000000000 | 1.23 | 3.14 | 100.50 | you  | are      
  | beautiful |    0 | 2023-10-29 02:00:00 | 2023-10-29 | ["D", "E", "F"] | 
{"k2":5, "k1":3} | {"name":"chandler", "age":54} |
    
+------+------+------+-------------+------+------+--------+------+------------+-----------+------+---------------------+------------+-----------------+------------------+-------------------------------+
    1 row in set (0.12 sec)
    ```
    
    **rc_text file:**
    input:
    ``` mysql
    select * from local( "file_path" = "test/test.rctext", "format" = 
"rc_text", "backend_id" = "10011", "hive_schema"="k1:tiny
    
int;k2:smallint;k3:int;k4:bigint;k5:float;k6:double;k7:decimal(10,2);k8:string;k9:char(10);k10:varchar(20);k11:boolean;k12:timestamp;k13:date;k14:array<string>;k15:
    map<string,int>;k16:struct<name:string,age:int>");
    ```
    output:
    ```
    
+------+------+------+-------------+------+-------+-------+-------+------------+---------+------+---------------------+------------+-----------------+----------------------+---------------------------+
    | k1   | k2   | k3   | k4          | k5   | k6    | k7    | k8    | k9      
   | k10     | k11  | k12                 | k13        | k14             | k15  
                | k16                       |
    
+------+------+------+-------------+------+-------+-------+-------+------------+---------+------+---------------------+------------+-----------------+----------------------+---------------------------+
    |    7 |   13 |   74 | 13000000000 | 6.15 | 4.376 | 57.30 | world | Char    
   | Varchar |    1 | 2022-01-01 10:00:00 | 2022-01-01 | ["A", "B", "C"] | 
{"key2":2, "key1":1} | {"name":"John", "age":30} |
    
+------+------+------+-------------+------+-------+-------+-------+------------+---------+------+---------------------+------------+-----------------+----------------------+---------------------------+
    1 row in set (0.06 sec)
    ```
---
 be/src/vec/exec/format/hive/hive_jni_reader.cpp    | 102 +++++++
 be/src/vec/exec/format/hive/hive_jni_reader.h      |  84 ++++++
 be/src/vec/exec/jni_connector.cpp                  |  78 ++++++
 be/src/vec/exec/jni_connector.h                    |   2 +
 be/src/vec/exec/scan/vfile_scanner.cpp             |  10 +
 build.sh                                           |   2 +
 fe/be-java-extensions/hive-scanner/pom.xml         | 102 +++++++
 .../org/apache/doris/hive/HiveColumnValue.java     | 311 +++++++++++++++++++++
 .../org/apache/doris/hive/HiveFileContext.java     |  52 ++++
 .../java/org/apache/doris/hive/HiveJNIScanner.java | 259 +++++++++++++++++
 .../java/org/apache/doris/hive/HiveProperties.java |  49 ++++
 .../main/java/org/apache/doris/hive/S3Utils.java   | 102 +++++++
 .../hive-scanner/src/main/resources/package.xml    |  41 +++
 fe/be-java-extensions/pom.xml                      |   1 +
 .../doris/common/util/FileFormatConstants.java     |  11 +-
 .../apache/doris/common/util/FileFormatUtils.java  | 234 ++++++++++++----
 .../ExternalFileTableValuedFunction.java           |  18 ++
 fe/pom.xml                                         |   5 +
 gensrc/thrift/PlanNodes.thrift                     |   5 +-
 19 files changed, 1413 insertions(+), 55 deletions(-)

diff --git a/be/src/vec/exec/format/hive/hive_jni_reader.cpp 
b/be/src/vec/exec/format/hive/hive_jni_reader.cpp
new file mode 100644
index 00000000000..1b00cee1678
--- /dev/null
+++ b/be/src/vec/exec/format/hive/hive_jni_reader.cpp
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "hive_jni_reader.h"
+
+#include <map>
+#include <ostream>
+
+#include "common/logging.h"
+#include "runtime/descriptors.h"
+#include "runtime/types.h"
+
+namespace doris::vectorized {
+
+HiveJNIReader::HiveJNIReader(RuntimeState* state, RuntimeProfile* profile,
+                             const TFileScanRangeParams& params,
+                             const std::vector<SlotDescriptor*>& 
file_slot_descs,
+                             const TFileRangeDesc& range)
+        : JniReader(file_slot_descs, state, profile), _params(params), 
_range(range) {}
+
+HiveJNIReader::~HiveJNIReader() = default;
+
+TFileType::type HiveJNIReader::get_file_type() {
+    TFileType::type type;
+    if (_range.__isset.file_type) {
+        type = _range.file_type;
+    } else {
+        type = _params.file_type;
+    }
+    return type;
+}
+
+Status HiveJNIReader::init_fetch_table_reader(
+        std::unordered_map<std::string, ColumnValueRangeType>* 
colname_to_value_range) {
+    _colname_to_value_range = colname_to_value_range;
+    std::ostringstream required_fields;
+    std::ostringstream columns_types;
+    std::vector<std::string> column_names;
+    int index = 0;
+    for (auto& desc : _file_slot_descs) {
+        std::string field = desc->col_name();
+        column_names.emplace_back(field);
+        std::string type = JniConnector::get_jni_type_v2(desc->type());
+        if (index == 0) {
+            required_fields << field;
+            columns_types << type;
+        } else {
+            required_fields << "," << field;
+            columns_types << "#" << type;
+        }
+        index++;
+    }
+
+    TFileType::type type = get_file_type();
+    std::map<String, String> required_params = {
+            {"uri", _range.path},
+            {"file_type", std::to_string(type)},
+            {"file_format", std::to_string(_params.format_type)},
+            {"required_fields", required_fields.str()},
+            {"columns_types", columns_types.str()},
+            {"split_start_offset", std::to_string(_range.start_offset)},
+            {"split_size", std::to_string(_range.size)}};
+    if (type == TFileType::FILE_S3) {
+        required_params.insert(_params.properties.begin(), 
_params.properties.end());
+    }
+    _jni_connector = 
std::make_unique<JniConnector>("org/apache/doris/hive/HiveJNIScanner",
+                                                    required_params, 
column_names);
+    RETURN_IF_ERROR(_jni_connector->init(_colname_to_value_range));
+    return _jni_connector->open(_state, _profile);
+}
+
+Status HiveJNIReader::get_next_block(Block* block, size_t* read_rows, bool* 
eof) {
+    RETURN_IF_ERROR(_jni_connector->get_next_block(block, read_rows, eof));
+    if (*eof) {
+        RETURN_IF_ERROR(_jni_connector->close());
+    }
+    return Status::OK();
+}
+
+Status HiveJNIReader::get_columns(std::unordered_map<std::string, 
TypeDescriptor>* name_to_type,
+                                  std::unordered_set<std::string>* 
missing_cols) {
+    for (auto& desc : _file_slot_descs) {
+        name_to_type->emplace(desc->col_name(), desc->type());
+    }
+    return Status::OK();
+}
+
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exec/format/hive/hive_jni_reader.h 
b/be/src/vec/exec/format/hive/hive_jni_reader.h
new file mode 100644
index 00000000000..14051c320e0
--- /dev/null
+++ b/be/src/vec/exec/format/hive/hive_jni_reader.h
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <rapidjson/document.h>
+#include <stddef.h>
+
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include "common/status.h"
+#include "exec/olap_common.h"
+#include "vec/exec/format/jni_reader.h"
+
+namespace doris {
+
+class RuntimeProfile;
+
+class RuntimeState;
+
+class SlotDescriptor;
+
+namespace vectoried {
+
+class Block;
+
+} // namespace vectoried
+struct TypeDescriptor;
+} // namespace doris
+
+namespace doris::vectorized {
+
+/**
+ *  Read hive-format file: rcbinary, rctext, sequencefile
+ */
+class HiveJNIReader : public JniReader {
+    ENABLE_FACTORY_CREATOR(HiveJNIReader);
+
+public:
+    /**
+     *  Call java side by jni to get table data
+     */
+    HiveJNIReader(RuntimeState* state, RuntimeProfile* profile, const 
TFileScanRangeParams& params,
+                  const std::vector<SlotDescriptor*>& file_slot_descs, const 
TFileRangeDesc& range);
+
+    ~HiveJNIReader() override;
+
+    Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
+
+    Status get_columns(std::unordered_map<std::string, TypeDescriptor>* 
name_to_type,
+                       std::unordered_set<std::string>* missing_cols) override;
+
+    Status init_fetch_table_reader(
+            std::unordered_map<std::string, ColumnValueRangeType>* 
colname_to_value_range);
+
+    TFileType::type get_file_type();
+
+private:
+    const TFileScanRangeParams _params;
+    const TFileRangeDesc _range;
+    std::string _column_names;
+    std::string _column_types;
+    std::unordered_map<std::string, ColumnValueRangeType>* 
_colname_to_value_range = nullptr;
+};
+
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exec/jni_connector.cpp 
b/be/src/vec/exec/jni_connector.cpp
index 0c2485ada3b..1bba35a85df 100644
--- a/be/src/vec/exec/jni_connector.cpp
+++ b/be/src/vec/exec/jni_connector.cpp
@@ -519,6 +519,84 @@ std::string JniConnector::get_jni_type(const DataTypePtr& 
data_type) {
     }
 }
 
+std::string JniConnector::get_jni_type_v2(const TypeDescriptor& desc) {
+    std::ostringstream buffer;
+    switch (desc.type) {
+    case TYPE_BOOLEAN:
+        return "boolean";
+    case TYPE_TINYINT:
+        return "tinyint";
+    case TYPE_SMALLINT:
+        return "smallint";
+    case TYPE_INT:
+        return "int";
+    case TYPE_BIGINT:
+        return "bigint";
+    case TYPE_LARGEINT:
+        return "largeint";
+    case TYPE_FLOAT:
+        return "float";
+    case TYPE_DOUBLE:
+        return "double";
+    case TYPE_VARCHAR: {
+        buffer << "varchar(" << desc.len << ")";
+        return buffer.str();
+    }
+    case TYPE_DATE:
+        [[fallthrough]];
+    case TYPE_DATEV2:
+        return "date";
+    case TYPE_DATETIME:
+        [[fallthrough]];
+    case TYPE_TIME:
+        [[fallthrough]];
+    case TYPE_DATETIMEV2:
+        [[fallthrough]];
+    case TYPE_TIMEV2:
+        return "timestamp";
+    case TYPE_BINARY:
+        return "binary";
+    case TYPE_CHAR: {
+        buffer << "char(" << desc.len << ")";
+        return buffer.str();
+    }
+    case TYPE_STRING:
+        return "string";
+    case TYPE_DECIMALV2:
+        [[fallthrough]];
+    case TYPE_DECIMAL32:
+        [[fallthrough]];
+    case TYPE_DECIMAL64:
+        [[fallthrough]];
+    case TYPE_DECIMAL128I: {
+        buffer << "decimal(" << desc.precision << "," << desc.scale << ")";
+        return buffer.str();
+    }
+    case TYPE_STRUCT: {
+        buffer << "struct<";
+        for (int i = 0; i < desc.children.size(); ++i) {
+            if (i != 0) {
+                buffer << ",";
+            }
+            buffer << desc.field_names[i] << ":" << 
get_jni_type(desc.children[i]);
+        }
+        buffer << ">";
+        return buffer.str();
+    }
+    case TYPE_ARRAY: {
+        buffer << "array<" << get_jni_type(desc.children[0]) << ">";
+        return buffer.str();
+    }
+    case TYPE_MAP: {
+        buffer << "map<" << get_jni_type(desc.children[0]) << "," << 
get_jni_type(desc.children[1])
+               << ">";
+        return buffer.str();
+    }
+    default:
+        return "unsupported";
+    }
+}
+
 std::string JniConnector::get_jni_type(const TypeDescriptor& desc) {
     std::ostringstream buffer;
     switch (desc.type) {
diff --git a/be/src/vec/exec/jni_connector.h b/be/src/vec/exec/jni_connector.h
index 52a3fb2e778..7a1c5a1df49 100644
--- a/be/src/vec/exec/jni_connector.h
+++ b/be/src/vec/exec/jni_connector.h
@@ -261,6 +261,8 @@ public:
     /**
      * Map PrimitiveType to hive type.
      */
+    static std::string get_jni_type_v2(const TypeDescriptor& desc);
+
     static std::string get_jni_type(const TypeDescriptor& desc);
 
     static Status to_java_table(Block* block, size_t num_rows, const 
ColumnNumbers& arguments,
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp 
b/be/src/vec/exec/scan/vfile_scanner.cpp
index 52aa752935e..58f520e693b 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -56,6 +56,7 @@
 #include "vec/exec/format/arrow/arrow_stream_reader.h"
 #include "vec/exec/format/avro/avro_jni_reader.h"
 #include "vec/exec/format/csv/csv_reader.h"
+#include "vec/exec/format/hive/hive_jni_reader.h"
 #include "vec/exec/format/json/new_json_reader.h"
 #include "vec/exec/format/orc/vorc_reader.h"
 #include "vec/exec/format/parquet/vparquet_reader.h"
@@ -945,6 +946,15 @@ Status VFileScanner::_get_next_reader() {
                                   
->init_fetch_table_reader(_colname_to_value_range);
             break;
         }
+        case TFileFormatType::FORMAT_SEQUENCE:
+        case TFileFormatType::FORMAT_RCTEXT:
+        case TFileFormatType::FORMAT_RCBINARY: {
+            _cur_reader = HiveJNIReader::create_unique(_state, _profile, 
*_params, _file_slot_descs,
+                                                       range);
+            init_status = ((HiveJNIReader*)(_cur_reader.get()))
+                                  
->init_fetch_table_reader(_colname_to_value_range);
+            break;
+        }
         case TFileFormatType::FORMAT_WAL: {
             _cur_reader.reset(new WalReader(_state));
             init_status = 
((WalReader*)(_cur_reader.get()))->init_reader(_output_tuple_desc);
diff --git a/build.sh b/build.sh
index c8aa2bf2c43..9ad36f7bbaa 100755
--- a/build.sh
+++ b/build.sh
@@ -534,6 +534,7 @@ if [[ "${BUILD_BE_JAVA_EXTENSIONS}" -eq 1 ]]; then
     modules+=("be-java-extensions/max-compute-scanner")
     modules+=("be-java-extensions/avro-scanner")
     modules+=("be-java-extensions/lakesoul-scanner")
+    modules+=("be-java-extensions/hive-scanner")
     modules+=("be-java-extensions/preload-extensions")
 
     # If the BE_EXTENSION_IGNORE variable is not empty, remove the modules 
that need to be ignored from FE_MODULES
@@ -819,6 +820,7 @@ EOF
     extensions_modules+=("max-compute-scanner")
     extensions_modules+=("avro-scanner")
     extensions_modules+=("lakesoul-scanner")
+    extensions_modules+=("hive-scanner")
     extensions_modules+=("preload-extensions")
 
     if [[ -n "${BE_EXTENSION_IGNORE}" ]]; then
diff --git a/fe/be-java-extensions/hive-scanner/pom.xml 
b/fe/be-java-extensions/hive-scanner/pom.xml
new file mode 100644
index 00000000000..64b4f7f0342
--- /dev/null
+++ b/fe/be-java-extensions/hive-scanner/pom.xml
@@ -0,0 +1,102 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xmlns="http://maven.apache.org/POM/4.0.0";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <parent>
+        <artifactId>be-java-extensions</artifactId>
+        <groupId>org.apache.doris</groupId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>hive-scanner</artifactId>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.doris</groupId>
+            <artifactId>java-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.doris</groupId>
+            <artifactId>hive-catalog-shade</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-serde</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-common</artifactId>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <finalName>hive-scanner</finalName>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <descriptors>src/main/resources/package.xml</descriptors>
+                    <archive>
+                        <manifest>
+                            <mainClass></mainClass>
+                        </manifest>
+                    </archive>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git 
a/fe/be-java-extensions/hive-scanner/src/main/java/org/apache/doris/hive/HiveColumnValue.java
 
b/fe/be-java-extensions/hive-scanner/src/main/java/org/apache/doris/hive/HiveColumnValue.java
new file mode 100644
index 00000000000..03c716677f7
--- /dev/null
+++ 
b/fe/be-java-extensions/hive-scanner/src/main/java/org/apache/doris/hive/HiveColumnValue.java
@@ -0,0 +1,311 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.hive;
+
+import org.apache.doris.common.jni.vec.ColumnValue;
+
+import org.apache.hadoop.hive.common.type.Date;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.common.type.Timestamp;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.List;
+import java.util.Map.Entry;
+
+public class HiveColumnValue implements ColumnValue {
+
+    private static final Logger LOG = 
LogManager.getLogger(HiveColumnValue.class);
+    private final Object fieldData;
+    private final ObjectInspector fieldInspector;
+
+    public HiveColumnValue(ObjectInspector fieldInspector, Object fieldData) {
+        this.fieldInspector = fieldInspector;
+        this.fieldData = fieldData;
+    }
+
+    private Object inspectObject() {
+        if (fieldData == null) {
+            return null;
+        }
+        if (fieldInspector instanceof PrimitiveObjectInspector) {
+            PrimitiveObjectInspector poi = (PrimitiveObjectInspector) 
fieldInspector;
+            return poi.getPrimitiveJavaObject(fieldData);
+        }
+        return fieldData;
+    }
+
+    @Override
+    public boolean canGetStringAsBytes() {
+        return fieldInspector instanceof BinaryObjectInspector;
+    }
+
+    @Override
+    public boolean isNull() {
+        return fieldData == null || inspectObject() == null;
+    }
+
+    @Override
+    public boolean getBoolean() {
+        Object value = inspectObject();
+        return value != null && ((Boolean) value);
+    }
+
+    @Override
+    public byte getByte() {
+        Object value = inspectObject();
+        if (value == null) {
+            return 0;
+        }
+        if (value instanceof Number) {
+            return ((Number) value).byteValue();
+        }
+        return Byte.parseByte(value.toString());
+    }
+
+    @Override
+    public short getShort() {
+        Object value = inspectObject();
+        if (value == null) {
+            return 0;
+        }
+        if (value instanceof Number) {
+            return ((Number) value).shortValue();
+        }
+        return Short.parseShort(value.toString());
+    }
+
+    @Override
+    public int getInt() {
+        Object value = inspectObject();
+        if (value == null) {
+            return 0;
+        }
+        if (value instanceof Number) {
+            return ((Number) value).intValue();
+        }
+        return Integer.parseInt(value.toString());
+    }
+
+    @Override
+    public float getFloat() {
+        Object value = inspectObject();
+        if (value == null) {
+            return 0.0f;
+        }
+        if (value instanceof Number) {
+            return ((Number) value).floatValue();
+        }
+        return Float.parseFloat(value.toString());
+    }
+
+    @Override
+    public long getLong() {
+        Object value = inspectObject();
+        if (value == null) {
+            return 0L;
+        }
+        if (value instanceof Number) {
+            return ((Number) value).longValue();
+        }
+        return Long.parseLong(value.toString());
+    }
+
+    @Override
+    public double getDouble() {
+        Object value = inspectObject();
+        if (value == null) {
+            return 0.0d;
+        }
+        if (value instanceof Number) {
+            return ((Number) value).doubleValue();
+        }
+        return Double.parseDouble(value.toString());
+    }
+
+    @Override
+    public BigInteger getBigInteger() {
+        Object value = inspectObject();
+        if (value == null) {
+            return null;
+        }
+        if (value instanceof BigInteger) {
+            return (BigInteger) value;
+        } else if (value instanceof Number) {
+            return BigInteger.valueOf(((Number) value).longValue());
+        }
+        return new BigInteger(value.toString());
+    }
+
+    @Override
+    public BigDecimal getDecimal() {
+        Object value = inspectObject();
+        if (value == null) {
+            return null;
+        }
+        if (value instanceof HiveDecimal) {
+            return ((HiveDecimal) value).bigDecimalValue();
+        } else if (value instanceof BigDecimal) {
+            return (BigDecimal) value;
+        }
+        return new BigDecimal(value.toString());
+    }
+
+    @Override
+    public String getString() {
+        Object value = inspectObject();
+        if (value == null) {
+            return null;
+        }
+        return value.toString();
+    }
+
+    @Override
+    public byte[] getStringAsBytes() {
+        if (fieldData == null) {
+            return null;
+        }
+        if (fieldInspector instanceof BinaryObjectInspector) {
+            BytesWritable bw = ((BinaryObjectInspector) 
fieldInspector).getPrimitiveWritableObject(fieldData);
+            return bw.copyBytes();
+        } else if (fieldInspector instanceof StringObjectInspector) {
+            String str = getString();
+            return str != null ? str.getBytes() : null;
+        }
+        return null;
+    }
+
+    @Override
+    public LocalDate getDate() {
+        if (fieldData == null) {
+            return null;
+        }
+        if (fieldInspector instanceof DateObjectInspector) {
+            DateObjectInspector doi = (DateObjectInspector) fieldInspector;
+            Date hiveDate = doi.getPrimitiveJavaObject(fieldData);
+            if (hiveDate != null) {
+                return LocalDate.of(hiveDate.getYear(), hiveDate.getMonth(), 
hiveDate.getDay());
+            }
+        } else if (fieldInspector instanceof PrimitiveObjectInspector) {
+            Object value = inspectObject();
+            if (value instanceof Date) {
+                Date hiveDate = (Date) value;
+                return LocalDate.of(hiveDate.getYear(), hiveDate.getMonth(), 
hiveDate.getDay());
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public LocalDateTime getDateTime() {
+        if (fieldData == null) {
+            return null;
+        }
+        if (fieldInspector instanceof TimestampObjectInspector) {
+            TimestampObjectInspector toi = (TimestampObjectInspector) 
fieldInspector;
+            Timestamp hiveTimestamp = toi.getPrimitiveJavaObject(fieldData);
+            if (hiveTimestamp != null) {
+                // Convert Hive Timestamp to LocalDateTime
+                return LocalDateTime.of(
+                        hiveTimestamp.getYear(),
+                        hiveTimestamp.getMonth(),
+                        hiveTimestamp.getDay(),
+                        hiveTimestamp.getHours(),
+                        hiveTimestamp.getMinutes(),
+                        hiveTimestamp.getSeconds(),
+                        hiveTimestamp.getNanos()
+                );
+            }
+        } else if (fieldInspector instanceof DateObjectInspector) {
+            LocalDate date = getDate();
+            if (date != null) {
+                return date.atStartOfDay();
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public byte[] getBytes() {
+        return getStringAsBytes();
+    }
+
+    @Override
+    public void unpackArray(List<ColumnValue> values) {
+        if (fieldData == null) {
+            return;
+        }
+        ListObjectInspector listInspector = (ListObjectInspector) 
fieldInspector;
+        List<?> items = listInspector.getList(fieldData);
+        ObjectInspector itemInspector = 
listInspector.getListElementObjectInspector();
+        for (Object item : items) {
+            ColumnValue cv = item != null ? new HiveColumnValue(itemInspector, 
item) : null;
+            values.add(cv);
+        }
+    }
+
+    @Override
+    public void unpackMap(List<ColumnValue> keys, List<ColumnValue> values) {
+        if (fieldData == null) {
+            return;
+        }
+        MapObjectInspector mapInspector = (MapObjectInspector) fieldInspector;
+        ObjectInspector keyInspector = mapInspector.getMapKeyObjectInspector();
+        ObjectInspector valueInspector = 
mapInspector.getMapValueObjectInspector();
+        for (Entry<?, ?> entry : mapInspector.getMap(fieldData).entrySet()) {
+            ColumnValue key = entry.getKey() != null ? new 
HiveColumnValue(keyInspector, entry.getKey()) : null;
+            ColumnValue value = entry.getValue() != null ? new 
HiveColumnValue(valueInspector, entry.getValue()) : null;
+            keys.add(key);
+            values.add(value);
+        }
+    }
+
+    @Override
+    public void unpackStruct(List<Integer> structFieldIndex, List<ColumnValue> 
values) {
+        if (fieldData == null) {
+            return;
+        }
+        StructObjectInspector structInspector = (StructObjectInspector) 
fieldInspector;
+        List<? extends StructField> fields = 
structInspector.getAllStructFieldRefs();
+        for (Integer idx : structFieldIndex) {
+            if (idx != null && idx >= 0 && idx < fields.size()) {
+                StructField sf = fields.get(idx);
+                Object fieldObj = 
structInspector.getStructFieldData(fieldData, sf);
+                ColumnValue cv = fieldObj != null ? new 
HiveColumnValue(sf.getFieldObjectInspector(), fieldObj) : null;
+                values.add(cv);
+            } else {
+                values.add(null);
+            }
+        }
+    }
+}
diff --git 
a/fe/be-java-extensions/hive-scanner/src/main/java/org/apache/doris/hive/HiveFileContext.java
 
b/fe/be-java-extensions/hive-scanner/src/main/java/org/apache/doris/hive/HiveFileContext.java
new file mode 100644
index 00000000000..0b13935a5fc
--- /dev/null
+++ 
b/fe/be-java-extensions/hive-scanner/src/main/java/org/apache/doris/hive/HiveFileContext.java
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.hive;
+
+import org.apache.doris.thrift.TFileFormatType;
+
+public class HiveFileContext {
+    private final String serde;
+    private final String inputFormat;
+
+    HiveFileContext(TFileFormatType fileFormatType) {
+        switch (fileFormatType) {
+            case FORMAT_RCBINARY:
+                serde = HiveProperties.RC_BINARY_SERDE_CLASS;
+                inputFormat = HiveProperties.RC_BINARY_INPUT_FORMAT;
+                break;
+            case FORMAT_RCTEXT:
+                serde = HiveProperties.RC_TEXT_SERDE_CLASS;
+                inputFormat = HiveProperties.RC_TEXT_INPUT_FORMAT;
+                break;
+            case FORMAT_SEQUENCE:
+                serde = HiveProperties.SEQUENCE_SERDE_CLASS;
+                inputFormat = HiveProperties.SEQUENCE_INPUT_FORMAT;
+                break;
+            default:
+                throw new UnsupportedOperationException("Unrecognized file 
format " + fileFormatType);
+        }
+    }
+
+    String getSerde() {
+        return serde;
+    }
+
+    String getInputFormat() {
+        return inputFormat;
+    }
+}
diff --git 
a/fe/be-java-extensions/hive-scanner/src/main/java/org/apache/doris/hive/HiveJNIScanner.java
 
b/fe/be-java-extensions/hive-scanner/src/main/java/org/apache/doris/hive/HiveJNIScanner.java
new file mode 100644
index 00000000000..ae98bb28a10
--- /dev/null
+++ 
b/fe/be-java-extensions/hive-scanner/src/main/java/org/apache/doris/hive/HiveJNIScanner.java
@@ -0,0 +1,259 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.hive;
+
+import org.apache.doris.avro.S3Utils;
+import org.apache.doris.common.jni.JniScanner;
+import org.apache.doris.common.jni.vec.ColumnType;
+import org.apache.doris.common.jni.vec.TableSchema;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileType;
+
+import io.trino.spi.classloader.ThreadContextClassLoader;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+public class HiveJNIScanner extends JniScanner {
+
+    private static final Logger LOG = 
LogManager.getLogger(HiveJNIScanner.class);
+    private final ClassLoader classLoader;
+    private final TFileType fileType;
+    private final int fetchSize;
+    private final Map<String, String> requiredParams;
+    private final String[] columnTypes;
+    private final String[] requiredFields;
+    private final ColumnType[] requiredTypes;
+    private final int[] requiredColumnIds;
+    private final TFileFormatType fileFormat;
+    private final StructField[] structFields;
+    private final ObjectInspector[] fieldInspectors;
+    private final Long splitStartOffset;
+    private final Long splitSize;
+    private String uri;
+    private StructObjectInspector rowInspector;
+    private Deserializer deserializer;
+    private RecordReader<Writable, Writable> reader;
+    private Writable key;
+    private Writable value;
+    private HiveFileContext hiveFileContext;
+
+    public HiveJNIScanner(int fetchSize, Map<String, String> requiredParams) {
+        this.classLoader = this.getClass().getClassLoader();
+        this.fetchSize = fetchSize;
+        this.requiredParams = requiredParams;
+        this.fileType = 
TFileType.findByValue(Integer.parseInt(requiredParams.get(HiveProperties.FILE_TYPE)));
+        this.fileFormat = 
TFileFormatType.findByValue(Integer.parseInt(requiredParams.get(HiveProperties.FILE_FORMAT)));
+        this.columnTypes = requiredParams.get(HiveProperties.COLUMNS_TYPES)
+                .split(HiveProperties.COLUMNS_TYPE_DELIMITER);
+        this.requiredFields = 
requiredParams.get(HiveProperties.REQUIRED_FIELDS).split(HiveProperties.FIELDS_DELIMITER);
+        this.requiredTypes = new ColumnType[requiredFields.length];
+        this.requiredColumnIds = new int[requiredFields.length];
+        this.uri = requiredParams.get(HiveProperties.URI);
+        this.splitStartOffset = 
Long.parseLong(requiredParams.get(HiveProperties.SPLIT_START_OFFSET));
+        this.splitSize = 
Long.parseLong(requiredParams.get(HiveProperties.SPLIT_SIZE));
+        this.structFields = new StructField[requiredFields.length];
+        this.fieldInspectors = new ObjectInspector[requiredFields.length];
+    }
+
+    private void processS3Conf(String accessKey, String secretKey, String 
endpoint,
+            String region, JobConf jobConf) {
+        if (!StringUtils.isEmpty(accessKey) && 
!StringUtils.isEmpty(secretKey)) {
+            jobConf.set(HiveProperties.FS_S3A_ACCESS_KEY, accessKey);
+            jobConf.set(HiveProperties.FS_S3A_SECRET_KEY, secretKey);
+        }
+        jobConf.set(HiveProperties.FS_S3A_ENDPOINT, endpoint);
+        jobConf.set(HiveProperties.FS_S3A_REGION, region);
+    }
+
+    private String processS3Uri(String uri) throws IOException {
+        S3Utils.parseURI(uri);
+        uri = "s3a://" + S3Utils.getBucket() + "/" + S3Utils.getKey();
+        return uri;
+    }
+
+    private void initReader() throws Exception {
+        this.hiveFileContext = new HiveFileContext(fileFormat);
+        Properties properties = createProperties();
+        JobConf jobConf = makeJobConf(properties);
+        switch (fileType) {
+            case FILE_LOCAL:
+            case FILE_HDFS:
+                break;
+            case FILE_S3:
+                String accessKey = 
requiredParams.get(HiveProperties.S3_ACCESS_KEY);
+                String secretKey = 
requiredParams.get(HiveProperties.S3_SECRET_KEY);
+                String endpoint = 
requiredParams.get(HiveProperties.S3_ENDPOINT);
+                String region = requiredParams.get(HiveProperties.S3_REGION);
+                processS3Conf(accessKey, secretKey, endpoint, region, jobConf);
+                uri = processS3Uri(uri);
+                break;
+            default:
+                throw new Exception("Unsupported " + fileType.getValue() + " 
file type.");
+        }
+        Path path = new Path(uri);
+        FileSplit fileSplit = new FileSplit(path, splitStartOffset, splitSize, 
(String[]) null);
+        InputFormat<?, ?> inputFormatClass = createInputFormat(jobConf, 
hiveFileContext.getInputFormat());
+        reader = (RecordReader<Writable, Writable>) 
inputFormatClass.getRecordReader(fileSplit, jobConf, Reporter.NULL);
+        deserializer = getDeserializer(jobConf, properties, 
hiveFileContext.getSerde());
+        rowInspector = getTableObjectInspector(deserializer);
+        for (int i = 0; i < requiredFields.length; i++) {
+            StructField field = 
rowInspector.getStructFieldRef(requiredFields[i]);
+            structFields[i] = field;
+            fieldInspectors[i] = field.getFieldObjectInspector();
+        }
+    }
+
+    private InputFormat<?, ?> createInputFormat(Configuration conf, String 
inputFormat) throws Exception {
+        Class<?> clazz = conf.getClassByName(inputFormat);
+        Class<? extends InputFormat<?, ?>> cls =
+                (Class<? extends InputFormat<?, ?>>) 
clazz.asSubclass(InputFormat.class);
+        return ReflectionUtils.newInstance(cls, conf);
+    }
+
+    private StructObjectInspector getTableObjectInspector(Deserializer 
deserializer) throws Exception {
+        ObjectInspector inspector = deserializer.getObjectInspector();
+        return (StructObjectInspector) inspector;
+    }
+
+    private Properties createProperties() {
+        Properties properties = new Properties();
+        properties.setProperty(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR,
+                
Arrays.stream(this.requiredColumnIds).mapToObj(String::valueOf).collect(Collectors.joining(",")));
+        
properties.setProperty(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, 
String.join(",", requiredFields));
+        properties.setProperty(HiveProperties.COLUMNS, String.join(",", 
requiredFields));
+        properties.setProperty(HiveProperties.COLUMNS2TYPES, String.join(",", 
columnTypes));
+        properties.setProperty(serdeConstants.SERIALIZATION_LIB, 
hiveFileContext.getSerde());
+        return properties;
+    }
+
+    private JobConf makeJobConf(Properties properties) {
+        Configuration conf = new Configuration();
+        JobConf jobConf = new JobConf(conf);
+        jobConf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+        properties.stringPropertyNames().forEach(name -> jobConf.set(name, 
properties.getProperty(name)));
+        return jobConf;
+    }
+
+    private Deserializer getDeserializer(Configuration configuration, 
Properties properties, String name)
+            throws Exception {
+        Class<? extends Deserializer> deserializerClass = Class.forName(name, 
true, JavaUtils.getClassLoader())
+                .asSubclass(Deserializer.class);
+        Deserializer deserializer = 
deserializerClass.getConstructor().newInstance();
+        deserializer.initialize(configuration, properties);
+        return deserializer;
+    }
+
+    @Override
+    public void open() throws IOException {
+        try (ThreadContextClassLoader ignored = new 
ThreadContextClassLoader(classLoader)) {
+            parseRequiredTypes();
+            initTableInfo(requiredTypes, requiredFields, fetchSize);
+            initReader();
+        } catch (Exception e) {
+            close();
+            LOG.error("Failed to open the hive reader.", e);
+            throw new IOException("Failed to open the hive reader.", e);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        try (ThreadContextClassLoader ignored = new 
ThreadContextClassLoader(classLoader)) {
+            if (reader != null) {
+                reader.close();
+            }
+        } catch (IOException e) {
+            LOG.error("Failed to close the hive reader.", e);
+            throw new IOException("Failed to close the hive reader.", e);
+        }
+    }
+
+    @Override
+    public int getNext() throws IOException {
+        try (ThreadContextClassLoader ignored = new 
ThreadContextClassLoader(classLoader)) {
+            key = reader.createKey();
+            value = reader.createValue();
+            int numRows = 0;
+            for (; numRows < getBatchSize(); numRows++) {
+                if (!reader.next(key, value)) {
+                    break;
+                }
+                Object rowData = deserializer.deserialize(value);
+                for (int i = 0; i < requiredFields.length; i++) {
+                    Object fieldData = 
rowInspector.getStructFieldData(rowData, structFields[i]);
+                    if (fieldData == null) {
+                        appendData(i, null);
+                    } else {
+                        HiveColumnValue fieldValue = new 
HiveColumnValue(fieldInspectors[i], fieldData);
+                        appendData(i, fieldValue);
+                    }
+                }
+            }
+            return numRows;
+        } catch (Exception e) {
+            close();
+            LOG.error("Failed to get next row of data.", e);
+            throw new IOException("Failed to get next row of data.", e);
+        }
+    }
+
+    @Override
+    protected TableSchema parseTableSchema() throws 
UnsupportedOperationException {
+        return null;
+    }
+
+    private void parseRequiredTypes() {
+        HashMap<String, Integer> hiveColumnNameToIndex = new HashMap<>();
+        HashMap<String, String> hiveColumnNameToType = new HashMap<>();
+        for (int i = 0; i < requiredFields.length; i++) {
+            hiveColumnNameToIndex.put(requiredFields[i], i);
+            hiveColumnNameToType.put(requiredFields[i], columnTypes[i]);
+        }
+
+        for (int i = 0; i < requiredFields.length; i++) {
+            String fieldName = requiredFields[i];
+            requiredColumnIds[i] = hiveColumnNameToIndex.get(fieldName);
+            String typeStr = hiveColumnNameToType.get(fieldName);
+            requiredTypes[i] = ColumnType.parseType(fieldName, typeStr);
+        }
+    }
+}
diff --git 
a/fe/be-java-extensions/hive-scanner/src/main/java/org/apache/doris/hive/HiveProperties.java
 
b/fe/be-java-extensions/hive-scanner/src/main/java/org/apache/doris/hive/HiveProperties.java
new file mode 100644
index 00000000000..3949c84dd31
--- /dev/null
+++ 
b/fe/be-java-extensions/hive-scanner/src/main/java/org/apache/doris/hive/HiveProperties.java
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.hive;
+
+public class HiveProperties {
+
+    protected static final String COLUMNS_TYPE_DELIMITER = "#";
+    protected static final String FIELDS_DELIMITER = ",";
+
+    protected static final String COLUMNS_TYPES = "columns_types";
+    protected static final String REQUIRED_FIELDS = "required_fields";
+    protected static final String FILE_TYPE = "file_type";
+    protected static final String FILE_FORMAT = "file_format";
+    protected static final String URI = "uri";
+    protected static final String S3_ACCESS_KEY = "s3.access_key";
+    protected static final String S3_SECRET_KEY = "s3.secret_key";
+    protected static final String S3_ENDPOINT = "s3.endpoint";
+    protected static final String S3_REGION = "s3.region";
+    protected static final String COLUMNS = "columns";
+    protected static final String COLUMNS2TYPES = "columns.types";
+    protected static final String FS_S3A_ACCESS_KEY = "fs.s3a.access.key";
+    protected static final String FS_S3A_SECRET_KEY = "fs.s3a.secret.key";
+    protected static final String FS_S3A_ENDPOINT = "fs.s3a.endpoint";
+    protected static final String FS_S3A_REGION = "fs.s3a.region";
+    protected static final String SPLIT_START_OFFSET = "split_start_offset";
+    protected static final String SPLIT_SIZE = "split_size";
+    protected static final String RC_BINARY_SERDE_CLASS
+            = "org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe";
+    protected static final String RC_BINARY_INPUT_FORMAT = 
"org.apache.hadoop.hive.ql.io.RCFileInputFormat";
+    protected static final String RC_TEXT_SERDE_CLASS = 
"org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe";
+    protected static final String RC_TEXT_INPUT_FORMAT = 
"org.apache.hadoop.hive.ql.io.RCFileInputFormat";
+    protected static final String SEQUENCE_SERDE_CLASS = 
"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
+    protected static final String SEQUENCE_INPUT_FORMAT = 
"org.apache.hadoop.mapred.SequenceFileInputFormat";
+}
diff --git 
a/fe/be-java-extensions/hive-scanner/src/main/java/org/apache/doris/hive/S3Utils.java
 
b/fe/be-java-extensions/hive-scanner/src/main/java/org/apache/doris/hive/S3Utils.java
new file mode 100644
index 00000000000..45845af3c03
--- /dev/null
+++ 
b/fe/be-java-extensions/hive-scanner/src/main/java/org/apache/doris/hive/S3Utils.java
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.avro;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.IOException;
+
+public class S3Utils {
+    private static final String SCHEMA_S3 = "s3";
+    private static final String SCHEMA_HTTP = "http";
+    private static final String SCHEMA_HTTPS = "https";
+    private static final String SCHEME_DELIM = "://";
+    private static final String PATH_DELIM = "/";
+    private static final String QUERY_DELIM = "\\?";
+    private static final String FRAGMENT_DELIM = "#";
+    private static String bucket;
+    private static String key;
+
+    /**
+     * eg:
+     * s3:     s3://bucket1/path/to/file.txt
+     * http:   http://10.10.10.1:9000/bucket1/to/file.txt
+     * https:  https://10.10.10.1:9000/bucket1/to/file.txt
+     * <p>
+     * schema: s3,http,https
+     * bucket: bucket1
+     * key:    path/to/file.txt
+     */
+    public static void parseURI(String uri) throws IOException {
+        if (StringUtils.isEmpty(uri)) {
+            throw new IOException("s3 uri is empty.");
+        }
+        String[] schemeSplit = uri.split(SCHEME_DELIM);
+        String rest;
+        if (schemeSplit.length == 2) {
+            if (schemeSplit[0].equalsIgnoreCase(SCHEMA_S3)) {
+                // has scheme, eg: s3://bucket1/path/to/file.txt
+                rest = schemeSplit[1];
+                String[] authoritySplit = rest.split(PATH_DELIM, 2);
+                if (authoritySplit.length < 1) {
+                    throw new IOException("Invalid S3 URI. uri=" + uri);
+                }
+                bucket = authoritySplit[0];
+                // support s3://bucket1
+                key = authoritySplit.length == 1 ? "/" : authoritySplit[1];
+            } else if (schemeSplit[0].equalsIgnoreCase(SCHEMA_HTTP) || 
schemeSplit[0].equalsIgnoreCase(SCHEMA_HTTPS)) {
+                // has scheme, eg: http(s)://host/bucket1/path/to/file.txt
+                rest = schemeSplit[1];
+                String[] authoritySplit = rest.split(PATH_DELIM, 3);
+                if (authoritySplit.length != 3) {
+                    throw new IOException("Invalid S3 HTTP URI: uri=" + uri);
+                }
+                // authority_split[1] is host
+                bucket = authoritySplit[1];
+                key = authoritySplit[2];
+            } else {
+                throw new IOException("Invalid S3 HTTP URI: uri=" + uri);
+            }
+
+        } else if (schemeSplit.length == 1) {
+            // no scheme, eg: path/to/file.txt
+            bucket = ""; // unknown
+            key = uri;
+        } else {
+            throw new IOException("Invalid S3 URI. uri=" + uri);
+        }
+
+        key = key.trim();
+        if (StringUtils.isEmpty(key)) {
+            throw new IOException("Invalid S3 URI. uri=" + uri);
+        }
+        // Strip query and fragment if they exist
+        String[] querySplit = key.split(QUERY_DELIM);
+        String[] fragmentSplit = querySplit[0].split(FRAGMENT_DELIM);
+        key = fragmentSplit[0];
+    }
+
+    public static String getBucket() {
+        return bucket;
+    }
+
+    public static String getKey() {
+        return key;
+    }
+
+}
diff --git a/fe/be-java-extensions/hive-scanner/src/main/resources/package.xml 
b/fe/be-java-extensions/hive-scanner/src/main/resources/package.xml
new file mode 100644
index 00000000000..d9ebe58202b
--- /dev/null
+++ b/fe/be-java-extensions/hive-scanner/src/main/resources/package.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<assembly xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+          xmlns="http://maven.apache.org/ASSEMBLY/2.0.0";
+          xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0 
http://maven.apache.org/xsd/assembly-2.0.0.xsd";>
+    <id>jar-with-dependencies</id>
+    <formats>
+        <format>jar</format>
+    </formats>
+    <includeBaseDirectory>false</includeBaseDirectory>
+    <dependencySets>
+        <dependencySet>
+            <outputDirectory>/</outputDirectory>
+            <useProjectArtifact>true</useProjectArtifact>
+            <unpack>true</unpack>
+            <scope>runtime</scope>
+            <unpackOptions>
+                <excludes>
+                    <exclude>**/Log4j2Plugins.dat</exclude>
+                </excludes>
+            </unpackOptions>
+        </dependencySet>
+    </dependencySets>
+</assembly>
diff --git a/fe/be-java-extensions/pom.xml b/fe/be-java-extensions/pom.xml
index bbe056739d5..8123c5c63d9 100644
--- a/fe/be-java-extensions/pom.xml
+++ b/fe/be-java-extensions/pom.xml
@@ -31,6 +31,7 @@ under the License.
         <module>lakesoul-scanner</module>
         <module>preload-extensions</module>
         <module>trino-connector-scanner</module>
+        <module>hive-scanner</module>
     </modules>
 
     <parent>
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java
 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java
index bdb2e97b9f2..bdc12226600 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java
@@ -34,6 +34,9 @@ public class FileFormatConstants {
     public static final String FORMAT_AVRO = "avro";
     public static final String FORMAT_WAL = "wal";
     public static final String FORMAT_ARROW = "arrow";
+    public static final String FORMAT_RC_BINARY = "rc_binary";
+    public static final String FORMAT_RC_TEXT = "rc_text";
+    public static final String FORMAT_SEQUENCE = "sequence";
 
     public static final String PROP_FORMAT = "format";
     public static final String PROP_COLUMN_SEPARATOR = "column_separator";
@@ -47,6 +50,7 @@ public class FileFormatConstants {
     public static final String PROP_TRIM_DOUBLE_QUOTES = "trim_double_quotes";
     public static final String PROP_SKIP_LINES = "skip_lines";
     public static final String PROP_CSV_SCHEMA = "csv_schema";
+    public static final String PROP_HIVE_SCHEMA = "hive_schema";
     public static final String PROP_COMPRESS = "compress";
     public static final String PROP_COMPRESS_TYPE = "compress_type";
     public static final String PROP_PATH_PARTITION_KEYS = 
"path_partition_keys";
@@ -55,5 +59,10 @@ public class FileFormatConstants {
     public static final Pattern DECIMAL_TYPE_PATTERN = 
Pattern.compile("decimal\\((\\d+),(\\d+)\\)");
     // datetime(p)
     public static final Pattern DATETIME_TYPE_PATTERN = 
Pattern.compile("datetime\\((\\d+)\\)");
-
+    // timestamp(p)
+    public static final Pattern TIMESTAMP_TYPE_PATTERN = 
Pattern.compile("timestamp\\((\\d+)\\)");
+    // char(len)
+    public static final Pattern CHAR_TYPE_PATTERN = 
Pattern.compile("char\\((\\d+)\\)");
+    // varchar(len)
+    public static final Pattern VARCHAR_TYPE_PATTERN = 
Pattern.compile("varchar\\((\\d+)\\)");
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatUtils.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatUtils.java
index 0b646a00b16..c79647a9c36 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatUtils.java
@@ -17,14 +17,19 @@
 
 package org.apache.doris.common.util;
 
+import org.apache.doris.catalog.ArrayType;
 import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.MapType;
 import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.catalog.StructField;
+import org.apache.doris.catalog.StructType;
+import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.FeNameFormat;
 
 import com.google.common.base.Strings;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.regex.Matcher;
 
@@ -37,72 +42,195 @@ public class FileFormatUtils {
                 || 
FileFormatConstants.FORMAT_HIVE_TEXT.equalsIgnoreCase(formatStr);
     }
 
+    public static boolean isHiveFormat(String formatStr) {
+        return FileFormatConstants.FORMAT_RC_BINARY.equalsIgnoreCase(formatStr)
+                || 
FileFormatConstants.FORMAT_RC_TEXT.equalsIgnoreCase(formatStr)
+                || 
FileFormatConstants.FORMAT_SEQUENCE.equalsIgnoreCase(formatStr);
+    }
+
     // public for unit test
     public static void parseCsvSchema(List<Column> csvSchema, String 
csvSchemaStr)
             throws AnalysisException {
         if (Strings.isNullOrEmpty(csvSchemaStr)) {
             return;
         }
-        // the schema str is like: 
"k1:int;k2:bigint;k3:varchar(20);k4:datetime(6)"
+        // the schema str is like: 
"k1:int;k2:bigint;k3:varchar(20);k4:datetime(6);" +
+        // "k5:array<string>;k6:map<string,int>,k7:struct<name:string,age:int>"
         String[] schemaStrs = csvSchemaStr.split(";");
         try {
             for (String schemaStr : schemaStrs) {
-                String[] kv = schemaStr.replace(" ", "").split(":");
-                if (kv.length != 2) {
-                    throw new AnalysisException("invalid csv schema: " + 
csvSchemaStr);
+                schemaStr = schemaStr.replace(" ", "");
+                int colonIndex = schemaStr.indexOf(":");
+                if (colonIndex == -1) {
+                    throw new AnalysisException("invalid schema: " + 
csvSchemaStr);
                 }
-                Column column = null;
-                String name = kv[0].toLowerCase();
+                String name = schemaStr.substring(0, colonIndex).toLowerCase();
+                String type = schemaStr.substring(colonIndex + 
1).toLowerCase();
                 FeNameFormat.checkColumnName(name);
-                String type = kv[1].toLowerCase();
-                if (type.equals("tinyint")) {
-                    column = new Column(name, PrimitiveType.TINYINT, true);
-                } else if (type.equals("smallint")) {
-                    column = new Column(name, PrimitiveType.SMALLINT, true);
-                } else if (type.equals("int")) {
-                    column = new Column(name, PrimitiveType.INT, true);
-                } else if (type.equals("bigint")) {
-                    column = new Column(name, PrimitiveType.BIGINT, true);
-                } else if (type.equals("largeint")) {
-                    column = new Column(name, PrimitiveType.LARGEINT, true);
-                } else if (type.equals("float")) {
-                    column = new Column(name, PrimitiveType.FLOAT, true);
-                } else if (type.equals("double")) {
-                    column = new Column(name, PrimitiveType.DOUBLE, true);
-                } else if (type.startsWith("decimal")) {
-                    // regex decimal(p, s)
-                    Matcher matcher = 
FileFormatConstants.DECIMAL_TYPE_PATTERN.matcher(type);
-                    if (!matcher.find()) {
-                        throw new AnalysisException("invalid decimal type: " + 
type);
-                    }
-                    int precision = Integer.parseInt(matcher.group(1));
-                    int scale = Integer.parseInt(matcher.group(2));
-                    column = new Column(name, 
ScalarType.createDecimalV3Type(precision, scale), false, null, true, null,
-                            "");
-                } else if (type.equals("date")) {
-                    column = new Column(name, ScalarType.createDateType(), 
false, null, true, null, "");
-                } else if (type.startsWith("datetime")) {
-                    int scale = 0;
-                    if (!type.equals("datetime")) {
-                        // regex datetime(s)
-                        Matcher matcher = 
FileFormatConstants.DATETIME_TYPE_PATTERN.matcher(type);
-                        if (!matcher.find()) {
-                            throw new AnalysisException("invalid datetime 
type: " + type);
-                        }
-                        scale = Integer.parseInt(matcher.group(1));
-                    }
-                    column = new Column(name, 
ScalarType.createDatetimeV2Type(scale), false, null, true, null, "");
-                } else if (type.equals("string")) {
-                    column = new Column(name, PrimitiveType.STRING, true);
-                } else if (type.equals("boolean")) {
-                    column = new Column(name, PrimitiveType.BOOLEAN, true);
-                } else {
-                    throw new AnalysisException("unsupported column type: " + 
type);
-                }
+
+                Type columnType = parseType(type);
+                Column column = new Column(name, columnType, false, null, 
true, null, "");
+
                 csvSchema.add(column);
             }
         } catch (Exception e) {
-            throw new AnalysisException("invalid csv schema: " + 
e.getMessage());
+            throw new AnalysisException("invalid schema: " + e.getMessage());
+        }
+    }
+
+    private static Type parseType(String typeStr) throws AnalysisException {
+        typeStr = typeStr.trim().toLowerCase();
+        if (typeStr.equals("tinyint")) {
+            return ScalarType.TINYINT;
+        } else if (typeStr.equals("smallint")) {
+            return ScalarType.SMALLINT;
+        } else if (typeStr.equals("int")) {
+            return ScalarType.INT;
+        } else if (typeStr.equals("bigint")) {
+            return ScalarType.BIGINT;
+        } else if (typeStr.equals("largeint")) {
+            return ScalarType.LARGEINT;
+        } else if (typeStr.equals("float")) {
+            return ScalarType.FLOAT;
+        } else if (typeStr.equals("double")) {
+            return ScalarType.DOUBLE;
+        } else if (typeStr.startsWith("decimal")) {
+            // Parse decimal(p, s)
+            Matcher matcher = 
FileFormatConstants.DECIMAL_TYPE_PATTERN.matcher(typeStr);
+            if (!matcher.find()) {
+                throw new AnalysisException("Invalid decimal type: " + 
typeStr);
+            }
+            int precision = Integer.parseInt(matcher.group(1));
+            int scale = Integer.parseInt(matcher.group(2));
+            return ScalarType.createDecimalV3Type(precision, scale);
+        } else if (typeStr.equals("date")) {
+            return ScalarType.createDateType();
+        } else if (typeStr.startsWith("timestamp")) {
+            int scale = 0;
+            if (!typeStr.equals("timestamp")) {
+                // Parse timestamp(s)
+                Matcher matcher = 
FileFormatConstants.TIMESTAMP_TYPE_PATTERN.matcher(typeStr);
+                if (!matcher.find()) {
+                    throw new AnalysisException("Invalid timestamp type: " + 
typeStr);
+                }
+                scale = Integer.parseInt(matcher.group(1));
+            }
+            return ScalarType.createDatetimeV2Type(scale);
+        } else if (typeStr.startsWith("datetime")) {
+            int scale = 0;
+            if (!typeStr.equals("datetime")) {
+                // Parse datetime(s)
+                Matcher matcher = 
FileFormatConstants.DATETIME_TYPE_PATTERN.matcher(typeStr);
+                if (!matcher.find()) {
+                    throw new AnalysisException("Invalid datetime type: " + 
typeStr);
+                }
+                scale = Integer.parseInt(matcher.group(1));
+            }
+            return ScalarType.createDatetimeV2Type(scale);
+        } else if (typeStr.equals("string")) {
+            return ScalarType.createStringType();
+        } else if (typeStr.equals("boolean")) {
+            return ScalarType.BOOLEAN;
+        } else if (typeStr.startsWith("char")) {
+            // Parse char(len)
+            Matcher matcher = 
FileFormatConstants.CHAR_TYPE_PATTERN.matcher(typeStr);
+            if (matcher.matches()) {
+                int len = Integer.parseInt(matcher.group(1));
+                return ScalarType.createChar(len);
+            }
+            throw new AnalysisException("Invalid char type: " + typeStr);
+        } else if (typeStr.startsWith("varchar")) {
+            // Parse varchar(len)
+            Matcher matcher = 
FileFormatConstants.VARCHAR_TYPE_PATTERN.matcher(typeStr);
+            if (matcher.matches()) {
+                int len = Integer.parseInt(matcher.group(1));
+                return ScalarType.createVarcharType(len);
+            }
+            throw new AnalysisException("Invalid varchar type: " + typeStr);
+        } else if (typeStr.startsWith("array")) {
+            // Parse array<element_type>
+            if (typeStr.indexOf('<') == 5 && typeStr.endsWith(">")) {
+                String elementTypeStr = typeStr.substring(6, typeStr.length() 
- 1);
+                Type elementType = parseType(elementTypeStr);
+                return new ArrayType(elementType);
+            }
+            throw new AnalysisException("Invalid array type: " + typeStr);
+        } else if (typeStr.startsWith("map")) {
+            // Parse map<key_type,value_type>
+            if (typeStr.indexOf('<') == 3 && typeStr.endsWith(">")) {
+                String keyValueStr = typeStr.substring(4, typeStr.length() - 
1);
+                int commaIndex = findCommaOutsideBrackets(keyValueStr);
+                if (commaIndex == -1) {
+                    throw new AnalysisException("Invalid map type: " + 
typeStr);
+                }
+                String keyTypeStr = keyValueStr.substring(0, 
commaIndex).trim();
+                String valueTypeStr = keyValueStr.substring(commaIndex + 
1).trim();
+                Type keyType = parseType(keyTypeStr);
+                Type valueType = parseType(valueTypeStr);
+                return new MapType(keyType, valueType);
+            }
+            throw new AnalysisException("Invalid map type: " + typeStr);
+        } else if (typeStr.startsWith("struct")) {
+            // Parse struct<field1:type1,field2:type2,...>
+            if (typeStr.indexOf('<') == 6 && typeStr.endsWith(">")) {
+                String fieldStr = typeStr.substring(7, typeStr.length() - 1);
+                List<String> fieldDefs = splitStructFields(fieldStr);
+                ArrayList<StructField> structFields = new ArrayList<>();
+                for (String fieldDef : fieldDefs) {
+                    int colonIndex = fieldDef.indexOf(":");
+                    if (colonIndex == -1) {
+                        throw new AnalysisException("Invalid struct field: " + 
fieldDef);
+                    }
+                    String fieldName = fieldDef.substring(0, 
colonIndex).trim();
+                    String fieldTypeStr = fieldDef.substring(colonIndex + 
1).trim();
+                    Type fieldType = parseType(fieldTypeStr);
+                    StructField structField = new StructField(fieldName, 
fieldType);
+                    structFields.add(structField);
+                }
+                return new StructType(structFields);
+            }
+            throw new AnalysisException("Invalid struct type: " + typeStr);
+        } else {
+            throw new AnalysisException("Unsupported type: " + typeStr);
+        }
+    }
+
+    private static int findCommaOutsideBrackets(String s) {
+        int level = 0;
+        for (int i = 0; i < s.length(); i++) {
+            char c = s.charAt(i);
+            if (c == '<') {
+                level++;
+            } else if (c == '>') {
+                level--;
+            } else if (c == ',' && level == 0) {
+                return i;
+            }
+        }
+        return -1;
+    }
+
+    private static List<String> splitStructFields(String s) throws 
AnalysisException {
+        List<String> fields = new ArrayList<>();
+        int level = 0;
+        int start = 0;
+        for (int i = 0; i < s.length(); i++) {
+            char c = s.charAt(i);
+            if (c == '<') {
+                level++;
+            } else if (c == '>') {
+                level--;
+            } else if (c == ',' && level == 0) {
+                fields.add(s.substring(start, i).trim());
+                start = i + 1;
+            }
+        }
+        if (start < s.length()) {
+            fields.add(s.substring(start).trim());
+        }
+        if (level != 0) {
+            throw new AnalysisException("Unmatched angle brackets in struct 
definition.");
         }
+        return fields;
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
index e137d5d200c..52d0477e581 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
@@ -211,6 +211,15 @@ public abstract class ExternalFileTableValuedFunction 
extends TableValuedFunctio
             case "wal":
                 this.fileFormatType = TFileFormatType.FORMAT_WAL;
                 break;
+            case "sequence":
+                this.fileFormatType = TFileFormatType.FORMAT_SEQUENCE;
+                break;
+            case "rc_binary":
+                this.fileFormatType = TFileFormatType.FORMAT_RCBINARY;
+                break;
+            case "rc_text":
+                this.fileFormatType = TFileFormatType.FORMAT_RCTEXT;
+                break;
             default:
                 throw new AnalysisException("format:" + formatString + " is 
not supported.");
         }
@@ -259,6 +268,15 @@ public abstract class ExternalFileTableValuedFunction 
extends TableValuedFunctio
             }
         }
 
+        // When parsing rc_binary/rc_text/sequence files, reuse parseCsvSchema 
to parse column names and types
+        if (FileFormatUtils.isHiveFormat(formatString)) {
+            FileFormatUtils.parseCsvSchema(csvSchema, 
getOrDefaultAndRemove(copiedProps,
+                    FileFormatConstants.PROP_HIVE_SCHEMA, ""));
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("get hive schema: {}", csvSchema);
+            }
+        }
+
         pathPartitionKeys = Optional.ofNullable(
                 getOrDefaultAndRemove(copiedProps, 
FileFormatConstants.PROP_PATH_PARTITION_KEYS, null))
                 .map(str -> Arrays.stream(str.split(","))
diff --git a/fe/pom.xml b/fe/pom.xml
index 29b2b61530a..02150a03301 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -1468,6 +1468,11 @@ under the License.
                 <artifactId>mariadb-java-client</artifactId>
                 <version>${mariadb-java-client.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.apache.hive</groupId>
+                <artifactId>hive-serde</artifactId>
+                <version>${hive.version}</version>
+            </dependency>
             <dependency>
                 <groupId>org.apache.hive</groupId>
                 <artifactId>hive-common</artifactId>
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index c77ab48b2d4..6760bf54875 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -120,7 +120,10 @@ enum TFileFormatType {
     FORMAT_CSV_LZ4BLOCK,
     FORMAT_CSV_SNAPPYBLOCK,
     FORMAT_WAL,
-    FORMAT_ARROW
+    FORMAT_ARROW,
+    FORMAT_RCBINARY,
+    FORMAT_RCTEXT,
+    FORMAT_SEQUENCE,
 }
 
 // In previous versions, the data compression format and file format were 
stored together, as TFileFormatType,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to