This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-seq_rc_file
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-seq_rc_file by this
push:
new 69802005dcd [Feature](tvf) Support using tvf to read
sequence_file/rc_file in local/hdfs/s3 (#41080)
69802005dcd is described below
commit 69802005dcded40dbbab77d071c69055149c9c0d
Author: 星が降らない街 <[email protected]>
AuthorDate: Mon Sep 23 16:13:32 2024 +0800
[Feature](tvf) Support using tvf to read sequence_file/rc_file in
local/hdfs/s3 (#41080)
## Proposed changes
Issue Number: #30669
This change adds support for reading rcbinary, rctext, and sequence files
through external file table-valued functions (tvf) via the JNI connector.
To-do list:
- [x] Support reading rc_binary files with the local tvf
- [x] Support reading rc_text/sequence files with the local tvf
- [x] Support the s3/hdfs tvfs (a hedged usage sketch follows the examples below)
Example:
**sequence file:**
input:
```mysql
select * from local(
    "file_path" = "test/test.seq",
    "format" = "sequence",
    "backend_id" = "10011",
    "hive_schema" = "k1:tinyint;k2:smallint;k3:int;k4:bigint;k5:float;k6:double;k7:decimal(10,2);k8:string;k9:char(10);k10:varchar(20);k11:boolean;k12:timestamp;k13:date;k14:array<string>;k15:map<string,int>;k16:struct<name:string,age:int>");
```
output:
```
+------+------+------+-------------+------+-------+-------+-------+------------+---------+------+---------------------+------------+-----------------+----------------------+---------------------------+
| k1   | k2   | k3   | k4          | k5   | k6    | k7    | k8    | k9         | k10     | k11  | k12                 | k13        | k14             | k15                  | k16                       |
+------+------+------+-------------+------+-------+-------+-------+------------+---------+------+---------------------+------------+-----------------+----------------------+---------------------------+
|    7 |   13 |   74 | 13000000000 | 6.15 | 4.376 | 57.30 | world | Char       | Varchar |    1 | 2022-01-01 10:00:00 | 2022-01-01 | ["A", "B", "C"] | {"key2":2, "key1":1} | {"name":"John", "age":30} |
+------+------+------+-------------+------+-------+-------+-------+------------+---------+------+---------------------+------------+-----------------+----------------------+---------------------------+
1 row in set (0.07 sec)
```
**rc_binary file:**
input:
```mysql
select * from local(
    "file_path" = "test/test.rcbinary",
    "format" = "rc_binary",
    "backend_id" = "10011",
    "hive_schema" = "k1:tinyint;k2:smallint;k3:int;k4:bigint;k5:float;k6:double;k7:decimal(10,2);k8:string;k9:char(10);k10:varchar(20);k11:boolean;k12:timestamp;k13:date;k14:array<string>;k15:map<string,int>;k16:struct<name:string,age:int>");
```
output:
```
+------+------+------+-------------+------+------+--------+------+------------+-----------+------+---------------------+------------+-----------------+------------------+-------------------------------+
| k1   | k2   | k3   | k4          | k5   | k6   | k7     | k8   | k9         | k10       | k11  | k12                 | k13        | k14             | k15              | k16                           |
+------+------+------+-------------+------+------+--------+------+------------+-----------+------+---------------------+------------+-----------------+------------------+-------------------------------+
|    1 |    2 |    3 | 10000000000 | 1.23 | 3.14 | 100.50 | you  | are        | beautiful |    0 | 2023-10-29 02:00:00 | 2023-10-29 | ["D", "E", "F"] | {"k2":5, "k1":3} | {"name":"chandler", "age":54} |
+------+------+------+-------------+------+------+--------+------+------------+-----------+------+---------------------+------------+-----------------+------------------+-------------------------------+
1 row in set (0.12 sec)
```
**rc_text file:**
input:
```mysql
select * from local(
    "file_path" = "test/test.rctext",
    "format" = "rc_text",
    "backend_id" = "10011",
    "hive_schema" = "k1:tinyint;k2:smallint;k3:int;k4:bigint;k5:float;k6:double;k7:decimal(10,2);k8:string;k9:char(10);k10:varchar(20);k11:boolean;k12:timestamp;k13:date;k14:array<string>;k15:map<string,int>;k16:struct<name:string,age:int>");
```
output:
```
+------+------+------+-------------+------+-------+-------+-------+------------+---------+------+---------------------+------------+-----------------+----------------------+---------------------------+
| k1   | k2   | k3   | k4          | k5   | k6    | k7    | k8    | k9         | k10     | k11  | k12                 | k13        | k14             | k15                  | k16                       |
+------+------+------+-------------+------+-------+-------+-------+------------+---------+------+---------------------+------------+-----------------+----------------------+---------------------------+
|    7 |   13 |   74 | 13000000000 | 6.15 | 4.376 | 57.30 | world | Char       | Varchar |    1 | 2022-01-01 10:00:00 | 2022-01-01 | ["A", "B", "C"] | {"key2":2, "key1":1} | {"name":"John", "age":30} |
+------+------+------+-------------+------+-------+-------+-------+------------+---------+------+---------------------+------------+-----------------+----------------------+---------------------------+
1 row in set (0.06 sec)
```
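**s3/hdfs tvf:**
The s3/hdfs tvfs take the same `format` and `hive_schema` properties. Below is a minimal, illustrative sketch of an s3 call, assuming the usual s3 tvf connection properties (`uri`, `s3.access_key`, `s3.secret_key`, `s3.endpoint`, `s3.region`); the exact property set is not spelled out in this commit message, so treat the parameter values as placeholders:
```mysql
-- illustrative only: property names follow the existing s3 tvf conventions
select * from s3(
    "uri" = "https://your-endpoint/your-bucket/test/test.seq",
    "s3.access_key" = "<ak>",
    "s3.secret_key" = "<sk>",
    "s3.endpoint" = "your-endpoint",
    "s3.region" = "your-region",
    "format" = "sequence",
    "hive_schema" = "k1:tinyint;k2:smallint;k3:int");
```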
---
be/src/vec/exec/format/hive/hive_jni_reader.cpp | 102 +++++++
be/src/vec/exec/format/hive/hive_jni_reader.h | 84 ++++++
be/src/vec/exec/jni_connector.cpp | 78 ++++++
be/src/vec/exec/jni_connector.h | 2 +
be/src/vec/exec/scan/vfile_scanner.cpp | 10 +
build.sh | 2 +
fe/be-java-extensions/hive-scanner/pom.xml | 102 +++++++
.../org/apache/doris/hive/HiveColumnValue.java | 311 +++++++++++++++++++++
.../org/apache/doris/hive/HiveFileContext.java | 52 ++++
.../java/org/apache/doris/hive/HiveJNIScanner.java | 259 +++++++++++++++++
.../java/org/apache/doris/hive/HiveProperties.java | 49 ++++
.../main/java/org/apache/doris/hive/S3Utils.java | 102 +++++++
.../hive-scanner/src/main/resources/package.xml | 41 +++
fe/be-java-extensions/pom.xml | 1 +
.../doris/common/util/FileFormatConstants.java | 11 +-
.../apache/doris/common/util/FileFormatUtils.java | 234 ++++++++++++----
.../ExternalFileTableValuedFunction.java | 18 ++
fe/pom.xml | 5 +
gensrc/thrift/PlanNodes.thrift | 5 +-
19 files changed, 1413 insertions(+), 55 deletions(-)
diff --git a/be/src/vec/exec/format/hive/hive_jni_reader.cpp
b/be/src/vec/exec/format/hive/hive_jni_reader.cpp
new file mode 100644
index 00000000000..1b00cee1678
--- /dev/null
+++ b/be/src/vec/exec/format/hive/hive_jni_reader.cpp
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "hive_jni_reader.h"
+
+#include <map>
+#include <ostream>
+
+#include "common/logging.h"
+#include "runtime/descriptors.h"
+#include "runtime/types.h"
+
+namespace doris::vectorized {
+
+HiveJNIReader::HiveJNIReader(RuntimeState* state, RuntimeProfile* profile,
+                             const TFileScanRangeParams& params,
+                             const std::vector<SlotDescriptor*>& file_slot_descs,
+                             const TFileRangeDesc& range)
+        : JniReader(file_slot_descs, state, profile), _params(params), _range(range) {}
+
+HiveJNIReader::~HiveJNIReader() = default;
+
+TFileType::type HiveJNIReader::get_file_type() {
+ TFileType::type type;
+ if (_range.__isset.file_type) {
+ type = _range.file_type;
+ } else {
+ type = _params.file_type;
+ }
+ return type;
+}
+
+Status HiveJNIReader::init_fetch_table_reader(
+        std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
+ _colname_to_value_range = colname_to_value_range;
+ std::ostringstream required_fields;
+ std::ostringstream columns_types;
+ std::vector<std::string> column_names;
+ int index = 0;
+ for (auto& desc : _file_slot_descs) {
+ std::string field = desc->col_name();
+ column_names.emplace_back(field);
+ std::string type = JniConnector::get_jni_type_v2(desc->type());
+ if (index == 0) {
+ required_fields << field;
+ columns_types << type;
+ } else {
+ required_fields << "," << field;
+ columns_types << "#" << type;
+ }
+ index++;
+ }
+
+ TFileType::type type = get_file_type();
+ std::map<String, String> required_params = {
+ {"uri", _range.path},
+ {"file_type", std::to_string(type)},
+ {"file_format", std::to_string(_params.format_type)},
+ {"required_fields", required_fields.str()},
+ {"columns_types", columns_types.str()},
+ {"split_start_offset", std::to_string(_range.start_offset)},
+ {"split_size", std::to_string(_range.size)}};
+ if (type == TFileType::FILE_S3) {
+        required_params.insert(_params.properties.begin(), _params.properties.end());
+    }
+    _jni_connector = std::make_unique<JniConnector>("org/apache/doris/hive/HiveJNIScanner",
+                                                    required_params, column_names);
+ RETURN_IF_ERROR(_jni_connector->init(_colname_to_value_range));
+ return _jni_connector->open(_state, _profile);
+}
+
+Status HiveJNIReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
+ RETURN_IF_ERROR(_jni_connector->get_next_block(block, read_rows, eof));
+ if (*eof) {
+ RETURN_IF_ERROR(_jni_connector->close());
+ }
+ return Status::OK();
+}
+
+Status HiveJNIReader::get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
+                                  std::unordered_set<std::string>* missing_cols) {
+ for (auto& desc : _file_slot_descs) {
+ name_to_type->emplace(desc->col_name(), desc->type());
+ }
+ return Status::OK();
+}
+
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exec/format/hive/hive_jni_reader.h
b/be/src/vec/exec/format/hive/hive_jni_reader.h
new file mode 100644
index 00000000000..14051c320e0
--- /dev/null
+++ b/be/src/vec/exec/format/hive/hive_jni_reader.h
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <rapidjson/document.h>
+#include <stddef.h>
+
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include "common/status.h"
+#include "exec/olap_common.h"
+#include "vec/exec/format/jni_reader.h"
+
+namespace doris {
+
+class RuntimeProfile;
+
+class RuntimeState;
+
+class SlotDescriptor;
+
+namespace vectorized {
+
+class Block;
+
+} // namespace vectorized
+struct TypeDescriptor;
+} // namespace doris
+
+namespace doris::vectorized {
+
+/**
+ * Read hive-format file: rcbinary, rctext, sequencefile
+ */
+class HiveJNIReader : public JniReader {
+ ENABLE_FACTORY_CREATOR(HiveJNIReader);
+
+public:
+ /**
+ * Call java side by jni to get table data
+ */
+    HiveJNIReader(RuntimeState* state, RuntimeProfile* profile, const TFileScanRangeParams& params,
+                  const std::vector<SlotDescriptor*>& file_slot_descs, const TFileRangeDesc& range);
+
+ ~HiveJNIReader() override;
+
+ Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
+
+    Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
+                       std::unordered_set<std::string>* missing_cols) override;
+
+    Status init_fetch_table_reader(
+            std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
+
+ TFileType::type get_file_type();
+
+private:
+ const TFileScanRangeParams _params;
+ const TFileRangeDesc _range;
+ std::string _column_names;
+ std::string _column_types;
+    std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range = nullptr;
+};
+
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exec/jni_connector.cpp
b/be/src/vec/exec/jni_connector.cpp
index 0c2485ada3b..1bba35a85df 100644
--- a/be/src/vec/exec/jni_connector.cpp
+++ b/be/src/vec/exec/jni_connector.cpp
@@ -519,6 +519,84 @@ std::string JniConnector::get_jni_type(const DataTypePtr& data_type) {
}
}
+std::string JniConnector::get_jni_type_v2(const TypeDescriptor& desc) {
+ std::ostringstream buffer;
+ switch (desc.type) {
+ case TYPE_BOOLEAN:
+ return "boolean";
+ case TYPE_TINYINT:
+ return "tinyint";
+ case TYPE_SMALLINT:
+ return "smallint";
+ case TYPE_INT:
+ return "int";
+ case TYPE_BIGINT:
+ return "bigint";
+ case TYPE_LARGEINT:
+ return "largeint";
+ case TYPE_FLOAT:
+ return "float";
+ case TYPE_DOUBLE:
+ return "double";
+ case TYPE_VARCHAR: {
+ buffer << "varchar(" << desc.len << ")";
+ return buffer.str();
+ }
+ case TYPE_DATE:
+ [[fallthrough]];
+ case TYPE_DATEV2:
+ return "date";
+ case TYPE_DATETIME:
+ [[fallthrough]];
+ case TYPE_TIME:
+ [[fallthrough]];
+ case TYPE_DATETIMEV2:
+ [[fallthrough]];
+ case TYPE_TIMEV2:
+ return "timestamp";
+ case TYPE_BINARY:
+ return "binary";
+ case TYPE_CHAR: {
+ buffer << "char(" << desc.len << ")";
+ return buffer.str();
+ }
+ case TYPE_STRING:
+ return "string";
+ case TYPE_DECIMALV2:
+ [[fallthrough]];
+ case TYPE_DECIMAL32:
+ [[fallthrough]];
+ case TYPE_DECIMAL64:
+ [[fallthrough]];
+ case TYPE_DECIMAL128I: {
+ buffer << "decimal(" << desc.precision << "," << desc.scale << ")";
+ return buffer.str();
+ }
+ case TYPE_STRUCT: {
+ buffer << "struct<";
+ for (int i = 0; i < desc.children.size(); ++i) {
+ if (i != 0) {
+ buffer << ",";
+ }
+            buffer << desc.field_names[i] << ":" << get_jni_type(desc.children[i]);
+ }
+ buffer << ">";
+ return buffer.str();
+ }
+ case TYPE_ARRAY: {
+ buffer << "array<" << get_jni_type(desc.children[0]) << ">";
+ return buffer.str();
+ }
+ case TYPE_MAP: {
+        buffer << "map<" << get_jni_type(desc.children[0]) << "," << get_jni_type(desc.children[1])
+               << ">";
+ return buffer.str();
+ }
+ default:
+ return "unsupported";
+ }
+}
+
std::string JniConnector::get_jni_type(const TypeDescriptor& desc) {
std::ostringstream buffer;
switch (desc.type) {
diff --git a/be/src/vec/exec/jni_connector.h b/be/src/vec/exec/jni_connector.h
index 52a3fb2e778..7a1c5a1df49 100644
--- a/be/src/vec/exec/jni_connector.h
+++ b/be/src/vec/exec/jni_connector.h
@@ -261,6 +261,8 @@ public:
/**
* Map PrimitiveType to hive type.
*/
+ static std::string get_jni_type_v2(const TypeDescriptor& desc);
+
static std::string get_jni_type(const TypeDescriptor& desc);
     static Status to_java_table(Block* block, size_t num_rows, const ColumnNumbers& arguments,
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp
b/be/src/vec/exec/scan/vfile_scanner.cpp
index 52aa752935e..58f520e693b 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -56,6 +56,7 @@
#include "vec/exec/format/arrow/arrow_stream_reader.h"
#include "vec/exec/format/avro/avro_jni_reader.h"
#include "vec/exec/format/csv/csv_reader.h"
+#include "vec/exec/format/hive/hive_jni_reader.h"
#include "vec/exec/format/json/new_json_reader.h"
#include "vec/exec/format/orc/vorc_reader.h"
#include "vec/exec/format/parquet/vparquet_reader.h"
@@ -945,6 +946,15 @@ Status VFileScanner::_get_next_reader() {
->init_fetch_table_reader(_colname_to_value_range);
break;
}
+ case TFileFormatType::FORMAT_SEQUENCE:
+ case TFileFormatType::FORMAT_RCTEXT:
+ case TFileFormatType::FORMAT_RCBINARY: {
+        _cur_reader = HiveJNIReader::create_unique(_state, _profile, *_params, _file_slot_descs,
+                                                   range);
+        init_status = ((HiveJNIReader*)(_cur_reader.get()))
+                              ->init_fetch_table_reader(_colname_to_value_range);
+        break;
+    }
case TFileFormatType::FORMAT_WAL: {
_cur_reader.reset(new WalReader(_state));
         init_status = ((WalReader*)(_cur_reader.get()))->init_reader(_output_tuple_desc);
diff --git a/build.sh b/build.sh
index c8aa2bf2c43..9ad36f7bbaa 100755
--- a/build.sh
+++ b/build.sh
@@ -534,6 +534,7 @@ if [[ "${BUILD_BE_JAVA_EXTENSIONS}" -eq 1 ]]; then
modules+=("be-java-extensions/max-compute-scanner")
modules+=("be-java-extensions/avro-scanner")
modules+=("be-java-extensions/lakesoul-scanner")
+ modules+=("be-java-extensions/hive-scanner")
modules+=("be-java-extensions/preload-extensions")
     # If the BE_EXTENSION_IGNORE variable is not empty, remove the modules that need to be ignored from FE_MODULES
@@ -819,6 +820,7 @@ EOF
extensions_modules+=("max-compute-scanner")
extensions_modules+=("avro-scanner")
extensions_modules+=("lakesoul-scanner")
+ extensions_modules+=("hive-scanner")
extensions_modules+=("preload-extensions")
if [[ -n "${BE_EXTENSION_IGNORE}" ]]; then
diff --git a/fe/be-java-extensions/hive-scanner/pom.xml
b/fe/be-java-extensions/hive-scanner/pom.xml
new file mode 100644
index 00000000000..64b4f7f0342
--- /dev/null
+++ b/fe/be-java-extensions/hive-scanner/pom.xml
@@ -0,0 +1,102 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>be-java-extensions</artifactId>
+ <groupId>org.apache.doris</groupId>
+ <version>${revision}</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>hive-scanner</artifactId>
+
+ <properties>
+ <maven.compiler.source>8</maven.compiler.source>
+ <maven.compiler.target>8</maven.compiler.target>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.doris</groupId>
+ <artifactId>java-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.doris</groupId>
+ <artifactId>hive-catalog-shade</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-serde</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <finalName>hive-scanner</finalName>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <descriptors>src/main/resources/package.xml</descriptors>
+ <archive>
+ <manifest>
+ <mainClass></mainClass>
+ </manifest>
+ </archive>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
diff --git
a/fe/be-java-extensions/hive-scanner/src/main/java/org/apache/doris/hive/HiveColumnValue.java
b/fe/be-java-extensions/hive-scanner/src/main/java/org/apache/doris/hive/HiveColumnValue.java
new file mode 100644
index 00000000000..03c716677f7
--- /dev/null
+++
b/fe/be-java-extensions/hive-scanner/src/main/java/org/apache/doris/hive/HiveColumnValue.java
@@ -0,0 +1,311 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.hive;
+
+import org.apache.doris.common.jni.vec.ColumnValue;
+
+import org.apache.hadoop.hive.common.type.Date;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.common.type.Timestamp;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.List;
+import java.util.Map.Entry;
+
+public class HiveColumnValue implements ColumnValue {
+
+    private static final Logger LOG = LogManager.getLogger(HiveColumnValue.class);
+ private final Object fieldData;
+ private final ObjectInspector fieldInspector;
+
+ public HiveColumnValue(ObjectInspector fieldInspector, Object fieldData) {
+ this.fieldInspector = fieldInspector;
+ this.fieldData = fieldData;
+ }
+
+ private Object inspectObject() {
+ if (fieldData == null) {
+ return null;
+ }
+ if (fieldInspector instanceof PrimitiveObjectInspector) {
+            PrimitiveObjectInspector poi = (PrimitiveObjectInspector) fieldInspector;
+ return poi.getPrimitiveJavaObject(fieldData);
+ }
+ return fieldData;
+ }
+
+ @Override
+ public boolean canGetStringAsBytes() {
+ return fieldInspector instanceof BinaryObjectInspector;
+ }
+
+ @Override
+ public boolean isNull() {
+ return fieldData == null || inspectObject() == null;
+ }
+
+ @Override
+ public boolean getBoolean() {
+ Object value = inspectObject();
+ return value != null && ((Boolean) value);
+ }
+
+ @Override
+ public byte getByte() {
+ Object value = inspectObject();
+ if (value == null) {
+ return 0;
+ }
+ if (value instanceof Number) {
+ return ((Number) value).byteValue();
+ }
+ return Byte.parseByte(value.toString());
+ }
+
+ @Override
+ public short getShort() {
+ Object value = inspectObject();
+ if (value == null) {
+ return 0;
+ }
+ if (value instanceof Number) {
+ return ((Number) value).shortValue();
+ }
+ return Short.parseShort(value.toString());
+ }
+
+ @Override
+ public int getInt() {
+ Object value = inspectObject();
+ if (value == null) {
+ return 0;
+ }
+ if (value instanceof Number) {
+ return ((Number) value).intValue();
+ }
+ return Integer.parseInt(value.toString());
+ }
+
+ @Override
+ public float getFloat() {
+ Object value = inspectObject();
+ if (value == null) {
+ return 0.0f;
+ }
+ if (value instanceof Number) {
+ return ((Number) value).floatValue();
+ }
+ return Float.parseFloat(value.toString());
+ }
+
+ @Override
+ public long getLong() {
+ Object value = inspectObject();
+ if (value == null) {
+ return 0L;
+ }
+ if (value instanceof Number) {
+ return ((Number) value).longValue();
+ }
+ return Long.parseLong(value.toString());
+ }
+
+ @Override
+ public double getDouble() {
+ Object value = inspectObject();
+ if (value == null) {
+ return 0.0d;
+ }
+ if (value instanceof Number) {
+ return ((Number) value).doubleValue();
+ }
+ return Double.parseDouble(value.toString());
+ }
+
+ @Override
+ public BigInteger getBigInteger() {
+ Object value = inspectObject();
+ if (value == null) {
+ return null;
+ }
+ if (value instanceof BigInteger) {
+ return (BigInteger) value;
+ } else if (value instanceof Number) {
+ return BigInteger.valueOf(((Number) value).longValue());
+ }
+ return new BigInteger(value.toString());
+ }
+
+ @Override
+ public BigDecimal getDecimal() {
+ Object value = inspectObject();
+ if (value == null) {
+ return null;
+ }
+ if (value instanceof HiveDecimal) {
+ return ((HiveDecimal) value).bigDecimalValue();
+ } else if (value instanceof BigDecimal) {
+ return (BigDecimal) value;
+ }
+ return new BigDecimal(value.toString());
+ }
+
+ @Override
+ public String getString() {
+ Object value = inspectObject();
+ if (value == null) {
+ return null;
+ }
+ return value.toString();
+ }
+
+ @Override
+ public byte[] getStringAsBytes() {
+ if (fieldData == null) {
+ return null;
+ }
+ if (fieldInspector instanceof BinaryObjectInspector) {
+            BytesWritable bw = ((BinaryObjectInspector) fieldInspector).getPrimitiveWritableObject(fieldData);
+ return bw.copyBytes();
+ } else if (fieldInspector instanceof StringObjectInspector) {
+ String str = getString();
+ return str != null ? str.getBytes() : null;
+ }
+ return null;
+ }
+
+ @Override
+ public LocalDate getDate() {
+ if (fieldData == null) {
+ return null;
+ }
+ if (fieldInspector instanceof DateObjectInspector) {
+ DateObjectInspector doi = (DateObjectInspector) fieldInspector;
+ Date hiveDate = doi.getPrimitiveJavaObject(fieldData);
+ if (hiveDate != null) {
+                return LocalDate.of(hiveDate.getYear(), hiveDate.getMonth(), hiveDate.getDay());
+ }
+ } else if (fieldInspector instanceof PrimitiveObjectInspector) {
+ Object value = inspectObject();
+ if (value instanceof Date) {
+ Date hiveDate = (Date) value;
+                return LocalDate.of(hiveDate.getYear(), hiveDate.getMonth(), hiveDate.getDay());
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public LocalDateTime getDateTime() {
+ if (fieldData == null) {
+ return null;
+ }
+ if (fieldInspector instanceof TimestampObjectInspector) {
+            TimestampObjectInspector toi = (TimestampObjectInspector) fieldInspector;
+ Timestamp hiveTimestamp = toi.getPrimitiveJavaObject(fieldData);
+ if (hiveTimestamp != null) {
+ // Convert Hive Timestamp to LocalDateTime
+ return LocalDateTime.of(
+ hiveTimestamp.getYear(),
+ hiveTimestamp.getMonth(),
+ hiveTimestamp.getDay(),
+ hiveTimestamp.getHours(),
+ hiveTimestamp.getMinutes(),
+ hiveTimestamp.getSeconds(),
+ hiveTimestamp.getNanos()
+ );
+ }
+ } else if (fieldInspector instanceof DateObjectInspector) {
+ LocalDate date = getDate();
+ if (date != null) {
+ return date.atStartOfDay();
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public byte[] getBytes() {
+ return getStringAsBytes();
+ }
+
+ @Override
+ public void unpackArray(List<ColumnValue> values) {
+ if (fieldData == null) {
+ return;
+ }
+        ListObjectInspector listInspector = (ListObjectInspector) fieldInspector;
+        List<?> items = listInspector.getList(fieldData);
+        ObjectInspector itemInspector = listInspector.getListElementObjectInspector();
+        for (Object item : items) {
+            ColumnValue cv = item != null ? new HiveColumnValue(itemInspector, item) : null;
+            values.add(cv);
+        }
+ }
+
+ @Override
+ public void unpackMap(List<ColumnValue> keys, List<ColumnValue> values) {
+ if (fieldData == null) {
+ return;
+ }
+ MapObjectInspector mapInspector = (MapObjectInspector) fieldInspector;
+ ObjectInspector keyInspector = mapInspector.getMapKeyObjectInspector();
+        ObjectInspector valueInspector = mapInspector.getMapValueObjectInspector();
+        for (Entry<?, ?> entry : mapInspector.getMap(fieldData).entrySet()) {
+            ColumnValue key = entry.getKey() != null ? new HiveColumnValue(keyInspector, entry.getKey()) : null;
+            ColumnValue value = entry.getValue() != null ? new HiveColumnValue(valueInspector, entry.getValue()) : null;
+ keys.add(key);
+ values.add(value);
+ }
+ }
+
+ @Override
+    public void unpackStruct(List<Integer> structFieldIndex, List<ColumnValue> values) {
+        if (fieldData == null) {
+            return;
+        }
+        StructObjectInspector structInspector = (StructObjectInspector) fieldInspector;
+        List<? extends StructField> fields = structInspector.getAllStructFieldRefs();
+        for (Integer idx : structFieldIndex) {
+            if (idx != null && idx >= 0 && idx < fields.size()) {
+                StructField sf = fields.get(idx);
+                Object fieldObj = structInspector.getStructFieldData(fieldData, sf);
+                ColumnValue cv = fieldObj != null ? new HiveColumnValue(sf.getFieldObjectInspector(), fieldObj) : null;
+ values.add(cv);
+ } else {
+ values.add(null);
+ }
+ }
+ }
+}
diff --git
a/fe/be-java-extensions/hive-scanner/src/main/java/org/apache/doris/hive/HiveFileContext.java
b/fe/be-java-extensions/hive-scanner/src/main/java/org/apache/doris/hive/HiveFileContext.java
new file mode 100644
index 00000000000..0b13935a5fc
--- /dev/null
+++
b/fe/be-java-extensions/hive-scanner/src/main/java/org/apache/doris/hive/HiveFileContext.java
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.hive;
+
+import org.apache.doris.thrift.TFileFormatType;
+
+public class HiveFileContext {
+ private final String serde;
+ private final String inputFormat;
+
+ HiveFileContext(TFileFormatType fileFormatType) {
+ switch (fileFormatType) {
+ case FORMAT_RCBINARY:
+ serde = HiveProperties.RC_BINARY_SERDE_CLASS;
+ inputFormat = HiveProperties.RC_BINARY_INPUT_FORMAT;
+ break;
+ case FORMAT_RCTEXT:
+ serde = HiveProperties.RC_TEXT_SERDE_CLASS;
+ inputFormat = HiveProperties.RC_TEXT_INPUT_FORMAT;
+ break;
+ case FORMAT_SEQUENCE:
+ serde = HiveProperties.SEQUENCE_SERDE_CLASS;
+ inputFormat = HiveProperties.SEQUENCE_INPUT_FORMAT;
+ break;
+ default:
+                throw new UnsupportedOperationException("Unrecognized file format " + fileFormatType);
+ }
+ }
+
+ String getSerde() {
+ return serde;
+ }
+
+ String getInputFormat() {
+ return inputFormat;
+ }
+}
diff --git
a/fe/be-java-extensions/hive-scanner/src/main/java/org/apache/doris/hive/HiveJNIScanner.java
b/fe/be-java-extensions/hive-scanner/src/main/java/org/apache/doris/hive/HiveJNIScanner.java
new file mode 100644
index 00000000000..ae98bb28a10
--- /dev/null
+++
b/fe/be-java-extensions/hive-scanner/src/main/java/org/apache/doris/hive/HiveJNIScanner.java
@@ -0,0 +1,259 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.hive;
+
+import org.apache.doris.avro.S3Utils;
+import org.apache.doris.common.jni.JniScanner;
+import org.apache.doris.common.jni.vec.ColumnType;
+import org.apache.doris.common.jni.vec.TableSchema;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileType;
+
+import io.trino.spi.classloader.ThreadContextClassLoader;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+public class HiveJNIScanner extends JniScanner {
+
+    private static final Logger LOG = LogManager.getLogger(HiveJNIScanner.class);
+ private final ClassLoader classLoader;
+ private final TFileType fileType;
+ private final int fetchSize;
+ private final Map<String, String> requiredParams;
+ private final String[] columnTypes;
+ private final String[] requiredFields;
+ private final ColumnType[] requiredTypes;
+ private final int[] requiredColumnIds;
+ private final TFileFormatType fileFormat;
+ private final StructField[] structFields;
+ private final ObjectInspector[] fieldInspectors;
+ private final Long splitStartOffset;
+ private final Long splitSize;
+ private String uri;
+ private StructObjectInspector rowInspector;
+ private Deserializer deserializer;
+ private RecordReader<Writable, Writable> reader;
+ private Writable key;
+ private Writable value;
+ private HiveFileContext hiveFileContext;
+
+ public HiveJNIScanner(int fetchSize, Map<String, String> requiredParams) {
+ this.classLoader = this.getClass().getClassLoader();
+ this.fetchSize = fetchSize;
+ this.requiredParams = requiredParams;
+        this.fileType = TFileType.findByValue(Integer.parseInt(requiredParams.get(HiveProperties.FILE_TYPE)));
+        this.fileFormat = TFileFormatType.findByValue(Integer.parseInt(requiredParams.get(HiveProperties.FILE_FORMAT)));
+        this.columnTypes = requiredParams.get(HiveProperties.COLUMNS_TYPES)
+                .split(HiveProperties.COLUMNS_TYPE_DELIMITER);
+        this.requiredFields = requiredParams.get(HiveProperties.REQUIRED_FIELDS).split(HiveProperties.FIELDS_DELIMITER);
+        this.requiredTypes = new ColumnType[requiredFields.length];
+        this.requiredColumnIds = new int[requiredFields.length];
+        this.uri = requiredParams.get(HiveProperties.URI);
+        this.splitStartOffset = Long.parseLong(requiredParams.get(HiveProperties.SPLIT_START_OFFSET));
+        this.splitSize = Long.parseLong(requiredParams.get(HiveProperties.SPLIT_SIZE));
+ this.structFields = new StructField[requiredFields.length];
+ this.fieldInspectors = new ObjectInspector[requiredFields.length];
+ }
+
+    private void processS3Conf(String accessKey, String secretKey, String endpoint,
+                               String region, JobConf jobConf) {
+        if (!StringUtils.isEmpty(accessKey) && !StringUtils.isEmpty(secretKey)) {
+ jobConf.set(HiveProperties.FS_S3A_ACCESS_KEY, accessKey);
+ jobConf.set(HiveProperties.FS_S3A_SECRET_KEY, secretKey);
+ }
+ jobConf.set(HiveProperties.FS_S3A_ENDPOINT, endpoint);
+ jobConf.set(HiveProperties.FS_S3A_REGION, region);
+ }
+
+ private String processS3Uri(String uri) throws IOException {
+ S3Utils.parseURI(uri);
+ uri = "s3a://" + S3Utils.getBucket() + "/" + S3Utils.getKey();
+ return uri;
+ }
+
+ private void initReader() throws Exception {
+ this.hiveFileContext = new HiveFileContext(fileFormat);
+ Properties properties = createProperties();
+ JobConf jobConf = makeJobConf(properties);
+ switch (fileType) {
+ case FILE_LOCAL:
+ case FILE_HDFS:
+ break;
+ case FILE_S3:
+                String accessKey = requiredParams.get(HiveProperties.S3_ACCESS_KEY);
+                String secretKey = requiredParams.get(HiveProperties.S3_SECRET_KEY);
+                String endpoint = requiredParams.get(HiveProperties.S3_ENDPOINT);
+ String region = requiredParams.get(HiveProperties.S3_REGION);
+ processS3Conf(accessKey, secretKey, endpoint, region, jobConf);
+ uri = processS3Uri(uri);
+ break;
+ default:
+                throw new Exception("Unsupported " + fileType.getValue() + " file type.");
+ }
+ Path path = new Path(uri);
+        FileSplit fileSplit = new FileSplit(path, splitStartOffset, splitSize, (String[]) null);
+        InputFormat<?, ?> inputFormatClass = createInputFormat(jobConf, hiveFileContext.getInputFormat());
+        reader = (RecordReader<Writable, Writable>) inputFormatClass.getRecordReader(fileSplit, jobConf, Reporter.NULL);
+        deserializer = getDeserializer(jobConf, properties, hiveFileContext.getSerde());
+ rowInspector = getTableObjectInspector(deserializer);
+ for (int i = 0; i < requiredFields.length; i++) {
+            StructField field = rowInspector.getStructFieldRef(requiredFields[i]);
+ structFields[i] = field;
+ fieldInspectors[i] = field.getFieldObjectInspector();
+ }
+ }
+
+    private InputFormat<?, ?> createInputFormat(Configuration conf, String inputFormat) throws Exception {
+        Class<?> clazz = conf.getClassByName(inputFormat);
+        Class<? extends InputFormat<?, ?>> cls =
+                (Class<? extends InputFormat<?, ?>>) clazz.asSubclass(InputFormat.class);
+ return ReflectionUtils.newInstance(cls, conf);
+ }
+
+    private StructObjectInspector getTableObjectInspector(Deserializer deserializer) throws Exception {
+ ObjectInspector inspector = deserializer.getObjectInspector();
+ return (StructObjectInspector) inspector;
+ }
+
+ private Properties createProperties() {
+ Properties properties = new Properties();
+        properties.setProperty(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR,
+                Arrays.stream(this.requiredColumnIds).mapToObj(String::valueOf).collect(Collectors.joining(",")));
+        properties.setProperty(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, String.join(",", requiredFields));
+        properties.setProperty(HiveProperties.COLUMNS, String.join(",", requiredFields));
+        properties.setProperty(HiveProperties.COLUMNS2TYPES, String.join(",", columnTypes));
+        properties.setProperty(serdeConstants.SERIALIZATION_LIB, hiveFileContext.getSerde());
+ return properties;
+ }
+
+ private JobConf makeJobConf(Properties properties) {
+ Configuration conf = new Configuration();
+ JobConf jobConf = new JobConf(conf);
+ jobConf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+        properties.stringPropertyNames().forEach(name -> jobConf.set(name, properties.getProperty(name)));
+ return jobConf;
+ }
+
+    private Deserializer getDeserializer(Configuration configuration, Properties properties, String name)
+            throws Exception {
+        Class<? extends Deserializer> deserializerClass = Class.forName(name, true, JavaUtils.getClassLoader())
+                .asSubclass(Deserializer.class);
+        Deserializer deserializer = deserializerClass.getConstructor().newInstance();
+ deserializer.initialize(configuration, properties);
+ return deserializer;
+ }
+
+ @Override
+ public void open() throws IOException {
+        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+ parseRequiredTypes();
+ initTableInfo(requiredTypes, requiredFields, fetchSize);
+ initReader();
+ } catch (Exception e) {
+ close();
+ LOG.error("Failed to open the hive reader.", e);
+ throw new IOException("Failed to open the hive reader.", e);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+ if (reader != null) {
+ reader.close();
+ }
+ } catch (IOException e) {
+ LOG.error("Failed to close the hive reader.", e);
+ throw new IOException("Failed to close the hive reader.", e);
+ }
+ }
+
+ @Override
+ public int getNext() throws IOException {
+        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+ key = reader.createKey();
+ value = reader.createValue();
+ int numRows = 0;
+ for (; numRows < getBatchSize(); numRows++) {
+ if (!reader.next(key, value)) {
+ break;
+ }
+ Object rowData = deserializer.deserialize(value);
+ for (int i = 0; i < requiredFields.length; i++) {
+                Object fieldData = rowInspector.getStructFieldData(rowData, structFields[i]);
+                if (fieldData == null) {
+                    appendData(i, null);
+                } else {
+                    HiveColumnValue fieldValue = new HiveColumnValue(fieldInspectors[i], fieldData);
+ appendData(i, fieldValue);
+ }
+ }
+ }
+ return numRows;
+ } catch (Exception e) {
+ close();
+ LOG.error("Failed to get next row of data.", e);
+ throw new IOException("Failed to get next row of data.", e);
+ }
+ }
+
+ @Override
+    protected TableSchema parseTableSchema() throws UnsupportedOperationException {
+ return null;
+ }
+
+ private void parseRequiredTypes() {
+ HashMap<String, Integer> hiveColumnNameToIndex = new HashMap<>();
+ HashMap<String, String> hiveColumnNameToType = new HashMap<>();
+ for (int i = 0; i < requiredFields.length; i++) {
+ hiveColumnNameToIndex.put(requiredFields[i], i);
+ hiveColumnNameToType.put(requiredFields[i], columnTypes[i]);
+ }
+
+ for (int i = 0; i < requiredFields.length; i++) {
+ String fieldName = requiredFields[i];
+ requiredColumnIds[i] = hiveColumnNameToIndex.get(fieldName);
+ String typeStr = hiveColumnNameToType.get(fieldName);
+ requiredTypes[i] = ColumnType.parseType(fieldName, typeStr);
+ }
+ }
+}
diff --git
a/fe/be-java-extensions/hive-scanner/src/main/java/org/apache/doris/hive/HiveProperties.java
b/fe/be-java-extensions/hive-scanner/src/main/java/org/apache/doris/hive/HiveProperties.java
new file mode 100644
index 00000000000..3949c84dd31
--- /dev/null
+++
b/fe/be-java-extensions/hive-scanner/src/main/java/org/apache/doris/hive/HiveProperties.java
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.hive;
+
+public class HiveProperties {
+
+ protected static final String COLUMNS_TYPE_DELIMITER = "#";
+ protected static final String FIELDS_DELIMITER = ",";
+
+ protected static final String COLUMNS_TYPES = "columns_types";
+ protected static final String REQUIRED_FIELDS = "required_fields";
+ protected static final String FILE_TYPE = "file_type";
+ protected static final String FILE_FORMAT = "file_format";
+ protected static final String URI = "uri";
+ protected static final String S3_ACCESS_KEY = "s3.access_key";
+ protected static final String S3_SECRET_KEY = "s3.secret_key";
+ protected static final String S3_ENDPOINT = "s3.endpoint";
+ protected static final String S3_REGION = "s3.region";
+ protected static final String COLUMNS = "columns";
+ protected static final String COLUMNS2TYPES = "columns.types";
+ protected static final String FS_S3A_ACCESS_KEY = "fs.s3a.access.key";
+ protected static final String FS_S3A_SECRET_KEY = "fs.s3a.secret.key";
+ protected static final String FS_S3A_ENDPOINT = "fs.s3a.endpoint";
+ protected static final String FS_S3A_REGION = "fs.s3a.region";
+ protected static final String SPLIT_START_OFFSET = "split_start_offset";
+ protected static final String SPLIT_SIZE = "split_size";
+ protected static final String RC_BINARY_SERDE_CLASS
+ = "org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe";
+    protected static final String RC_BINARY_INPUT_FORMAT = "org.apache.hadoop.hive.ql.io.RCFileInputFormat";
+    protected static final String RC_TEXT_SERDE_CLASS = "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe";
+    protected static final String RC_TEXT_INPUT_FORMAT = "org.apache.hadoop.hive.ql.io.RCFileInputFormat";
+    protected static final String SEQUENCE_SERDE_CLASS = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
+    protected static final String SEQUENCE_INPUT_FORMAT = "org.apache.hadoop.mapred.SequenceFileInputFormat";
+}
diff --git
a/fe/be-java-extensions/hive-scanner/src/main/java/org/apache/doris/hive/S3Utils.java
b/fe/be-java-extensions/hive-scanner/src/main/java/org/apache/doris/hive/S3Utils.java
new file mode 100644
index 00000000000..45845af3c03
--- /dev/null
+++
b/fe/be-java-extensions/hive-scanner/src/main/java/org/apache/doris/hive/S3Utils.java
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.avro;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.IOException;
+
+public class S3Utils {
+ private static final String SCHEMA_S3 = "s3";
+ private static final String SCHEMA_HTTP = "http";
+ private static final String SCHEMA_HTTPS = "https";
+ private static final String SCHEME_DELIM = "://";
+ private static final String PATH_DELIM = "/";
+ private static final String QUERY_DELIM = "\\?";
+ private static final String FRAGMENT_DELIM = "#";
+ private static String bucket;
+ private static String key;
+
+ /**
+ * eg:
+ * s3: s3://bucket1/path/to/file.txt
+ * http: http://10.10.10.1:9000/bucket1/to/file.txt
+ * https: https://10.10.10.1:9000/bucket1/to/file.txt
+ * <p>
+ * schema: s3,http,https
+ * bucket: bucket1
+ * key: path/to/file.txt
+ */
+ public static void parseURI(String uri) throws IOException {
+ if (StringUtils.isEmpty(uri)) {
+ throw new IOException("s3 uri is empty.");
+ }
+ String[] schemeSplit = uri.split(SCHEME_DELIM);
+ String rest;
+ if (schemeSplit.length == 2) {
+ if (schemeSplit[0].equalsIgnoreCase(SCHEMA_S3)) {
+ // has scheme, eg: s3://bucket1/path/to/file.txt
+ rest = schemeSplit[1];
+ String[] authoritySplit = rest.split(PATH_DELIM, 2);
+ if (authoritySplit.length < 1) {
+ throw new IOException("Invalid S3 URI. uri=" + uri);
+ }
+ bucket = authoritySplit[0];
+ // support s3://bucket1
+ key = authoritySplit.length == 1 ? "/" : authoritySplit[1];
+            } else if (schemeSplit[0].equalsIgnoreCase(SCHEMA_HTTP) || schemeSplit[0].equalsIgnoreCase(SCHEMA_HTTPS)) {
+ // has scheme, eg: http(s)://host/bucket1/path/to/file.txt
+ rest = schemeSplit[1];
+ String[] authoritySplit = rest.split(PATH_DELIM, 3);
+ if (authoritySplit.length != 3) {
+ throw new IOException("Invalid S3 HTTP URI: uri=" + uri);
+ }
+ // authority_split[1] is host
+ bucket = authoritySplit[1];
+ key = authoritySplit[2];
+ } else {
+ throw new IOException("Invalid S3 HTTP URI: uri=" + uri);
+ }
+
+ } else if (schemeSplit.length == 1) {
+ // no scheme, eg: path/to/file.txt
+ bucket = ""; // unknown
+ key = uri;
+ } else {
+ throw new IOException("Invalid S3 URI. uri=" + uri);
+ }
+
+ key = key.trim();
+ if (StringUtils.isEmpty(key)) {
+ throw new IOException("Invalid S3 URI. uri=" + uri);
+ }
+ // Strip query and fragment if they exist
+ String[] querySplit = key.split(QUERY_DELIM);
+ String[] fragmentSplit = querySplit[0].split(FRAGMENT_DELIM);
+ key = fragmentSplit[0];
+ }
+
+ public static String getBucket() {
+ return bucket;
+ }
+
+ public static String getKey() {
+ return key;
+ }
+
+}
diff --git a/fe/be-java-extensions/hive-scanner/src/main/resources/package.xml
b/fe/be-java-extensions/hive-scanner/src/main/resources/package.xml
new file mode 100644
index 00000000000..d9ebe58202b
--- /dev/null
+++ b/fe/be-java-extensions/hive-scanner/src/main/resources/package.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<assembly xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://maven.apache.org/ASSEMBLY/2.0.0"
+ xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0
http://maven.apache.org/xsd/assembly-2.0.0.xsd">
+ <id>jar-with-dependencies</id>
+ <formats>
+ <format>jar</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <dependencySets>
+ <dependencySet>
+ <outputDirectory>/</outputDirectory>
+ <useProjectArtifact>true</useProjectArtifact>
+ <unpack>true</unpack>
+ <scope>runtime</scope>
+ <unpackOptions>
+ <excludes>
+ <exclude>**/Log4j2Plugins.dat</exclude>
+ </excludes>
+ </unpackOptions>
+ </dependencySet>
+ </dependencySets>
+</assembly>
diff --git a/fe/be-java-extensions/pom.xml b/fe/be-java-extensions/pom.xml
index bbe056739d5..8123c5c63d9 100644
--- a/fe/be-java-extensions/pom.xml
+++ b/fe/be-java-extensions/pom.xml
@@ -31,6 +31,7 @@ under the License.
<module>lakesoul-scanner</module>
<module>preload-extensions</module>
<module>trino-connector-scanner</module>
+ <module>hive-scanner</module>
</modules>
<parent>
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java
index bdb2e97b9f2..bdc12226600 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java
@@ -34,6 +34,9 @@ public class FileFormatConstants {
public static final String FORMAT_AVRO = "avro";
public static final String FORMAT_WAL = "wal";
public static final String FORMAT_ARROW = "arrow";
+ public static final String FORMAT_RC_BINARY = "rc_binary";
+ public static final String FORMAT_RC_TEXT = "rc_text";
+ public static final String FORMAT_SEQUENCE = "sequence";
public static final String PROP_FORMAT = "format";
public static final String PROP_COLUMN_SEPARATOR = "column_separator";
@@ -47,6 +50,7 @@ public class FileFormatConstants {
public static final String PROP_TRIM_DOUBLE_QUOTES = "trim_double_quotes";
public static final String PROP_SKIP_LINES = "skip_lines";
public static final String PROP_CSV_SCHEMA = "csv_schema";
+ public static final String PROP_HIVE_SCHEMA = "hive_schema";
public static final String PROP_COMPRESS = "compress";
public static final String PROP_COMPRESS_TYPE = "compress_type";
     public static final String PROP_PATH_PARTITION_KEYS = "path_partition_keys";
@@ -55,5 +59,10 @@ public class FileFormatConstants {
     public static final Pattern DECIMAL_TYPE_PATTERN = Pattern.compile("decimal\\((\\d+),(\\d+)\\)");
     // datetime(p)
     public static final Pattern DATETIME_TYPE_PATTERN = Pattern.compile("datetime\\((\\d+)\\)");
-
+ // timestamp(p)
+    public static final Pattern TIMESTAMP_TYPE_PATTERN = Pattern.compile("timestamp\\((\\d+)\\)");
+    // char(len)
+    public static final Pattern CHAR_TYPE_PATTERN = Pattern.compile("char\\((\\d+)\\)");
+    // varchar(len)
+    public static final Pattern VARCHAR_TYPE_PATTERN = Pattern.compile("varchar\\((\\d+)\\)");
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatUtils.java
index 0b646a00b16..c79647a9c36 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatUtils.java
@@ -17,14 +17,19 @@
package org.apache.doris.common.util;
+import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.MapType;
import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.catalog.StructField;
+import org.apache.doris.catalog.StructType;
+import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.FeNameFormat;
import com.google.common.base.Strings;
+import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
@@ -37,72 +42,195 @@ public class FileFormatUtils {
                 || FileFormatConstants.FORMAT_HIVE_TEXT.equalsIgnoreCase(formatStr);
}
+ public static boolean isHiveFormat(String formatStr) {
+ return FileFormatConstants.FORMAT_RC_BINARY.equalsIgnoreCase(formatStr)
+                || FileFormatConstants.FORMAT_RC_TEXT.equalsIgnoreCase(formatStr)
+                || FileFormatConstants.FORMAT_SEQUENCE.equalsIgnoreCase(formatStr);
+ }
+
// public for unit test
     public static void parseCsvSchema(List<Column> csvSchema, String csvSchemaStr)
throws AnalysisException {
if (Strings.isNullOrEmpty(csvSchemaStr)) {
return;
}
-        // the schema str is like: "k1:int;k2:bigint;k3:varchar(20);k4:datetime(6)"
+        // the schema str is like: "k1:int;k2:bigint;k3:varchar(20);k4:datetime(6);" +
+        // "k5:array<string>;k6:map<string,int>,k7:struct<name:string,age:int>"
String[] schemaStrs = csvSchemaStr.split(";");
try {
for (String schemaStr : schemaStrs) {
- String[] kv = schemaStr.replace(" ", "").split(":");
- if (kv.length != 2) {
-                    throw new AnalysisException("invalid csv schema: " + csvSchemaStr);
+                schemaStr = schemaStr.replace(" ", "");
+                int colonIndex = schemaStr.indexOf(":");
+                if (colonIndex == -1) {
+                    throw new AnalysisException("invalid schema: " + csvSchemaStr);
}
- Column column = null;
- String name = kv[0].toLowerCase();
+ String name = schemaStr.substring(0, colonIndex).toLowerCase();
+                String type = schemaStr.substring(colonIndex + 1).toLowerCase();
FeNameFormat.checkColumnName(name);
- String type = kv[1].toLowerCase();
- if (type.equals("tinyint")) {
- column = new Column(name, PrimitiveType.TINYINT, true);
- } else if (type.equals("smallint")) {
- column = new Column(name, PrimitiveType.SMALLINT, true);
- } else if (type.equals("int")) {
- column = new Column(name, PrimitiveType.INT, true);
- } else if (type.equals("bigint")) {
- column = new Column(name, PrimitiveType.BIGINT, true);
- } else if (type.equals("largeint")) {
- column = new Column(name, PrimitiveType.LARGEINT, true);
- } else if (type.equals("float")) {
- column = new Column(name, PrimitiveType.FLOAT, true);
- } else if (type.equals("double")) {
- column = new Column(name, PrimitiveType.DOUBLE, true);
- } else if (type.startsWith("decimal")) {
- // regex decimal(p, s)
- Matcher matcher = FileFormatConstants.DECIMAL_TYPE_PATTERN.matcher(type);
- if (!matcher.find()) {
- throw new AnalysisException("invalid decimal type: " +
type);
- }
- int precision = Integer.parseInt(matcher.group(1));
- int scale = Integer.parseInt(matcher.group(2));
- column = new Column(name, ScalarType.createDecimalV3Type(precision, scale), false, null, true, null,
- "");
- } else if (type.equals("date")) {
- column = new Column(name, ScalarType.createDateType(), false, null, true, null, "");
- } else if (type.startsWith("datetime")) {
- int scale = 0;
- if (!type.equals("datetime")) {
- // regex datetime(s)
- Matcher matcher = FileFormatConstants.DATETIME_TYPE_PATTERN.matcher(type);
- if (!matcher.find()) {
- throw new AnalysisException("invalid datetime
type: " + type);
- }
- scale = Integer.parseInt(matcher.group(1));
- }
- column = new Column(name, ScalarType.createDatetimeV2Type(scale), false, null, true, null, "");
- } else if (type.equals("string")) {
- column = new Column(name, PrimitiveType.STRING, true);
- } else if (type.equals("boolean")) {
- column = new Column(name, PrimitiveType.BOOLEAN, true);
- } else {
- throw new AnalysisException("unsupported column type: " +
type);
- }
+
+ Type columnType = parseType(type);
+ Column column = new Column(name, columnType, false, null, true, null, "");
+
csvSchema.add(column);
}
} catch (Exception e) {
- throw new AnalysisException("invalid csv schema: " +
e.getMessage());
+ throw new AnalysisException("invalid schema: " + e.getMessage());
+ }
+ }
+
+ private static Type parseType(String typeStr) throws AnalysisException {
+ typeStr = typeStr.trim().toLowerCase();
+ if (typeStr.equals("tinyint")) {
+ return ScalarType.TINYINT;
+ } else if (typeStr.equals("smallint")) {
+ return ScalarType.SMALLINT;
+ } else if (typeStr.equals("int")) {
+ return ScalarType.INT;
+ } else if (typeStr.equals("bigint")) {
+ return ScalarType.BIGINT;
+ } else if (typeStr.equals("largeint")) {
+ return ScalarType.LARGEINT;
+ } else if (typeStr.equals("float")) {
+ return ScalarType.FLOAT;
+ } else if (typeStr.equals("double")) {
+ return ScalarType.DOUBLE;
+ } else if (typeStr.startsWith("decimal")) {
+ // Parse decimal(p, s)
+ Matcher matcher = FileFormatConstants.DECIMAL_TYPE_PATTERN.matcher(typeStr);
+ if (!matcher.find()) {
+ throw new AnalysisException("Invalid decimal type: " +
typeStr);
+ }
+ int precision = Integer.parseInt(matcher.group(1));
+ int scale = Integer.parseInt(matcher.group(2));
+ return ScalarType.createDecimalV3Type(precision, scale);
+ } else if (typeStr.equals("date")) {
+ return ScalarType.createDateType();
+ } else if (typeStr.startsWith("timestamp")) {
+ int scale = 0;
+ if (!typeStr.equals("timestamp")) {
+ // Parse timestamp(s)
+ Matcher matcher = FileFormatConstants.TIMESTAMP_TYPE_PATTERN.matcher(typeStr);
+ if (!matcher.find()) {
+ throw new AnalysisException("Invalid timestamp type: " +
typeStr);
+ }
+ scale = Integer.parseInt(matcher.group(1));
+ }
+ return ScalarType.createDatetimeV2Type(scale);
+ } else if (typeStr.startsWith("datetime")) {
+ int scale = 0;
+ if (!typeStr.equals("datetime")) {
+ // Parse datetime(s)
+ Matcher matcher = FileFormatConstants.DATETIME_TYPE_PATTERN.matcher(typeStr);
+ if (!matcher.find()) {
+ throw new AnalysisException("Invalid datetime type: " +
typeStr);
+ }
+ scale = Integer.parseInt(matcher.group(1));
+ }
+ return ScalarType.createDatetimeV2Type(scale);
+ } else if (typeStr.equals("string")) {
+ return ScalarType.createStringType();
+ } else if (typeStr.equals("boolean")) {
+ return ScalarType.BOOLEAN;
+ } else if (typeStr.startsWith("char")) {
+ // Parse char(len)
+ Matcher matcher = FileFormatConstants.CHAR_TYPE_PATTERN.matcher(typeStr);
+ if (matcher.matches()) {
+ int len = Integer.parseInt(matcher.group(1));
+ return ScalarType.createChar(len);
+ }
+ throw new AnalysisException("Invalid char type: " + typeStr);
+ } else if (typeStr.startsWith("varchar")) {
+ // Parse varchar(len)
+ Matcher matcher = FileFormatConstants.VARCHAR_TYPE_PATTERN.matcher(typeStr);
+ if (matcher.matches()) {
+ int len = Integer.parseInt(matcher.group(1));
+ return ScalarType.createVarcharType(len);
+ }
+ throw new AnalysisException("Invalid varchar type: " + typeStr);
+ } else if (typeStr.startsWith("array")) {
+ // Parse array<element_type>
+ if (typeStr.indexOf('<') == 5 && typeStr.endsWith(">")) {
+ String elementTypeStr = typeStr.substring(6, typeStr.length() - 1);
+ Type elementType = parseType(elementTypeStr);
+ return new ArrayType(elementType);
+ }
+ throw new AnalysisException("Invalid array type: " + typeStr);
+ } else if (typeStr.startsWith("map")) {
+ // Parse map<key_type,value_type>
+ if (typeStr.indexOf('<') == 3 && typeStr.endsWith(">")) {
+ String keyValueStr = typeStr.substring(4, typeStr.length() - 1);
+ int commaIndex = findCommaOutsideBrackets(keyValueStr);
+ if (commaIndex == -1) {
+ throw new AnalysisException("Invalid map type: " +
typeStr);
+ }
+ String keyTypeStr = keyValueStr.substring(0, commaIndex).trim();
+ String valueTypeStr = keyValueStr.substring(commaIndex + 1).trim();
+ Type keyType = parseType(keyTypeStr);
+ Type valueType = parseType(valueTypeStr);
+ return new MapType(keyType, valueType);
+ }
+ throw new AnalysisException("Invalid map type: " + typeStr);
+ } else if (typeStr.startsWith("struct")) {
+ // Parse struct<field1:type1,field2:type2,...>
+ if (typeStr.indexOf('<') == 6 && typeStr.endsWith(">")) {
+ String fieldStr = typeStr.substring(7, typeStr.length() - 1);
+ List<String> fieldDefs = splitStructFields(fieldStr);
+ ArrayList<StructField> structFields = new ArrayList<>();
+ for (String fieldDef : fieldDefs) {
+ int colonIndex = fieldDef.indexOf(":");
+ if (colonIndex == -1) {
+ throw new AnalysisException("Invalid struct field: " +
fieldDef);
+ }
+ String fieldName = fieldDef.substring(0, colonIndex).trim();
+ String fieldTypeStr = fieldDef.substring(colonIndex + 1).trim();
+ Type fieldType = parseType(fieldTypeStr);
+ StructField structField = new StructField(fieldName, fieldType);
+ structFields.add(structField);
+ }
+ return new StructType(structFields);
+ }
+ throw new AnalysisException("Invalid struct type: " + typeStr);
+ } else {
+ throw new AnalysisException("Unsupported type: " + typeStr);
+ }
+ }
+
+ private static int findCommaOutsideBrackets(String s) {
+ int level = 0;
+ for (int i = 0; i < s.length(); i++) {
+ char c = s.charAt(i);
+ if (c == '<') {
+ level++;
+ } else if (c == '>') {
+ level--;
+ } else if (c == ',' && level == 0) {
+ return i;
+ }
+ }
+ return -1;
+ }
+
+ private static List<String> splitStructFields(String s) throws
AnalysisException {
+ List<String> fields = new ArrayList<>();
+ int level = 0;
+ int start = 0;
+ for (int i = 0; i < s.length(); i++) {
+ char c = s.charAt(i);
+ if (c == '<') {
+ level++;
+ } else if (c == '>') {
+ level--;
+ } else if (c == ',' && level == 0) {
+ fields.add(s.substring(start, i).trim());
+ start = i + 1;
+ }
+ }
+ if (start < s.length()) {
+ fields.add(s.substring(start).trim());
+ }
+ if (level != 0) {
+ throw new AnalysisException("Unmatched angle brackets in struct
definition.");
}
+ return fields;
}
}
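The array/map/struct branches of parseType above depend on findCommaOutsideBrackets and splitStructFields to split only at top-level commas, so nested generics such as map<string,array<int>> are not torn apart. The following standalone sketch shows that bracket-level counting technique in isolation; the class and method names are illustrative and are not the patch code itself.
```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: split a Hive-style field list on commas that sit at
// angle-bracket nesting level 0, leaving nested map<...>/struct<...> intact.
public class NestedTypeSplitSketch {
    static List<String> splitTopLevel(String s) {
        List<String> parts = new ArrayList<>();
        int level = 0;
        int start = 0;
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            if (c == '<') {
                level++;
            } else if (c == '>') {
                level--;
            } else if (c == ',' && level == 0) {
                parts.add(s.substring(start, i).trim());
                start = i + 1;
            }
        }
        parts.add(s.substring(start).trim());
        return parts;
    }

    public static void main(String[] args) {
        // Prints: [name:string, attrs:map<string,int>, tags:array<string>]
        System.out.println(splitTopLevel("name:string,attrs:map<string,int>,tags:array<string>"));
    }
}
```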
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
index e137d5d200c..52d0477e581 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
@@ -211,6 +211,15 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
case "wal":
this.fileFormatType = TFileFormatType.FORMAT_WAL;
break;
+ case "sequence":
+ this.fileFormatType = TFileFormatType.FORMAT_SEQUENCE;
+ break;
+ case "rc_binary":
+ this.fileFormatType = TFileFormatType.FORMAT_RCBINARY;
+ break;
+ case "rc_text":
+ this.fileFormatType = TFileFormatType.FORMAT_RCTEXT;
+ break;
default:
throw new AnalysisException("format:" + formatString + " is
not supported.");
}
@@ -259,6 +268,15 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
}
}
+ // When parsing rc_binary/rc_text/sequence files, reuse parseCsvSchema to parse column names and types
+ if (FileFormatUtils.isHiveFormat(formatString)) {
+ FileFormatUtils.parseCsvSchema(csvSchema, getOrDefaultAndRemove(copiedProps,
+ FileFormatConstants.PROP_HIVE_SCHEMA, ""));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("get hive schema: {}", csvSchema);
+ }
+ }
+
pathPartitionKeys = Optional.ofNullable(
getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_PATH_PARTITION_KEYS, null))
.map(str -> Arrays.stream(str.split(","))
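With the hook above, the value of the TVF's hive_schema property is handed to FileFormatUtils.parseCsvSchema, which now resolves nested types as well. A rough, test-style sketch of exercising that public entry point directly is shown below; it assumes the FE classes from this commit are on the classpath and the column/type accessors behave as usual, and it is not part of the patch.
```java
import org.apache.doris.catalog.Column;
import org.apache.doris.common.util.FileFormatUtils;

import java.util.ArrayList;
import java.util.List;

// Sketch: feed a ';'-separated "name:type" string, as the TVF "hive_schema"
// property does, into the schema parser and print the resolved column types.
public class HiveSchemaSketch {
    public static void main(String[] args) throws Exception {
        List<Column> cols = new ArrayList<>();
        FileFormatUtils.parseCsvSchema(cols,
                "id:int;tags:array<string>;attrs:map<string,int>;who:struct<name:string,age:int>");
        for (Column c : cols) {
            System.out.println(c.getName() + " -> " + c.getType());
        }
    }
}
```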
diff --git a/fe/pom.xml b/fe/pom.xml
index 29b2b61530a..02150a03301 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -1468,6 +1468,11 @@ under the License.
<artifactId>mariadb-java-client</artifactId>
<version>${mariadb-java-client.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-serde</artifactId>
+ <version>${hive.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-common</artifactId>
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index c77ab48b2d4..6760bf54875 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -120,7 +120,10 @@ enum TFileFormatType {
FORMAT_CSV_LZ4BLOCK,
FORMAT_CSV_SNAPPYBLOCK,
FORMAT_WAL,
- FORMAT_ARROW
+ FORMAT_ARROW,
+ FORMAT_RCBINARY,
+ FORMAT_RCTEXT,
+ FORMAT_SEQUENCE,
}
// In previous versions, the data compression format and file format were stored together, as TFileFormatType,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]