This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 43eb946543 [feature](table-valued-function)S3 table valued function
supports parquet/orc/json file format #14130
43eb946543 is described below
commit 43eb9465438edd090b0249b0b8ecff6cf4b5cf54
Author: Tiewei Fang <[email protected]>
AuthorDate: Thu Nov 10 10:33:12 2022 +0800
[feature](table-valued-function)S3 table valued function supports
parquet/orc/json file format #14130
The S3 table-valued function now supports the Parquet, ORC, and JSON file formats, selected through the `format` property.
For example, to read a Parquet file, pass "format" = "parquet".
---
be/src/service/internal_service.cpp | 25 +++-
be/src/vec/exec/format/json/new_json_reader.cpp | 130 ++++++++++++++++++++-
be/src/vec/exec/format/json/new_json_reader.h | 7 +-
be/src/vec/exec/format/orc/vorc_reader.cpp | 42 +++++++
be/src/vec/exec/format/orc/vorc_reader.h | 6 +
be/src/vec/exec/format/parquet/vparquet_reader.cpp | 35 ++++++
be/src/vec/exec/format/parquet/vparquet_reader.h | 6 +
.../doris/planner/external/HiveScanProvider.java | 25 ++--
.../doris/planner/external/QueryScanProvider.java | 17 +--
.../doris/planner/external/TVFScanProvider.java | 15 +--
.../ExternalFileTableValuedFunction.java | 104 ++++++++++++-----
.../doris/tablefunction/S3TableValuedFunction.java | 35 ++----
12 files changed, 348 insertions(+), 99 deletions(-)
diff --git a/be/src/service/internal_service.cpp
b/be/src/service/internal_service.cpp
index 26301b8f4e..d15726272f 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -51,6 +51,9 @@
#include "util/uid_util.h"
#include "vec/exec/format/csv/csv_reader.h"
#include "vec/exec/format/generic_reader.h"
+#include "vec/exec/format/json/new_json_reader.h"
+#include "vec/exec/format/orc/vorc_reader.h"
+#include "vec/exec/format/parquet/vparquet_reader.h"
#include "vec/runtime/vdata_stream_mgr.h"
namespace doris {
@@ -426,7 +429,6 @@ void
PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* c
return;
}
}
-
if (file_scan_range.__isset.ranges == false) {
st = Status::InternalError("can not get TFileRangeDesc.");
st.to_protobuf(result->mutable_status());
@@ -439,8 +441,7 @@ void
PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* c
}
const TFileRangeDesc& range = file_scan_range.ranges.at(0);
const TFileScanRangeParams& params = file_scan_range.params;
- // file_slots is no use
- std::vector<SlotDescriptor*> file_slots;
+
std::unique_ptr<vectorized::GenericReader> reader(nullptr);
std::unique_ptr<RuntimeProfile> profile(new
RuntimeProfile("FetchTableSchema"));
switch (params.format_type) {
@@ -450,16 +451,32 @@ void
PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* c
case TFileFormatType::FORMAT_CSV_LZ4FRAME:
case TFileFormatType::FORMAT_CSV_LZOP:
case TFileFormatType::FORMAT_CSV_DEFLATE: {
+ // file_slots is no use
+ std::vector<SlotDescriptor*> file_slots;
reader.reset(new vectorized::CsvReader(profile.get(), params, range,
file_slots));
break;
}
+ case TFileFormatType::FORMAT_PARQUET: {
+ std::vector<std::string> column_names;
+ reader.reset(new vectorized::ParquetReader(params, range,
column_names));
+ break;
+ }
+ case TFileFormatType::FORMAT_ORC: {
+ std::vector<std::string> column_names;
+ reader.reset(new vectorized::OrcReader(params, range, column_names,
""));
+ break;
+ }
+ case TFileFormatType::FORMAT_JSON: {
+ std::vector<SlotDescriptor*> file_slots;
+ reader.reset(new vectorized::NewJsonReader(profile.get(), params,
range, file_slots));
+ break;
+ }
default:
st = Status::InternalError("Not supported file format in fetch table
schema: {}",
params.format_type);
st.to_protobuf(result->mutable_status());
return;
}
- std::unordered_map<std::string, TypeDescriptor> name_to_col_type;
std::vector<std::string> col_names;
std::vector<TypeDescriptor> col_types;
st = reader->get_parsered_schema(&col_names, &col_types);
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp
b/be/src/vec/exec/format/json/new_json_reader.cpp
index a8fbeae1c1..6b60a144cf 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -49,13 +49,32 @@ NewJsonReader::NewJsonReader(RuntimeState* state,
RuntimeProfile* profile, Scann
_parse_allocator(_parse_buffer, sizeof(_parse_buffer)),
_origin_json_doc(&_value_allocator, sizeof(_parse_buffer),
&_parse_allocator),
_scanner_eof(scanner_eof) {
- _file_format_type = _params.format_type;
-
_bytes_read_counter = ADD_COUNTER(_profile, "BytesRead", TUnit::BYTES);
_read_timer = ADD_TIMER(_profile, "ReadTime");
_file_read_timer = ADD_TIMER(_profile, "FileReadTime");
}
+NewJsonReader::NewJsonReader(RuntimeProfile* profile, const
TFileScanRangeParams& params,
+ const TFileRangeDesc& range,
+ const std::vector<SlotDescriptor*>&
file_slot_descs)
+ : _vhandle_json_callback(nullptr),
+ _state(nullptr),
+ _profile(profile),
+ _params(params),
+ _range(range),
+ _file_slot_descs(file_slot_descs),
+ _file_reader(nullptr),
+ _file_reader_s(nullptr),
+ _real_file_reader(nullptr),
+ _line_reader(nullptr),
+ _reader_eof(false),
+ _skip_first_line(false),
+ _next_row(0),
+ _total_rows(0),
+ _value_allocator(_value_buffer, sizeof(_value_buffer)),
+ _parse_allocator(_parse_buffer, sizeof(_parse_buffer)),
+ _origin_json_doc(&_value_allocator, sizeof(_parse_buffer),
&_parse_allocator) {}
+
Status NewJsonReader::init_reader() {
RETURN_IF_ERROR(_get_range_params());
@@ -120,6 +139,113 @@ Status
NewJsonReader::get_columns(std::unordered_map<std::string, TypeDescriptor
return Status::OK();
}
+Status NewJsonReader::get_parsered_schema(std::vector<std::string>* col_names,
+ std::vector<TypeDescriptor>*
col_types) {
+ RETURN_IF_ERROR(_get_range_params());
+
+ RETURN_IF_ERROR(_open_file_reader());
+ if (_read_json_by_line) {
+ RETURN_IF_ERROR(_open_line_reader());
+ }
+
+ // generate _parsed_jsonpaths and _parsed_json_root
+ RETURN_IF_ERROR(_parse_jsonpath_and_json_root());
+
+ bool eof = false;
+ const uint8_t* json_str = nullptr;
+ std::unique_ptr<uint8_t[]> json_str_ptr;
+ size_t size = 0;
+ if (_line_reader != nullptr) {
+ RETURN_IF_ERROR(_line_reader->read_line(&json_str, &size, &eof));
+ } else {
+ int64_t length = 0;
+ RETURN_IF_ERROR(_real_file_reader->read_one_message(&json_str_ptr,
&length));
+ json_str = json_str_ptr.get();
+ size = length;
+ if (length == 0) {
+ eof = true;
+ }
+ }
+
+ if (size == 0 || eof) {
+ return Status::EndOfFile("Empty file.");
+ }
+
+ // clear memory here.
+ _value_allocator.Clear();
+ _parse_allocator.Clear();
+ bool has_parse_error = false;
+
+ // parse jsondata to JsonDoc
+ // As the issue: https://github.com/Tencent/rapidjson/issues/1458
+ // Now, rapidjson only supports uint64_t, so loading largeint values causes a bug. We
use kParseNumbersAsStringsFlag.
+ if (_num_as_string) {
+ has_parse_error =
+
_origin_json_doc.Parse<rapidjson::kParseNumbersAsStringsFlag>((char*)json_str,
size)
+ .HasParseError();
+ } else {
+ has_parse_error = _origin_json_doc.Parse((char*)json_str,
size).HasParseError();
+ }
+
+ if (has_parse_error) {
+ return Status::DataQualityError(
+ "Parse json data for JsonDoc failed. code: {}, error info: {}",
+ _origin_json_doc.GetParseError(),
+ rapidjson::GetParseError_En(_origin_json_doc.GetParseError()));
+ }
+
+ // set json root
+ if (_parsed_json_root.size() != 0) {
+ _json_doc = JsonFunctions::get_json_object_from_parsed_json(
+ _parsed_json_root, &_origin_json_doc,
_origin_json_doc.GetAllocator());
+ if (_json_doc == nullptr) {
+ return Status::DataQualityError("JSON Root not found.");
+ }
+ } else {
+ _json_doc = &_origin_json_doc;
+ }
+
+ if (_json_doc->IsArray() && !_strip_outer_array) {
+ return Status::DataQualityError(
+ "JSON data is array-object, `strip_outer_array` must be
TRUE.");
+ } else if (!_json_doc->IsArray() && _strip_outer_array) {
+ return Status::DataQualityError(
+ "JSON data is not an array-object, `strip_outer_array` must be
FALSE.");
+ }
+
+ rapidjson::Value* objectValue = nullptr;
+ if (_json_doc->IsArray()) {
+ if (_json_doc->Size() == 0) {
+ // may be passing an empty json, such as "[]"
+ return Status::InternalError("Empty first json line");
+ }
+ objectValue = &(*_json_doc)[0];
+ } else {
+ objectValue = _json_doc;
+ }
+
+ // use jsonpaths to col_names
+ if (_parsed_jsonpaths.size() > 0) {
+ for (size_t i = 0; i < _parsed_jsonpaths.size(); ++i) {
+ size_t len = _parsed_jsonpaths[i].size();
+ if (len == 0) {
+ return Status::InvalidArgument("It's invalid jsonpaths.");
+ }
+ std::string key = _parsed_jsonpaths[i][len - 1].key;
+ col_names->emplace_back(key);
+ col_types->emplace_back(TypeDescriptor::create_string_type());
+ }
+ return Status::OK();
+ }
+
+ for (int i = 0; i < objectValue->MemberCount(); ++i) {
+ auto it = objectValue->MemberBegin() + i;
+ col_names->emplace_back(it->name.GetString());
+ col_types->emplace_back(TypeDescriptor::create_string_type());
+ }
+ return Status::OK();
+}
+
Status NewJsonReader::_get_range_params() {
if (!_params.__isset.file_attributes) {
return Status::InternalError("BE cat get file_attributes");
diff --git a/be/src/vec/exec/format/json/new_json_reader.h
b/be/src/vec/exec/format/json/new_json_reader.h
index aee11535fb..6b003c30fe 100644
--- a/be/src/vec/exec/format/json/new_json_reader.h
+++ b/be/src/vec/exec/format/json/new_json_reader.h
@@ -39,12 +39,17 @@ public:
NewJsonReader(RuntimeState* state, RuntimeProfile* profile,
ScannerCounter* counter,
const TFileScanRangeParams& params, const TFileRangeDesc&
range,
const std::vector<SlotDescriptor*>& file_slot_descs, bool*
scanner_eof);
+
+ NewJsonReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
+ const TFileRangeDesc& range, const
std::vector<SlotDescriptor*>& file_slot_descs);
~NewJsonReader() override = default;
Status init_reader();
Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
Status get_columns(std::unordered_map<std::string, TypeDescriptor>*
name_to_type,
std::unordered_set<std::string>* missing_cols) override;
+ Status get_parsered_schema(std::vector<std::string>* col_names,
+ std::vector<TypeDescriptor>* col_types)
override;
private:
Status _get_range_params();
@@ -107,8 +112,6 @@ private:
std::unique_ptr<LineReader> _line_reader;
bool _reader_eof;
- TFileFormatType::type _file_format_type;
-
// When we fetch range doesn't start from 0 will always skip the first line
bool _skip_first_line;
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp
b/be/src/vec/exec/format/orc/vorc_reader.cpp
index 77375831f6..0cd19bdae6 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -81,6 +81,14 @@ OrcReader::OrcReader(RuntimeProfile* profile, const
TFileScanRangeParams& params
_init_profile();
}
+OrcReader::OrcReader(const TFileScanRangeParams& params, const TFileRangeDesc&
range,
+ const std::vector<std::string>& column_names, const
std::string& ctz)
+ : _profile(nullptr),
+ _scan_params(params),
+ _scan_range(range),
+ _ctz(ctz),
+ _column_names(column_names) {}
+
OrcReader::~OrcReader() {
close();
}
@@ -166,6 +174,40 @@ Status OrcReader::init_reader(
return Status::OK();
}
+Status OrcReader::get_parsered_schema(std::vector<std::string>* col_names,
+ std::vector<TypeDescriptor>* col_types) {
+ if (_file_reader == nullptr) {
+ std::unique_ptr<FileReader> inner_reader;
+ RETURN_IF_ERROR(FileFactory::create_file_reader(_profile,
_scan_params, _scan_range.path,
+
_scan_range.start_offset,
+ _scan_range.file_size,
0, inner_reader));
+ RETURN_IF_ERROR(inner_reader->open());
+ _file_reader = new ORCFileInputStream(_scan_range.path,
inner_reader.release());
+ }
+ if (_file_reader->getLength() == 0) {
+ return Status::EndOfFile("Empty orc file");
+ }
+
+ // create orc reader
+ try {
+ orc::ReaderOptions options;
+ _reader =
orc::createReader(std::unique_ptr<ORCFileInputStream>(_file_reader), options);
+ } catch (std::exception& e) {
+ return Status::InternalError("Init OrcReader failed. reason = {}",
e.what());
+ }
+
+ if (_reader->getNumberOfRows() == 0) {
+ return Status::EndOfFile("Empty orc file");
+ }
+
+ auto& root_type = _reader->getType();
+ for (int i = 0; i < root_type.getSubtypeCount(); ++i) {
+ col_names->emplace_back(root_type.getFieldName(i));
+
col_types->emplace_back(_convert_to_doris_type(root_type.getSubtype(i)));
+ }
+ return Status::OK();
+}
+
Status OrcReader::_init_read_columns() {
auto& root_type = _reader->getType();
std::unordered_set<std::string> orc_cols;
diff --git a/be/src/vec/exec/format/orc/vorc_reader.h
b/be/src/vec/exec/format/orc/vorc_reader.h
index c9b1d2dc45..9b0e4b6a44 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.h
+++ b/be/src/vec/exec/format/orc/vorc_reader.h
@@ -77,6 +77,9 @@ public:
const TFileRangeDesc& range, const std::vector<std::string>&
column_names,
size_t batch_size, const std::string& ctz);
+ OrcReader(const TFileScanRangeParams& params, const TFileRangeDesc& range,
+ const std::vector<std::string>& column_names, const std::string&
ctz);
+
~OrcReader() override;
// for test
void set_file_reader(const std::string& file_name, FileReader*
file_reader) {
@@ -96,6 +99,9 @@ public:
Status get_columns(std::unordered_map<std::string, TypeDescriptor>*
name_to_type,
std::unordered_set<std::string>* missing_cols) override;
+ Status get_parsered_schema(std::vector<std::string>* col_names,
+ std::vector<TypeDescriptor>* col_types)
override;
+
private:
struct OrcProfile {
RuntimeProfile::Counter* read_time;
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index af899fb318..bfbccbcb7c 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -40,6 +40,13 @@ ParquetReader::ParquetReader(RuntimeProfile* profile, const
TFileScanRangeParams
_init_profile();
}
+ParquetReader::ParquetReader(const TFileScanRangeParams& params, const
TFileRangeDesc& range,
+ const std::vector<std::string>& column_names)
+ : _profile(nullptr),
+ _scan_params(params),
+ _scan_range(range),
+ _column_names(column_names) {}
+
ParquetReader::~ParquetReader() {
close();
}
@@ -182,6 +189,34 @@ std::unordered_map<std::string, TypeDescriptor>
ParquetReader::get_name_to_type(
return map;
}
+Status ParquetReader::get_parsered_schema(std::vector<std::string>* col_names,
+ std::vector<TypeDescriptor>*
col_types) {
+ if (_file_reader == nullptr) {
+ RETURN_IF_ERROR(FileFactory::create_file_reader(_profile,
_scan_params, _scan_range.path,
+
_scan_range.start_offset,
+ _scan_range.file_size,
0, _file_reader));
+ }
+ RETURN_IF_ERROR(_file_reader->open());
+ if (_file_reader->size() == 0) {
+ return Status::EndOfFile("Empty Parquet File");
+ }
+ RETURN_IF_ERROR(parse_thrift_footer(_file_reader.get(), _file_metadata));
+ _t_metadata = &_file_metadata->to_thrift();
+
+ _total_groups = _t_metadata->row_groups.size();
+ if (_total_groups == 0) {
+ return Status::EndOfFile("Empty Parquet File");
+ }
+
+ auto schema_desc = _file_metadata->schema();
+ for (int i = 0; i < schema_desc.size(); ++i) {
+ // Record the name and type of each column in the file schema
+ col_names->emplace_back(schema_desc.get_column(i)->name);
+ col_types->emplace_back(schema_desc.get_column(i)->type);
+ }
+ return Status::OK();
+}
+
Status ParquetReader::get_columns(std::unordered_map<std::string,
TypeDescriptor>* name_to_type,
std::unordered_set<std::string>*
missing_cols) {
const auto& schema_desc = _file_metadata->schema();
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h
b/be/src/vec/exec/format/parquet/vparquet_reader.h
index 50a8505652..22a99df22f 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -54,6 +54,9 @@ public:
const TFileRangeDesc& range, const std::vector<std::string>&
column_names,
size_t batch_size, cctz::time_zone* ctz);
+ ParquetReader(const TFileScanRangeParams& params, const TFileRangeDesc&
range,
+ const std::vector<std::string>& column_names);
+
~ParquetReader() override;
// for test
void set_file_reader(FileReader* file_reader) {
_file_reader.reset(file_reader); }
@@ -71,6 +74,9 @@ public:
Status get_columns(std::unordered_map<std::string, TypeDescriptor>*
name_to_type,
std::unordered_set<std::string>* missing_cols) override;
+ Status get_parsered_schema(std::vector<std::string>* col_names,
+ std::vector<TypeDescriptor>* col_types)
override;
+
Statistics& statistics() { return _statistics; }
private:
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
index 9fd59fc43d..a5e8a7aabc 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
@@ -33,9 +33,11 @@ import org.apache.doris.common.UserException;
import org.apache.doris.external.hive.util.HiveUtil;
import org.apache.doris.load.BrokerFileGroup;
import
org.apache.doris.planner.external.ExternalFileScanNode.ParamCreateContext;
+import org.apache.doris.thrift.TFileAttributes;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileScanRangeParams;
import org.apache.doris.thrift.TFileScanSlotInfo;
+import org.apache.doris.thrift.TFileTextScanRangeParams;
import org.apache.doris.thrift.TFileType;
import com.google.common.collect.Lists;
@@ -270,21 +272,16 @@ public class HiveScanProvider extends
HMSTableScanProvider {
}
@Override
- public String getColumnSeparator() throws UserException {
- return hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters()
- .getOrDefault(PROP_FIELD_DELIMITER, DEFAULT_FIELD_DELIMITER);
+ public TFileAttributes getFileAttributes() throws UserException {
+ TFileTextScanRangeParams textParams = new TFileTextScanRangeParams();
+
textParams.setColumnSeparator(hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters()
+ .getOrDefault(PROP_FIELD_DELIMITER, DEFAULT_FIELD_DELIMITER));
+ textParams.setLineDelimiter(DEFAULT_LINE_DELIMITER);
+ TFileAttributes fileAttributes = new TFileAttributes();
+ fileAttributes.setTextParams(textParams);
+ fileAttributes.setHeaderType("");
+ return fileAttributes;
}
-
- @Override
- public String getLineSeparator() {
- return DEFAULT_LINE_DELIMITER;
- }
-
- @Override
- public String getHeaderType() {
- return "";
- }
-
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
index 4b865560d7..eae1829603 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
@@ -29,7 +29,6 @@ import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileRangeDesc;
import org.apache.doris.thrift.TFileScanRange;
import org.apache.doris.thrift.TFileScanRangeParams;
-import org.apache.doris.thrift.TFileTextScanRangeParams;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.THdfsParams;
import org.apache.doris.thrift.TNetworkAddress;
@@ -52,11 +51,7 @@ public abstract class QueryScanProvider implements
FileScanProviderIf {
private int inputSplitNum = 0;
private long inputFileSize = 0;
- public abstract String getColumnSeparator() throws UserException;
-
- public abstract String getLineSeparator();
-
- public abstract String getHeaderType();
+ public abstract TFileAttributes getFileAttributes() throws UserException;
@Override
public void createScanRangeLocations(ParamCreateContext context,
BackendPolicy backendPolicy,
@@ -78,14 +73,8 @@ public abstract class QueryScanProvider implements
FileScanProviderIf {
context.params.setFileType(locationType);
TFileFormatType fileFormatType = getFileFormatType();
context.params.setFormatType(getFileFormatType());
- if (fileFormatType == TFileFormatType.FORMAT_CSV_PLAIN) {
- TFileTextScanRangeParams textParams = new
TFileTextScanRangeParams();
- textParams.setColumnSeparator(getColumnSeparator());
- textParams.setLineDelimiter(getLineSeparator());
- TFileAttributes fileAttributes = new TFileAttributes();
- fileAttributes.setTextParams(textParams);
- fileAttributes.setHeaderType(getHeaderType());
- context.params.setFileAttributes(fileAttributes);
+ if (fileFormatType == TFileFormatType.FORMAT_CSV_PLAIN ||
fileFormatType == TFileFormatType.FORMAT_JSON) {
+ context.params.setFileAttributes(getFileAttributes());
}
// set hdfs params for hdfs file type.
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanProvider.java
index 88627cef71..8c8bdf9d30 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanProvider.java
@@ -31,6 +31,7 @@ import org.apache.doris.load.BrokerFileGroup;
import
org.apache.doris.planner.external.ExternalFileScanNode.ParamCreateContext;
import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
import org.apache.doris.thrift.TBrokerFileStatus;
+import org.apache.doris.thrift.TFileAttributes;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileScanRangeParams;
import org.apache.doris.thrift.TFileScanSlotInfo;
@@ -59,18 +60,8 @@ public class TVFScanProvider extends QueryScanProvider {
// =========== implement abstract methods of QueryScanProvider
=================
@Override
- public String getColumnSeparator() throws UserException {
- return tableValuedFunction.getColumnSeparator();
- }
-
- @Override
- public String getLineSeparator() {
- return tableValuedFunction.getLineSeparator();
- }
-
- @Override
- public String getHeaderType() {
- return tableValuedFunction.getHeaderType();
+ public TFileAttributes getFileAttributes() throws UserException {
+ return tableValuedFunction.getFileAttributes();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
index d6b6af0ce0..ac69cad8de 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
@@ -23,6 +23,7 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.FeConstants;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.planner.PlanNodeId;
@@ -65,8 +66,15 @@ import java.util.concurrent.Future;
*/
public abstract class ExternalFileTableValuedFunction extends
TableValuedFunctionIf {
public static final Logger LOG =
LogManager.getLogger(ExternalFileTableValuedFunction.class);
- public static final String DEFAULT_COLUMN_SEPARATOR = ",";
- public static final String DEFAULT_LINE_DELIMITER = "\n";
+ protected static final String DEFAULT_COLUMN_SEPARATOR = ",";
+ protected static final String DEFAULT_LINE_DELIMITER = "\n";
+ protected static final String FORMAT = "format";
+ protected static final String COLUMN_SEPARATOR = "column_separator";
+ protected static final String LINE_DELIMITER = "line_delimiter";
+ protected static final String JSON_ROOT = "json_root";
+ protected static final String JSON_PATHS = "jsonpaths";
+ protected static final String STRIP_OUTER_ARRAY = "strip_outer_array";
+ protected static final String READ_JSON_BY_LINE = "read_json_by_line";
protected List<Column> columns = null;
protected List<TBrokerFileStatus> fileStatuses = Lists.newArrayList();
@@ -77,6 +85,11 @@ public abstract class ExternalFileTableValuedFunction
extends TableValuedFunctio
protected String columnSeparator = DEFAULT_COLUMN_SEPARATOR;
protected String lineDelimiter = DEFAULT_LINE_DELIMITER;
+ protected String jsonRoot = "";
+ protected String jsonPaths = "";
+ protected String stripOuterArray = "";
+ protected String readJsonByLine = "";
+
public abstract TFileType getTFileType();
@@ -92,28 +105,77 @@ public abstract class ExternalFileTableValuedFunction
extends TableValuedFunctio
return locationProperties;
}
- public String getColumnSeparator() {
- return columnSeparator;
- }
-
- public String getLineSeparator() {
- return lineDelimiter;
- }
-
- public String getHeaderType() {
- return headerType;
- }
-
- public void parseFile() throws UserException {
+ protected void parseFile() throws UserException {
String path = getFilePath();
BrokerDesc brokerDesc = getBrokerDesc();
BrokerUtil.parseFile(path, brokerDesc, fileStatuses);
}
+ protected void parseProperties(Map<String, String> validParams) throws
UserException {
+ String formatString = validParams.getOrDefault(FORMAT,
"").toLowerCase();
+ switch (formatString) {
+ case "csv":
+ this.fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
+ break;
+ case "csv_with_names":
+ this.headerType = FeConstants.csv_with_names;
+ this.fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
+ break;
+ case "csv_with_names_and_types":
+ this.headerType = FeConstants.csv_with_names_and_types;
+ this.fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
+ break;
+ case "parquet":
+ this.fileFormatType = TFileFormatType.FORMAT_PARQUET;
+ break;
+ case "orc":
+ this.fileFormatType = TFileFormatType.FORMAT_ORC;
+ break;
+ case "json":
+ this.fileFormatType = TFileFormatType.FORMAT_JSON;
+ break;
+ default:
+ throw new AnalysisException("format:" + formatString + " is
not supported.");
+ }
+
+ columnSeparator = validParams.getOrDefault(COLUMN_SEPARATOR,
DEFAULT_COLUMN_SEPARATOR);
+ lineDelimiter = validParams.getOrDefault(LINE_DELIMITER,
DEFAULT_LINE_DELIMITER);
+ jsonRoot = validParams.getOrDefault(JSON_ROOT, "");
+ jsonPaths = validParams.getOrDefault(JSON_PATHS, "");
+ stripOuterArray = validParams.getOrDefault(STRIP_OUTER_ARRAY,
"false").toLowerCase();
+ readJsonByLine = validParams.getOrDefault(READ_JSON_BY_LINE,
"true").toLowerCase();
+ }
+
public List<TBrokerFileStatus> getFileStatuses() {
return fileStatuses;
}
+ public TFileAttributes getFileAttributes() {
+ TFileAttributes fileAttributes = new TFileAttributes();
+ TFileTextScanRangeParams fileTextScanRangeParams = new
TFileTextScanRangeParams();
+ fileTextScanRangeParams.setColumnSeparator(this.columnSeparator);
+ fileTextScanRangeParams.setLineDelimiter(this.lineDelimiter);
+ fileAttributes.setTextParams(fileTextScanRangeParams);
+ if (this.fileFormatType == TFileFormatType.FORMAT_CSV_PLAIN) {
+ fileAttributes.setHeaderType(this.headerType);
+ } else if (this.fileFormatType == TFileFormatType.FORMAT_JSON) {
+ fileAttributes.setJsonRoot(jsonRoot);
+ fileAttributes.setJsonpaths(jsonPaths);
+ if (readJsonByLine.equalsIgnoreCase("true")) {
+ fileAttributes.setReadJsonByLine(true);
+ } else {
+ fileAttributes.setReadJsonByLine(false);
+ }
+ if (stripOuterArray.equalsIgnoreCase("true")) {
+ fileAttributes.setStripOuterArray(true);
+ } else {
+ fileAttributes.setStripOuterArray(false);
+ }
+ // TODO(ftw): num_as_string/fuzzy_parser?
+ }
+ return fileAttributes;
+ }
+
@Override
public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc) {
return new ExternalFileScanNode(id, desc);
@@ -218,16 +280,4 @@ public abstract class ExternalFileTableValuedFunction
extends TableValuedFunctio
return InternalService.PFetchTableSchemaRequest.newBuilder()
.setFileScanRange(ByteString.copyFrom(new
TSerializer().serialize(fileScanRange))).build();
}
-
- private TFileAttributes getFileAttributes() {
- TFileAttributes fileAttributes = new TFileAttributes();
- if (this.fileFormatType == TFileFormatType.FORMAT_CSV_PLAIN) {
- TFileTextScanRangeParams fileTextScanRangeParams = new
TFileTextScanRangeParams();
- fileTextScanRangeParams.setColumnSeparator(this.columnSeparator);
- fileTextScanRangeParams.setLineDelimiter(this.lineDelimiter);
- fileAttributes.setTextParams(fileTextScanRangeParams);
- fileAttributes.setHeaderType(this.headerType);
- }
- return fileAttributes;
- }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java
index 1a615e787c..784a75accc 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java
@@ -20,10 +20,8 @@ package org.apache.doris.tablefunction;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.StorageBackend.StorageType;
import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.FeConstants;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.S3URI;
-import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
import com.google.common.collect.ImmutableSet;
@@ -40,14 +38,13 @@ import java.util.Map;
public class S3TableValuedFunction extends ExternalFileTableValuedFunction {
public static final Logger LOG =
LogManager.getLogger(S3TableValuedFunction.class);
public static final String NAME = "s3";
- public static final String S3_URI = "URI";
+ public static final String S3_URI = "uri";
public static final String S3_AK = "AWS_ACCESS_KEY";
public static final String S3_SK = "AWS_SECRET_KEY";
public static final String S3_ENDPOINT = "AWS_ENDPOINT";
public static final String S3_REGION = "AWS_REGION";
- public static final String FORMAT = "FORMAT";
- private static final String AK = "ACCESS_KEY";
- private static final String SK = "SECRET_KEY";
+ private static final String AK = "access_key";
+ private static final String SK = "secret_key";
public static final String USE_PATH_STYLE = "use_path_style";
@@ -56,6 +53,10 @@ public class S3TableValuedFunction extends
ExternalFileTableValuedFunction {
.add(AK)
.add(SK)
.add(FORMAT)
+ .add(JSON_ROOT)
+ .add(JSON_PATHS)
+ .add(STRIP_OUTER_ARRAY)
+ .add(READ_JSON_BY_LINE)
.build();
private S3URI s3uri;
private String s3AK;
@@ -64,31 +65,17 @@ public class S3TableValuedFunction extends
ExternalFileTableValuedFunction {
public S3TableValuedFunction(Map<String, String> params) throws
UserException {
Map<String, String> validParams = Maps.newHashMap();
for (String key : params.keySet()) {
- if (!PROPERTIES_SET.contains(key.toUpperCase())) {
+ if (!PROPERTIES_SET.contains(key.toLowerCase())) {
throw new AnalysisException(key + " is invalid property");
}
- validParams.put(key.toUpperCase(), params.get(key));
+ validParams.put(key.toLowerCase(), params.get(key));
}
s3uri = S3URI.create(validParams.get(S3_URI));
s3AK = validParams.getOrDefault(AK, "");
s3SK = validParams.getOrDefault(SK, "");
- String formatString = validParams.getOrDefault(FORMAT, "");
- switch (formatString.toLowerCase()) {
- case "csv":
- this.fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
- break;
- case "csv_with_names":
- this.headerType = FeConstants.csv_with_names;
- this.fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
- break;
- case "csv_with_names_and_types":
- this.headerType = FeConstants.csv_with_names_and_types;
- this.fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
- break;
- default:
- throw new AnalysisException("format:" + formatString + " is
not supported.");
- }
+
+ parseProperties(validParams);
// set S3 location properties
locationProperties = Maps.newHashMap();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]