This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 6e97b6e3f56f4129e6c8e09df215e333cf48dfd8
Author: Qi Chen <[email protected]>
AuthorDate: Tue May 28 14:09:15 2024 +0800

    [Fix](hive-writer) Fix partition column order issue when the partition fields inserted into the target table are in a different order than the columns of the query's source table schema. (#35347)

    Fix the partition column order issue that occurs when the partition fields inserted into the target table do not follow the column order of the query's source table schema. Please look at `regression-test/suites/external_table_p0/hive/ddl/test_hive_write_type.groovy` in the PR for the regression case.
---
 be/src/vec/sink/writer/vhive_partition_writer.cpp | 69 +++++---------------
 be/src/vec/sink/writer/vhive_partition_writer.h   | 16 ++---
 be/src/vec/sink/writer/vhive_table_writer.cpp     | 75 +++++++++++++++++-----
 be/src/vec/sink/writer/vhive_table_writer.h       |  4 ++
 .../hive/ddl/test_hive_write_type.out             | 25 ++++++++
 .../hive/ddl/test_hive_write_type.groovy          | 57 ++++++++++++++++
 6 files changed, 166 insertions(+), 80 deletions(-)

diff --git a/be/src/vec/sink/writer/vhive_partition_writer.cpp b/be/src/vec/sink/writer/vhive_partition_writer.cpp index c8b4dd65565..10140c7a06d 100644 --- a/be/src/vec/sink/writer/vhive_partition_writer.cpp +++ b/be/src/vec/sink/writer/vhive_partition_writer.cpp @@ -30,19 +30,19 @@ namespace doris { namespace vectorized { -VHivePartitionWriter::VHivePartitionWriter( - const TDataSink& t_sink, std::string partition_name, TUpdateMode::type update_mode, - const VExprContextSPtrs& output_expr_ctxs, const VExprContextSPtrs& write_output_expr_ctxs, - const std::set<size_t>& non_write_columns_indices, const std::vector<THiveColumn>& columns, - WriteInfo write_info, std::string file_name, int file_name_index, - TFileFormatType::type file_format_type, TFileCompressType::type hive_compress_type, - const std::map<std::string, std::string>& hadoop_conf) +VHivePartitionWriter::VHivePartitionWriter(const TDataSink& t_sink, std::string partition_name, + TUpdateMode::type update_mode, + const VExprContextSPtrs& write_output_expr_ctxs, + std::vector<std::string> write_column_names, + WriteInfo write_info, std::string file_name, + int file_name_index, + TFileFormatType::type file_format_type, + TFileCompressType::type hive_compress_type, + const std::map<std::string, std::string>& hadoop_conf) : _partition_name(std::move(partition_name)), _update_mode(update_mode), - _vec_output_expr_ctxs(output_expr_ctxs), _write_output_expr_ctxs(write_output_expr_ctxs), - _non_write_columns_indices(non_write_columns_indices), - _columns(columns), + _write_column_names(std::move(write_column_names)), _write_info(std::move(write_info)), _file_name(std::move(file_name)), _file_name_index(file_name_index), @@ -74,14 +74,6 @@ Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile) io::FileWriterOptions file_writer_options = {.used_by_s3_committer = true}; RETURN_IF_ERROR(_fs->create_file(file_description.path, &_file_writer, &file_writer_options)); - std::vector<std::string> column_names; - column_names.reserve(_columns.size()); - for (int i = 0; i < _columns.size(); i++) { - if (_non_write_columns_indices.find(i) == _non_write_columns_indices.end()) { - column_names.emplace_back(_columns[i].name); - } - } - switch (_file_format_type) { case TFileFormatType::FORMAT_PARQUET: { bool parquet_disable_dictionary = false; @@ -105,7 +97,7 @@ Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile)
} } _file_format_transformer.reset(new VParquetTransformer( - state, _file_writer.get(), _write_output_expr_ctxs, std::move(column_names), + state, _file_writer.get(), _write_output_expr_ctxs, _write_column_names, parquet_compression_type, parquet_disable_dictionary, TParquetVersion::PARQUET_1_0, false)); return _file_format_transformer->open(); @@ -136,7 +128,7 @@ Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile) _file_format_transformer.reset( new VOrcTransformer(state, _file_writer.get(), _write_output_expr_ctxs, - std::move(column_names), false, orc_compression_type)); + _write_column_names, false, orc_compression_type)); return _file_format_transformer->open(); } default: { @@ -167,43 +159,12 @@ Status VHivePartitionWriter::close(const Status& status) { return Status::OK(); } -Status VHivePartitionWriter::write(vectorized::Block& block, vectorized::IColumn::Filter* filter) { - Block output_block; - RETURN_IF_ERROR(_projection_and_filter_block(block, filter, &output_block)); - RETURN_IF_ERROR(_file_format_transformer->write(output_block)); - _row_count += output_block.rows(); +Status VHivePartitionWriter::write(vectorized::Block& block) { + RETURN_IF_ERROR(_file_format_transformer->write(block)); + _row_count += block.rows(); return Status::OK(); } -Status VHivePartitionWriter::_projection_and_filter_block(doris::vectorized::Block& input_block, - const vectorized::IColumn::Filter* filter, - doris::vectorized::Block* output_block) { - Status status = Status::OK(); - if (input_block.rows() == 0) { - return status; - } - RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs( - _vec_output_expr_ctxs, input_block, output_block, true)); - materialize_block_inplace(*output_block); - - if (filter == nullptr) { - return status; - } - - std::vector<uint32_t> columns_to_filter; - int column_to_keep = input_block.columns(); - columns_to_filter.resize(column_to_keep); - for (uint32_t i = 0; i < column_to_keep; ++i) { - columns_to_filter[i] = i; - } - - Block::filter_block_internal(output_block, columns_to_filter, *filter); - - output_block->erase(_non_write_columns_indices); - - return status; -} - THivePartitionUpdate VHivePartitionWriter::_build_partition_update() { THivePartitionUpdate hive_partition_update; hive_partition_update.__set_name(_partition_name); diff --git a/be/src/vec/sink/writer/vhive_partition_writer.h b/be/src/vec/sink/writer/vhive_partition_writer.h index b725bcd73f0..852e275fc70 100644 --- a/be/src/vec/sink/writer/vhive_partition_writer.h +++ b/be/src/vec/sink/writer/vhive_partition_writer.h @@ -49,10 +49,9 @@ public: }; VHivePartitionWriter(const TDataSink& t_sink, std::string partition_name, - TUpdateMode::type update_mode, const VExprContextSPtrs& output_expr_ctxs, + TUpdateMode::type update_mode, const VExprContextSPtrs& write_output_expr_ctxs, - const std::set<size_t>& non_write_columns_indices, - const std::vector<THiveColumn>& columns, WriteInfo write_info, + std::vector<std::string> write_column_names, WriteInfo write_info, std::string file_name, int file_name_index, TFileFormatType::type file_format_type, TFileCompressType::type hive_compress_type, @@ -62,7 +61,7 @@ public: Status open(RuntimeState* state, RuntimeProfile* profile); - Status write(vectorized::Block& block, IColumn::Filter* filter = nullptr); + Status write(vectorized::Block& block); Status close(const Status& status); @@ -76,10 +75,6 @@ private: std::string _get_target_file_name(); private: - Status 
_projection_and_filter_block(doris::vectorized::Block& input_block, - const vectorized::IColumn::Filter* filter, - doris::vectorized::Block* output_block); - THivePartitionUpdate _build_partition_update(); std::string _get_file_extension(TFileFormatType::type file_format_type, @@ -93,11 +88,10 @@ private: size_t _row_count = 0; - const VExprContextSPtrs& _vec_output_expr_ctxs; const VExprContextSPtrs& _write_output_expr_ctxs; - const std::set<size_t>& _non_write_columns_indices; - const std::vector<THiveColumn>& _columns; + std::vector<std::string> _write_column_names; + WriteInfo _write_info; std::string _file_name; int _file_name_index; diff --git a/be/src/vec/sink/writer/vhive_table_writer.cpp b/be/src/vec/sink/writer/vhive_table_writer.cpp index 76f16e3daaa..0e64060eb0b 100644 --- a/be/src/vec/sink/writer/vhive_table_writer.cpp +++ b/be/src/vec/sink/writer/vhive_table_writer.cpp @@ -20,6 +20,7 @@ #include "runtime/runtime_state.h" #include "vec/core/block.h" #include "vec/core/column_with_type_and_name.h" +#include "vec/core/materialize_block.h" #include "vec/exprs/vexpr.h" #include "vec/exprs/vexpr_context.h" #include "vec/sink/writer/vhive_partition_writer.h" @@ -82,8 +83,17 @@ Status VHiveTableWriter::open(RuntimeState* state, RuntimeProfile* profile) { Status VHiveTableWriter::write(vectorized::Block& block) { SCOPED_RAW_TIMER(&_send_data_ns); + + if (block.rows() == 0) { + return Status::OK(); + } + Block output_block; + RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs( + _vec_output_expr_ctxs, block, &output_block, false)); + materialize_block_inplace(output_block); + std::unordered_map<std::shared_ptr<VHivePartitionWriter>, IColumn::Filter> writer_positions; - _row_count += block.rows(); + _row_count += output_block.rows(); auto& hive_table_sink = _t_sink.hive_table_sink; if (_partition_columns_input_index.empty()) { @@ -93,7 +103,7 @@ Status VHiveTableWriter::write(vectorized::Block& block) { auto writer_iter = _partitions_to_writers.find(""); if (writer_iter == _partitions_to_writers.end()) { try { - writer = _create_partition_writer(block, -1); + writer = _create_partition_writer(output_block, -1); } catch (doris::Exception& e) { return e.to_status(); } @@ -109,7 +119,7 @@ Status VHiveTableWriter::write(vectorized::Block& block) { } _partitions_to_writers.erase(writer_iter); try { - writer = _create_partition_writer(block, -1, &file_name, + writer = _create_partition_writer(output_block, -1, &file_name, file_name_index + 1); } catch (doris::Exception& e) { return e.to_status(); @@ -122,16 +132,17 @@ Status VHiveTableWriter::write(vectorized::Block& block) { } } SCOPED_RAW_TIMER(&_partition_writers_write_ns); - RETURN_IF_ERROR(writer->write(block)); + output_block.erase(_non_write_columns_indices); + RETURN_IF_ERROR(writer->write(output_block)); return Status::OK(); } { SCOPED_RAW_TIMER(&_partition_writers_dispatch_ns); - for (int i = 0; i < block.rows(); ++i) { + for (int i = 0; i < output_block.rows(); ++i) { std::vector<std::string> partition_values; try { - partition_values = _create_partition_values(block, i); + partition_values = _create_partition_values(output_block, i); } catch (doris::Exception& e) { return e.to_status(); } @@ -143,10 +154,10 @@ Status VHiveTableWriter::write(vectorized::Block& block) { const std::string* file_name, int file_name_index, std::shared_ptr<VHivePartitionWriter>& writer_ptr) -> Status { try { - auto writer = - _create_partition_writer(block, position, file_name, file_name_index); + auto writer = 
_create_partition_writer(output_block, position, file_name, + file_name_index); RETURN_IF_ERROR(writer->open(_state, _profile)); - IColumn::Filter filter(block.rows(), 0); + IColumn::Filter filter(output_block.rows(), 0); filter[position] = 1; writer_positions.insert({writer, std::move(filter)}); _partitions_to_writers.insert({partition_name, writer}); @@ -185,7 +196,7 @@ Status VHiveTableWriter::write(vectorized::Block& block) { } auto writer_pos_iter = writer_positions.find(writer); if (writer_pos_iter == writer_positions.end()) { - IColumn::Filter filter(block.rows(), 0); + IColumn::Filter filter(output_block.rows(), 0); filter[i] = 1; writer_positions.insert({writer, std::move(filter)}); } else { @@ -195,12 +206,39 @@ Status VHiveTableWriter::write(vectorized::Block& block) { } } SCOPED_RAW_TIMER(&_partition_writers_write_ns); + output_block.erase(_non_write_columns_indices); for (auto it = writer_positions.begin(); it != writer_positions.end(); ++it) { - RETURN_IF_ERROR(it->first->write(block, &it->second)); + Block filtered_block; + RETURN_IF_ERROR(_filter_block(output_block, &it->second, &filtered_block)); + RETURN_IF_ERROR(it->first->write(filtered_block)); } return Status::OK(); } +Status VHiveTableWriter::_filter_block(doris::vectorized::Block& block, + const vectorized::IColumn::Filter* filter, + doris::vectorized::Block* output_block) { + const ColumnsWithTypeAndName& columns_with_type_and_name = + block.get_columns_with_type_and_name(); + vectorized::ColumnsWithTypeAndName result_columns; + for (int i = 0; i < columns_with_type_and_name.size(); ++i) { + const auto& col = columns_with_type_and_name[i]; + result_columns.emplace_back(col.column->clone_resized(col.column->size()), col.type, + col.name); + } + *output_block = {std::move(result_columns)}; + + std::vector<uint32_t> columns_to_filter; + int column_to_keep = output_block->columns(); + columns_to_filter.resize(column_to_keep); + for (uint32_t i = 0; i < column_to_keep; ++i) { + columns_to_filter[i] = i; + } + + Block::filter_block_internal(output_block, columns_to_filter, *filter); + return Status::OK(); +} + Status VHiveTableWriter::close(Status status) { int64_t partitions_to_writers_size = _partitions_to_writers.size(); { @@ -312,11 +350,18 @@ std::shared_ptr<VHivePartitionWriter> VHiveTableWriter::_create_partition_writer } _write_file_count++; + std::vector<std::string> column_names; + column_names.reserve(hive_table_sink.columns.size()); + for (int i = 0; i < hive_table_sink.columns.size(); i++) { + if (_non_write_columns_indices.find(i) == _non_write_columns_indices.end()) { + column_names.emplace_back(hive_table_sink.columns[i].name); + } + } return std::make_shared<VHivePartitionWriter>( - _t_sink, std::move(partition_name), update_mode, _vec_output_expr_ctxs, - _write_output_vexpr_ctxs, _non_write_columns_indices, hive_table_sink.columns, - std::move(write_info), (file_name == nullptr) ? _compute_file_name() : *file_name, - file_name_index, file_format_type, write_compress_type, hive_table_sink.hadoop_config); + _t_sink, std::move(partition_name), update_mode, _write_output_vexpr_ctxs, + std::move(column_names), std::move(write_info), + (file_name == nullptr) ? 
_compute_file_name() : *file_name, file_name_index, + file_format_type, write_compress_type, hive_table_sink.hadoop_config); } std::vector<std::string> VHiveTableWriter::_create_partition_values(vectorized::Block& block, diff --git a/be/src/vec/sink/writer/vhive_table_writer.h b/be/src/vec/sink/writer/vhive_table_writer.h index 3a3f45a6db1..4989ba443c7 100644 --- a/be/src/vec/sink/writer/vhive_table_writer.h +++ b/be/src/vec/sink/writer/vhive_table_writer.h @@ -20,6 +20,7 @@ #include <gen_cpp/DataSinks_types.h> #include "util/runtime_profile.h" +#include "vec/columns/column.h" #include "vec/exprs/vexpr_fwd.h" #include "vec/sink/writer/async_result_writer.h" @@ -62,6 +63,9 @@ private: std::string _compute_file_name(); + Status _filter_block(doris::vectorized::Block& block, const vectorized::IColumn::Filter* filter, + doris::vectorized::Block* output_block); + // Currently it is a copy, maybe it is better to use move semantics to eliminate it. TDataSink _t_sink; RuntimeState* _state = nullptr; diff --git a/regression-test/data/external_table_p0/hive/ddl/test_hive_write_type.out b/regression-test/data/external_table_p0/hive/ddl/test_hive_write_type.out index 1f7d39971db..20d2758c2a6 100644 --- a/regression-test/data/external_table_p0/hive/ddl/test_hive_write_type.out +++ b/regression-test/data/external_table_p0/hive/ddl/test_hive_write_type.out @@ -8,6 +8,12 @@ a b c d e 1.1 12345 0.12345678 string [0.001, 0.002] ["char1", "char2"] ["c", "d a b c d e 1.1 12345 0.12345678 string \N \N \N \N \N \N \N \N \N \N \N a b c d e 1.1 12345 0.12345678 string [0.001, 0.002] ["char1", "char2"] ["c", "d"] ["string1", "string2"] [{1:"a"}, {2:"b"}] {1234567890123456789:"a"} {1234567890123456789:0.12345678} {"key":["char1", "char2"]} {"id": 1, "gender": 1, "name": "John Doe"} {"scale": 123.4567, "metric": ["metric1", "metric2"]} {"codes": [123, 456], "props": {"key1":["char1", "char2"]}} +-- !columns_out_of_order01 -- +3 6 1 4 2 5 + +-- !columns_out_of_order02 -- +1 2 3 4 5 6 + -- !complex_type01 -- a \N \N \N \N \N \N \N \N \N ["char1", "char2"] ["c", "d"] ["string1", "string2"] [{1:"a"}, {2:"b"}] {1234567890123456789:"a"} {1234567890123456789:0.12345678} {"key":["char1", "char2"]} {"id": 1, "gender": 1, "name": "John Doe"} {"scale": 123.4567, "metric": ["metric1", "metric2"]} \N a b c d e 1.1 12345 0.12345678 string \N \N \N \N \N \N \N \N \N \N \N @@ -17,6 +23,12 @@ a b c d e 1.1 12345 0.12345678 string [0.001, 0.002] ["char1", "char2"] ["c", "d a b c d e 1.1 12345 0.12345678 string \N \N \N \N \N \N \N \N \N \N \N a b c d e 1.1 12345 0.12345678 string [0.001, 0.002] ["char1", "char2"] ["c", "d"] ["string1", "string2"] [{1:"a"}, {2:"b"}] {1234567890123456789:"a"} {1234567890123456789:0.12345678} {"key":["char1", "char2"]} {"id": 1, "gender": 1, "name": "John Doe"} {"scale": 123.4567, "metric": ["metric1", "metric2"]} {"codes": [123, 456], "props": {"key1":["char1", "char2"]}} +-- !columns_out_of_order01 -- +3 6 1 4 2 5 + +-- !columns_out_of_order02 -- +1 2 3 4 5 6 + -- !complex_type01 -- a \N \N \N \N \N \N \N \N \N ["char1", "char2"] ["c", "d"] ["string1", "string2"] [{1:"a"}, {2:"b"}] {1234567890123456789:"a"} {1234567890123456789:0.12345678} {"key":["char1", "char2"]} {"id": 1, "gender": 1, "name": "John Doe"} {"scale": 123.4567, "metric": ["metric1", "metric2"]} \N a b c d e 1.1 12345 0.12345678 string \N \N \N \N \N \N \N \N \N \N \N @@ -26,6 +38,12 @@ a b c d e 1.1 12345 0.12345678 string [0.001, 0.002] ["char1", "char2"] ["c", "d a b c d e 1.1 12345 0.12345678 string \N \N \N \N \N \N \N \N \N \N \N 
a b c d e 1.1 12345 0.12345678 string [0.001, 0.002] ["char1", "char2"] ["c", "d"] ["string1", "string2"] [{1:"a"}, {2:"b"}] {1234567890123456789:"a"} {1234567890123456789:0.12345678} {"key":["char1", "char2"]} {"id": 1, "gender": 1, "name": "John Doe"} {"scale": 123.4567, "metric": ["metric1", "metric2"]} {"codes": [123, 456], "props": {"key1":["char1", "char2"]}} +-- !columns_out_of_order01 -- +3 6 1 4 2 5 + +-- !columns_out_of_order02 -- +1 2 3 4 5 6 + -- !complex_type01 -- a \N \N \N \N \N \N \N \N \N ["char1", "char2"] ["c", "d"] ["string1", "string2"] [{1:"a"}, {2:"b"}] {1234567890123456789:"a"} {1234567890123456789:0.12345678} {"key":["char1", "char2"]} {"id": 1, "gender": 1, "name": "John Doe"} {"scale": 123.4567, "metric": ["metric1", "metric2"]} \N a b c d e 1.1 12345 0.12345678 string \N \N \N \N \N \N \N \N \N \N \N @@ -34,3 +52,10 @@ a b c d e 1.1 12345 0.12345678 string [0.001, 0.002] ["char1", "char2"] ["c", "d -- !complex_type02 -- a b c d e 1.1 12345 0.12345678 string \N \N \N \N \N \N \N \N \N \N \N a b c d e 1.1 12345 0.12345678 string [0.001, 0.002] ["char1", "char2"] ["c", "d"] ["string1", "string2"] [{1:"a"}, {2:"b"}] {1234567890123456789:"a"} {1234567890123456789:0.12345678} {"key":["char1", "char2"]} {"id": 1, "gender": 1, "name": "John Doe"} {"scale": 123.4567, "metric": ["metric1", "metric2"]} {"codes": [123, 456], "props": {"key1":["char1", "char2"]}} + +-- !columns_out_of_order01 -- +3 6 1 4 2 5 + +-- !columns_out_of_order02 -- +1 2 3 4 5 6 + diff --git a/regression-test/suites/external_table_p0/hive/ddl/test_hive_write_type.groovy b/regression-test/suites/external_table_p0/hive/ddl/test_hive_write_type.groovy index cf3bd2a9037..0e1e1355afd 100644 --- a/regression-test/suites/external_table_p0/hive/ddl/test_hive_write_type.groovy +++ b/regression-test/suites/external_table_p0/hive/ddl/test_hive_write_type.groovy @@ -266,6 +266,62 @@ suite("test_hive_write_type", "p0,external,hive,external_docker,external_docker_ sql """ DROP DATABASE ${catalog_name}.test_hive_ex """ } + def test_columns_out_of_order = { String file_format, String catalog_name -> + sql """ switch ${catalog_name} """ + sql """ create database if not exists `test_columns_out_of_order` """; + sql """ use `${catalog_name}`.`test_columns_out_of_order` """ + + sql """ drop table if exists columns_out_of_order_source_tbl_${file_format} """ + sql """ + CREATE TABLE columns_out_of_order_source_tbl_${file_format} ( + `col3` bigint, + `col6` int, + `col1` bigint, + `col4` int, + `col2` bigint, + `col5` int + ) ENGINE = hive + PROPERTIES ( + 'file_format'='${file_format}' + ) + """; + sql """ drop table if exists columns_out_of_order_target_tbl_${file_format} """ + sql """ + CREATE TABLE columns_out_of_order_target_tbl_${file_format} ( + `col1` bigint, + `col2` bigint, + `col3` bigint, + `col4` int, + `col5` int, + `col6` int + ) ENGINE = hive PARTITION BY LIST ( + col4, col5, col6 + )() + PROPERTIES ( + 'file_format'='${file_format}' + ) + """; + + sql """ + INSERT INTO columns_out_of_order_source_tbl_${file_format} ( + col1, col2, col3, col4, col5, col6 + ) VALUES (1, 2, 3, 4, 5, 6); + """ + order_qt_columns_out_of_order01 """ SELECT * FROM columns_out_of_order_source_tbl_${file_format} """ + + sql """ + INSERT INTO columns_out_of_order_target_tbl_${file_format} ( + col1, col2, col3, col4, col5, col6 + ) VALUES (1, 2, 3, 4, 5, 6); + """ + + order_qt_columns_out_of_order02 """ SELECT * FROM columns_out_of_order_target_tbl_${file_format} """ + + sql """ drop table 
columns_out_of_order_source_tbl_${file_format} """ + sql """ drop table columns_out_of_order_target_tbl_${file_format} """ + sql """ drop database if exists `test_columns_out_of_order` """; + } + try { String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort") String hdfs_port = context.config.otherConfigs.get(hivePrefix + "HdfsPort") @@ -285,6 +341,7 @@ suite("test_hive_write_type", "p0,external,hive,external_docker,external_docker_ logger.info("Process file format" + file_format) test_complex_type_tbl(file_format, catalog_name) test_insert_exception(file_format, catalog_name) + test_columns_out_of_order(file_format, catalog_name) } sql """drop catalog if exists ${catalog_name}""" } finally { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
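
In SQL terms, the scenario exercised by the new regression case looks roughly like the sketch below. Table and column names mirror test_hive_write_type.groovy; the Hive catalog setup is assumed to already exist, and 'orc' stands in for the parameterized file_format used by the suite.

    -- Minimal sketch of the out-of-order column scenario (assumed catalog/session setup).
    CREATE DATABASE IF NOT EXISTS test_columns_out_of_order;
    USE test_columns_out_of_order;

    -- Source table whose schema order (col3, col6, col1, ...) differs from the target's.
    CREATE TABLE columns_out_of_order_source_tbl_orc (
        `col3` bigint,
        `col6` int,
        `col1` bigint,
        `col4` int,
        `col2` bigint,
        `col5` int
    ) ENGINE = hive
    PROPERTIES ('file_format' = 'orc');

    -- Target table partitioned on col4, col5, col6.
    CREATE TABLE columns_out_of_order_target_tbl_orc (
        `col1` bigint,
        `col2` bigint,
        `col3` bigint,
        `col4` int,
        `col5` int,
        `col6` int
    ) ENGINE = hive
    PARTITION BY LIST (col4, col5, col6) ()
    PROPERTIES ('file_format' = 'orc');

    -- The insert column list is ordered col1..col6, which does not match the source
    -- table's declared schema order; both inserts should round-trip 1..6 unchanged.
    INSERT INTO columns_out_of_order_source_tbl_orc (col1, col2, col3, col4, col5, col6)
    VALUES (1, 2, 3, 4, 5, 6);

    INSERT INTO columns_out_of_order_target_tbl_orc (col1, col2, col3, col4, col5, col6)
    VALUES (1, 2, 3, 4, 5, 6);

The expected results in test_hive_write_type.out (3 6 1 4 2 5 for the source table, 1 2 3 4 5 6 for the partitioned target) reflect each table's own schema order after the fix.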
