This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch 2.1-tmp in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/2.1-tmp by this push: new 0ef6eabc74c [Enhancement](hive-writer) Write only regular fields to file in the hive-writer. (#33000) (#33495) 0ef6eabc74c is described below commit 0ef6eabc74cb5d93c3e8d62feaeb91bcca37204a Author: Qi Chen <kaka11.c...@gmail.com> AuthorDate: Wed Apr 10 20:37:22 2024 +0800 [Enhancement](hive-writer) Write only regular fields to file in the hive-writer. (#33000) (#33495) --- be/src/vec/runtime/vorc_transformer.cpp | 104 +++++++++++++++++-- be/src/vec/runtime/vorc_transformer.h | 10 +- be/src/vec/runtime/vparquet_transformer.cpp | 32 +++++- be/src/vec/runtime/vparquet_transformer.h | 10 +- be/src/vec/sink/writer/vhive_partition_writer.cpp | 118 ++++------------------ be/src/vec/sink/writer/vhive_partition_writer.h | 10 +- be/src/vec/sink/writer/vhive_table_writer.cpp | 22 +++- be/src/vec/sink/writer/vhive_table_writer.h | 3 + 8 files changed, 184 insertions(+), 125 deletions(-) diff --git a/be/src/vec/runtime/vorc_transformer.cpp b/be/src/vec/runtime/vorc_transformer.cpp index 764a97ae5bc..a07f734acac 100644 --- a/be/src/vec/runtime/vorc_transformer.cpp +++ b/be/src/vec/runtime/vorc_transformer.cpp @@ -99,35 +99,45 @@ VOrcTransformer::VOrcTransformer(RuntimeState* state, doris::io::FileWriter* fil : VFileFormatTransformer(state, output_vexpr_ctxs, output_object_data), _file_writer(file_writer), _write_options(new orc::WriterOptions()), - _schema_str(&schema), - _schema(nullptr) { + _schema_str(&schema) { _write_options->setTimezoneName(_state->timezone()); _write_options->setUseTightNumericVector(true); } VOrcTransformer::VOrcTransformer(RuntimeState* state, doris::io::FileWriter* file_writer, const VExprContextSPtrs& output_vexpr_ctxs, - std::unique_ptr<orc::Type> schema, bool output_object_data, + std::vector<std::string> column_names, bool output_object_data, orc::CompressionKind compression) : VFileFormatTransformer(state, output_vexpr_ctxs, output_object_data), _file_writer(file_writer), + _column_names(std::move(column_names)), _write_options(new orc::WriterOptions()), - _schema_str(nullptr), - _schema(std::move(schema)) { + _schema_str(nullptr) { _write_options->setTimezoneName(_state->timezone()); _write_options->setUseTightNumericVector(true); _write_options->setCompression(compression); } Status VOrcTransformer::open() { - try { - if (_schema == nullptr && _schema_str != nullptr) { + if (_schema_str != nullptr) { + try { _schema = orc::Type::buildTypeFromString(*_schema_str); + } catch (const std::exception& e) { + return Status::InternalError("Orc build schema from \"{}\" failed: {}", *_schema_str, + e.what()); + } + } else { + _schema = orc::createStructType(); + for (int i = 0; i < _output_vexpr_ctxs.size(); i++) { + VExprSPtr column_expr = _output_vexpr_ctxs[i]->root(); + try { + _schema->addStructField(_column_names[i], _build_orc_type(column_expr->type())); + } catch (doris::Exception& e) { + return e.to_status(); + } } - } catch (const std::exception& e) { - return Status::InternalError("Orc build schema from \"{}\" failed: {}", *_schema_str, - e.what()); } + _output_stream = std::make_unique<VOrcOutputStream>(_file_writer); _writer = orc::createWriter(*_schema, _output_stream.get(), *_write_options); if (_writer == nullptr) { @@ -136,6 +146,80 @@ Status VOrcTransformer::open() { return Status::OK(); } +std::unique_ptr<orc::Type> VOrcTransformer::_build_orc_type(const TypeDescriptor& type_descriptor) { + std::pair<Status, std::unique_ptr<orc::Type>> result; + switch (type_descriptor.type) { + case TYPE_BOOLEAN: { + return orc::createPrimitiveType(orc::BOOLEAN); + } + case TYPE_TINYINT: { + return orc::createPrimitiveType(orc::BYTE); + } + case TYPE_SMALLINT: { + return orc::createPrimitiveType(orc::SHORT); + } + case TYPE_INT: { + return orc::createPrimitiveType(orc::INT); + } + case TYPE_BIGINT: { + return orc::createPrimitiveType(orc::LONG); + } + case TYPE_FLOAT: { + return orc::createPrimitiveType(orc::FLOAT); + } + case TYPE_DOUBLE: { + return orc::createPrimitiveType(orc::DOUBLE); + } + case TYPE_CHAR: { + return orc::createCharType(orc::CHAR, type_descriptor.len); + } + case TYPE_VARCHAR: { + return orc::createCharType(orc::VARCHAR, type_descriptor.len); + } + case TYPE_STRING: { + return orc::createPrimitiveType(orc::STRING); + } + case TYPE_BINARY: { + return orc::createPrimitiveType(orc::STRING); + } + case TYPE_DATEV2: { + return orc::createPrimitiveType(orc::DATE); + } + case TYPE_DATETIMEV2: { + return orc::createPrimitiveType(orc::TIMESTAMP); + } + case TYPE_DECIMAL32: { + return orc::createDecimalType(type_descriptor.precision, type_descriptor.scale); + } + case TYPE_DECIMAL64: { + return orc::createDecimalType(type_descriptor.precision, type_descriptor.scale); + } + case TYPE_DECIMAL128I: { + return orc::createDecimalType(type_descriptor.precision, type_descriptor.scale); + } + case TYPE_STRUCT: { + std::unique_ptr<orc::Type> struct_type = orc::createStructType(); + for (int j = 0; j < type_descriptor.children.size(); ++j) { + struct_type->addStructField(type_descriptor.field_names[j], + _build_orc_type(type_descriptor.children[j])); + } + return struct_type; + } + case TYPE_ARRAY: { + return orc::createListType(_build_orc_type(type_descriptor.children[0])); + } + case TYPE_MAP: { + return orc::createMapType(_build_orc_type(type_descriptor.children[0]), + _build_orc_type(type_descriptor.children[1])); + } + default: { + throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR, + "Unsupported type {} to build orc type", + type_descriptor.debug_string()); + } + } +} + std::unique_ptr<orc::ColumnVectorBatch> VOrcTransformer::_create_row_batch(size_t sz) { return _writer->createRowBatch(sz); } diff --git a/be/src/vec/runtime/vorc_transformer.h b/be/src/vec/runtime/vorc_transformer.h index 8cfc956c0cd..1f78e549a1b 100644 --- a/be/src/vec/runtime/vorc_transformer.h +++ b/be/src/vec/runtime/vorc_transformer.h @@ -79,8 +79,9 @@ public: bool output_object_data); VOrcTransformer(RuntimeState* state, doris::io::FileWriter* file_writer, - const VExprContextSPtrs& output_vexpr_ctxs, std::unique_ptr<orc::Type> schema, - bool output_object_data, orc::CompressionKind compression); + const VExprContextSPtrs& output_vexpr_ctxs, + std::vector<std::string> column_names, bool output_object_data, + orc::CompressionKind compression); ~VOrcTransformer() = default; @@ -93,6 +94,8 @@ public: int64_t written_len() override; private: + std::unique_ptr<orc::Type> _build_orc_type(const TypeDescriptor& type_descriptor); + std::unique_ptr<orc::ColumnVectorBatch> _create_row_batch(size_t sz); // The size of subtypes of a complex type may be different from // the size of the complex type itself, @@ -101,6 +104,7 @@ private: orc::ColumnVectorBatch* orc_col_batch); doris::io::FileWriter* _file_writer = nullptr; + std::vector<std::string> _column_names; std::unique_ptr<orc::OutputStream> _output_stream; std::unique_ptr<orc::WriterOptions> _write_options; const std::string* _schema_str; @@ -117,4 +121,4 @@ private: static constexpr size_t BUFFER_RESERVED_SIZE = 40; }; -} // namespace doris::vectorized \ No newline at end of file +} // namespace doris::vectorized diff --git a/be/src/vec/runtime/vparquet_transformer.cpp b/be/src/vec/runtime/vparquet_transformer.cpp index b831b1e2dc5..0e5800750b0 100644 --- a/be/src/vec/runtime/vparquet_transformer.cpp +++ b/be/src/vec/runtime/vparquet_transformer.cpp @@ -197,6 +197,22 @@ void ParquetBuildHelper::build_version(parquet::WriterProperties::Builder& build } } +VParquetTransformer::VParquetTransformer(RuntimeState* state, doris::io::FileWriter* file_writer, + const VExprContextSPtrs& output_vexpr_ctxs, + std::vector<std::string> column_names, + TParquetCompressionType::type compression_type, + bool parquet_disable_dictionary, + TParquetVersion::type parquet_version, + bool output_object_data) + : VFileFormatTransformer(state, output_vexpr_ctxs, output_object_data), + _column_names(std::move(column_names)), + _parquet_schemas(nullptr), + _compression_type(compression_type), + _parquet_disable_dictionary(parquet_disable_dictionary), + _parquet_version(parquet_version) { + _outstream = std::shared_ptr<ParquetOutputStream>(new ParquetOutputStream(file_writer)); +} + VParquetTransformer::VParquetTransformer(RuntimeState* state, doris::io::FileWriter* file_writer, const VExprContextSPtrs& output_vexpr_ctxs, const std::vector<TParquetSchema>& parquet_schemas, @@ -205,7 +221,7 @@ VParquetTransformer::VParquetTransformer(RuntimeState* state, doris::io::FileWri TParquetVersion::type parquet_version, bool output_object_data) : VFileFormatTransformer(state, output_vexpr_ctxs, output_object_data), - _parquet_schemas(parquet_schemas), + _parquet_schemas(&parquet_schemas), _compression_type(compression_type), _parquet_disable_dictionary(parquet_disable_dictionary), _parquet_version(parquet_version) { @@ -238,10 +254,16 @@ Status VParquetTransformer::_parse_schema() { for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) { std::shared_ptr<arrow::DataType> type; RETURN_IF_ERROR(convert_to_arrow_type(_output_vexpr_ctxs[i]->root()->type(), &type)); - std::shared_ptr<arrow::Field> field = - arrow::field(_parquet_schemas[i].schema_column_name, type, - _output_vexpr_ctxs[i]->root()->is_nullable()); - fields.emplace_back(field); + if (_parquet_schemas != nullptr) { + std::shared_ptr<arrow::Field> field = + arrow::field(_parquet_schemas->operator[](i).schema_column_name, type, + _output_vexpr_ctxs[i]->root()->is_nullable()); + fields.emplace_back(field); + } else { + std::shared_ptr<arrow::Field> field = arrow::field( + _column_names[i], type, _output_vexpr_ctxs[i]->root()->is_nullable()); + fields.emplace_back(field); + } } _arrow_schema = arrow::schema(std::move(fields)); return Status::OK(); diff --git a/be/src/vec/runtime/vparquet_transformer.h b/be/src/vec/runtime/vparquet_transformer.h index c9d2de59a51..78be0959363 100644 --- a/be/src/vec/runtime/vparquet_transformer.h +++ b/be/src/vec/runtime/vparquet_transformer.h @@ -89,6 +89,13 @@ public: // a wrapper of parquet output stream class VParquetTransformer final : public VFileFormatTransformer { public: + VParquetTransformer(RuntimeState* state, doris::io::FileWriter* file_writer, + const VExprContextSPtrs& output_vexpr_ctxs, + std::vector<std::string> column_names, + TParquetCompressionType::type compression_type, + bool parquet_disable_dictionary, TParquetVersion::type parquet_version, + bool output_object_data); + VParquetTransformer(RuntimeState* state, doris::io::FileWriter* file_writer, const VExprContextSPtrs& output_vexpr_ctxs, const std::vector<TParquetSchema>& parquet_schemas, @@ -117,7 +124,8 @@ private: std::unique_ptr<parquet::arrow::FileWriter> _writer; std::shared_ptr<arrow::Schema> _arrow_schema; - const std::vector<TParquetSchema>& _parquet_schemas; + std::vector<std::string> _column_names; + const std::vector<TParquetSchema>* _parquet_schemas = nullptr; const TParquetCompressionType::type _compression_type; const bool _parquet_disable_dictionary; const TParquetVersion::type _parquet_version; diff --git a/be/src/vec/sink/writer/vhive_partition_writer.cpp b/be/src/vec/sink/writer/vhive_partition_writer.cpp index ca6e76862ba..7f9a0dd1b1e 100644 --- a/be/src/vec/sink/writer/vhive_partition_writer.cpp +++ b/be/src/vec/sink/writer/vhive_partition_writer.cpp @@ -20,6 +20,7 @@ #include "io/file_factory.h" #include "io/fs/file_system.h" #include "runtime/runtime_state.h" +#include "vec/columns/column_map.h" #include "vec/core/materialize_block.h" #include "vec/runtime/vorc_transformer.h" #include "vec/runtime/vparquet_transformer.h" @@ -28,14 +29,17 @@ namespace doris { namespace vectorized { VHivePartitionWriter::VHivePartitionWriter( - const TDataSink& t_sink, const std::string partition_name, TUpdateMode::type update_mode, - const VExprContextSPtrs& output_expr_ctxs, const std::vector<THiveColumn>& columns, - WriteInfo write_info, const std::string file_name, TFileFormatType::type file_format_type, + const TDataSink& t_sink, std::string partition_name, TUpdateMode::type update_mode, + const VExprContextSPtrs& output_expr_ctxs, const VExprContextSPtrs& write_output_expr_ctxs, + const std::set<size_t>& non_write_columns_indices, const std::vector<THiveColumn>& columns, + WriteInfo write_info, std::string file_name, TFileFormatType::type file_format_type, TFileCompressType::type hive_compress_type, const std::map<std::string, std::string>& hadoop_conf) : _partition_name(std::move(partition_name)), _update_mode(update_mode), _vec_output_expr_ctxs(output_expr_ctxs), + _write_output_expr_ctxs(write_output_expr_ctxs), + _non_write_columns_indices(non_write_columns_indices), _columns(columns), _write_info(std::move(write_info)), _file_name(std::move(file_name)), @@ -53,6 +57,12 @@ Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile) _write_info.file_type, state->exec_env(), broker_addresses, _hadoop_conf, fmt::format("{}/{}", _write_info.write_path, _file_name), 0, _file_writer)); + std::vector<std::string> column_names; + column_names.reserve(_columns.size()); + for (int i = 0; i < _columns.size(); i++) { + column_names.push_back(_columns[i].name); + } + switch (_file_format_type) { case TFileFormatType::FORMAT_PARQUET: { bool parquet_disable_dictionary = false; @@ -75,16 +85,8 @@ Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile) to_string(_hive_compress_type)); } } - std::vector<TParquetSchema> parquet_schemas; - parquet_schemas.reserve(_columns.size()); - for (int i = 0; i < _columns.size(); i++) { - VExprSPtr column_expr = _vec_output_expr_ctxs[i]->root(); - TParquetSchema parquet_schema; - parquet_schema.schema_column_name = _columns[i].name; - parquet_schemas.emplace_back(std::move(parquet_schema)); - } _file_format_transformer.reset(new VParquetTransformer( - state, _file_writer.get(), _vec_output_expr_ctxs, parquet_schemas, + state, _file_writer.get(), _write_output_expr_ctxs, std::move(column_names), parquet_compression_type, parquet_disable_dictionary, TParquetVersion::PARQUET_1_0, false)); return _file_format_transformer->open(); @@ -112,21 +114,10 @@ Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile) return Status::InternalError("Unsupported type {} with orc", _hive_compress_type); } } - orc_compression_type = orc::CompressionKind::CompressionKind_ZLIB; - - std::unique_ptr<orc::Type> root_schema = orc::createStructType(); - for (int i = 0; i < _columns.size(); i++) { - VExprSPtr column_expr = _vec_output_expr_ctxs[i]->root(); - try { - root_schema->addStructField(_columns[i].name, _build_orc_type(column_expr->type())); - } catch (doris::Exception& e) { - return e.to_status(); - } - } _file_format_transformer.reset( - new VOrcTransformer(state, _file_writer.get(), _vec_output_expr_ctxs, - std::move(root_schema), false, orc_compression_type)); + new VOrcTransformer(state, _file_writer.get(), _write_output_expr_ctxs, + std::move(column_names), false, orc_compression_type)); return _file_format_transformer->open(); } default: { @@ -164,81 +155,6 @@ Status VHivePartitionWriter::write(vectorized::Block& block, vectorized::IColumn return Status::OK(); } -std::unique_ptr<orc::Type> VHivePartitionWriter::_build_orc_type( - const TypeDescriptor& type_descriptor) { - std::pair<Status, std::unique_ptr<orc::Type>> result; - switch (type_descriptor.type) { - case TYPE_BOOLEAN: { - return orc::createPrimitiveType(orc::BOOLEAN); - } - case TYPE_TINYINT: { - return orc::createPrimitiveType(orc::BYTE); - } - case TYPE_SMALLINT: { - return orc::createPrimitiveType(orc::SHORT); - } - case TYPE_INT: { - return orc::createPrimitiveType(orc::INT); - } - case TYPE_BIGINT: { - return orc::createPrimitiveType(orc::LONG); - } - case TYPE_FLOAT: { - return orc::createPrimitiveType(orc::FLOAT); - } - case TYPE_DOUBLE: { - return orc::createPrimitiveType(orc::DOUBLE); - } - case TYPE_CHAR: { - return orc::createCharType(orc::CHAR, type_descriptor.len); - } - case TYPE_VARCHAR: { - return orc::createCharType(orc::VARCHAR, type_descriptor.len); - } - case TYPE_STRING: { - return orc::createPrimitiveType(orc::STRING); - } - case TYPE_BINARY: { - return orc::createPrimitiveType(orc::STRING); - } - case TYPE_DATEV2: { - return orc::createPrimitiveType(orc::DATE); - } - case TYPE_DATETIMEV2: { - return orc::createPrimitiveType(orc::TIMESTAMP); - } - case TYPE_DECIMAL32: { - return orc::createDecimalType(type_descriptor.precision, type_descriptor.scale); - } - case TYPE_DECIMAL64: { - return orc::createDecimalType(type_descriptor.precision, type_descriptor.scale); - } - case TYPE_DECIMAL128I: { - return orc::createDecimalType(type_descriptor.precision, type_descriptor.scale); - } - case TYPE_STRUCT: { - std::unique_ptr<orc::Type> struct_type = orc::createStructType(); - for (int j = 0; j < type_descriptor.children.size(); ++j) { - struct_type->addStructField(type_descriptor.field_names[j], - _build_orc_type(type_descriptor.children[j])); - } - return struct_type; - } - case TYPE_ARRAY: { - return orc::createListType(_build_orc_type(type_descriptor.children[0])); - } - case TYPE_MAP: { - return orc::createMapType(_build_orc_type(type_descriptor.children[0]), - _build_orc_type(type_descriptor.children[1])); - } - default: { - throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR, - "Unsupported type {} to build orc type", - type_descriptor.debug_string()); - } - } -} - Status VHivePartitionWriter::_projection_and_filter_block(doris::vectorized::Block& input_block, const vectorized::IColumn::Filter* filter, doris::vectorized::Block* output_block) { @@ -263,6 +179,8 @@ Status VHivePartitionWriter::_projection_and_filter_block(doris::vectorized::Blo Block::filter_block_internal(output_block, columns_to_filter, *filter); + output_block->erase(_non_write_columns_indices); + return status; } diff --git a/be/src/vec/sink/writer/vhive_partition_writer.h b/be/src/vec/sink/writer/vhive_partition_writer.h index d3c52800bea..f6d2caec94d 100644 --- a/be/src/vec/sink/writer/vhive_partition_writer.h +++ b/be/src/vec/sink/writer/vhive_partition_writer.h @@ -43,10 +43,12 @@ public: TFileType::type file_type; }; - VHivePartitionWriter(const TDataSink& t_sink, const std::string partition_name, + VHivePartitionWriter(const TDataSink& t_sink, std::string partition_name, TUpdateMode::type update_mode, const VExprContextSPtrs& output_expr_ctxs, + const VExprContextSPtrs& write_output_expr_ctxs, + const std::set<size_t>& non_write_columns_indices, const std::vector<THiveColumn>& columns, WriteInfo write_info, - const std::string file_name, TFileFormatType::type file_format_type, + std::string file_name, TFileFormatType::type file_format_type, TFileCompressType::type hive_compress_type, const std::map<std::string, std::string>& hadoop_conf); @@ -61,8 +63,6 @@ public: inline size_t written_len() { return _file_format_transformer->written_len(); } private: - std::unique_ptr<orc::Type> _build_orc_type(const TypeDescriptor& type_descriptor); - Status _projection_and_filter_block(doris::vectorized::Block& input_block, const vectorized::IColumn::Filter* filter, doris::vectorized::Block* output_block); @@ -79,6 +79,8 @@ private: size_t _input_size_in_bytes = 0; const VExprContextSPtrs& _vec_output_expr_ctxs; + const VExprContextSPtrs& _write_output_expr_ctxs; + const std::set<size_t>& _non_write_columns_indices; const std::vector<THiveColumn>& _columns; WriteInfo _write_info; diff --git a/be/src/vec/sink/writer/vhive_table_writer.cpp b/be/src/vec/sink/writer/vhive_table_writer.cpp index 4ea5fcbf4ed..e56090773b5 100644 --- a/be/src/vec/sink/writer/vhive_table_writer.cpp +++ b/be/src/vec/sink/writer/vhive_table_writer.cpp @@ -43,8 +43,25 @@ Status VHiveTableWriter::open(RuntimeState* state, RuntimeProfile* profile) { _profile = profile; for (int i = 0; i < _t_sink.hive_table_sink.columns.size(); ++i) { - if (_t_sink.hive_table_sink.columns[i].column_type == THiveColumnType::PARTITION_KEY) { + switch (_t_sink.hive_table_sink.columns[i].column_type) { + case THiveColumnType::PARTITION_KEY: { _partition_columns_input_index.emplace_back(i); + _non_write_columns_indices.insert(i); + break; + } + case THiveColumnType::REGULAR: { + _write_output_vexpr_ctxs.push_back(_vec_output_expr_ctxs[i]); + break; + } + case THiveColumnType::SYNTHESIZED: { + _non_write_columns_indices.insert(i); + break; + } + default: { + throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR, + "Illegal hive column type {}, it should not be here.", + to_string(_t_sink.hive_table_sink.columns[i].column_type)); + } } } return Status::OK(); @@ -232,7 +249,8 @@ std::shared_ptr<VHivePartitionWriter> VHiveTableWriter::_create_partition_writer return std::make_shared<VHivePartitionWriter>( _t_sink, std::move(partition_name), update_mode, _vec_output_expr_ctxs, - hive_table_sink.columns, std::move(write_info), + _write_output_vexpr_ctxs, _non_write_columns_indices, hive_table_sink.columns, + std::move(write_info), fmt::format("{}{}", _compute_file_name(), _get_file_extension(file_format_type, write_compress_type)), file_format_type, write_compress_type, hive_table_sink.hadoop_config); diff --git a/be/src/vec/sink/writer/vhive_table_writer.h b/be/src/vec/sink/writer/vhive_table_writer.h index a4681b32e3f..9f48f6afde1 100644 --- a/be/src/vec/sink/writer/vhive_table_writer.h +++ b/be/src/vec/sink/writer/vhive_table_writer.h @@ -68,7 +68,10 @@ private: RuntimeState* _state = nullptr; RuntimeProfile* _profile = nullptr; std::vector<int> _partition_columns_input_index; + std::set<size_t> _non_write_columns_indices; std::unordered_map<std::string, std::shared_ptr<VHivePartitionWriter>> _partitions_to_writers; + + VExprContextSPtrs _write_output_vexpr_ctxs; }; } // namespace vectorized } // namespace doris --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org