github-actions[bot] commented on code in PR #32306: URL: https://github.com/apache/doris/pull/32306#discussion_r1527455212
########## be/src/vec/sink/writer/vhive_partition_writer.h: ########## @@ -0,0 +1,99 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <gen_cpp/PlanNodes_types.h> Review Comment: warning: 'gen_cpp/PlanNodes_types.h' file not found [clang-diagnostic-error] ```cpp #include <gen_cpp/PlanNodes_types.h> ^ ``` ########## be/src/vec/sink/writer/vhive_partition_writer.h: ########## @@ -0,0 +1,99 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <gen_cpp/PlanNodes_types.h> + +#include "io/fs/file_writer.h" +#include "vec/columns/column.h" +#include "vec/exprs/vexpr_fwd.h" +#include "vec/runtime/vfile_format_transformer.h" + +namespace doris { + +class ObjectPool; +class RuntimeState; +class RuntimeProfile; + +namespace vectorized { + +class Block; +class VFileFormatTransformer; + +class VHivePartitionWriter { +public: + struct WriteInfo { + std::string write_path; + std::string target_path; + TFileType::type file_type; + }; + + VHivePartitionWriter(const TDataSink& t_sink, const std::string partition_name, + TUpdateMode::type update_mode, const VExprContextSPtrs& output_expr_ctxs, + const std::vector<THiveColumn>& columns, WriteInfo write_info, + const std::string file_name, TFileFormatType::type file_format_type, Review Comment: warning: parameter 'file_name' is const-qualified in the function declaration; const-qualification of parameters only has an effect in function definitions [readability-avoid-const-params-in-decls] ```suggestion std::string file_name, TFileFormatType::type file_format_type, ``` ########## be/src/vec/sink/writer/vhive_partition_writer.h: ########## @@ -0,0 +1,99 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <gen_cpp/PlanNodes_types.h> + +#include "io/fs/file_writer.h" +#include "vec/columns/column.h" +#include "vec/exprs/vexpr_fwd.h" +#include "vec/runtime/vfile_format_transformer.h" + +namespace doris { + +class ObjectPool; +class RuntimeState; +class RuntimeProfile; + +namespace vectorized { + +class Block; +class VFileFormatTransformer; + +class VHivePartitionWriter { +public: + struct WriteInfo { + std::string write_path; + std::string target_path; + TFileType::type file_type; + }; + + VHivePartitionWriter(const TDataSink& t_sink, const std::string partition_name, Review Comment: warning: parameter 'partition_name' is const-qualified in the function declaration; const-qualification of parameters only has an effect in function definitions [readability-avoid-const-params-in-decls] ```suggestion VHivePartitionWriter(const TDataSink& t_sink, std::string partition_name, ``` ########## be/src/vec/sink/writer/vhive_partition_writer.h: ########## @@ -0,0 +1,99 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <gen_cpp/PlanNodes_types.h> + +#include "io/fs/file_writer.h" +#include "vec/columns/column.h" +#include "vec/exprs/vexpr_fwd.h" +#include "vec/runtime/vfile_format_transformer.h" + +namespace doris { + +class ObjectPool; +class RuntimeState; +class RuntimeProfile; + +namespace vectorized { + +class Block; +class VFileFormatTransformer; + +class VHivePartitionWriter { +public: + struct WriteInfo { + std::string write_path; + std::string target_path; + TFileType::type file_type; + }; + + VHivePartitionWriter(const TDataSink& t_sink, const std::string partition_name, + TUpdateMode::type update_mode, const VExprContextSPtrs& output_expr_ctxs, + const std::vector<THiveColumn>& columns, WriteInfo write_info, + const std::string file_name, TFileFormatType::type file_format_type, + TFileCompressType::type hive_compress_type, + const std::map<std::string, std::string>& hadoop_conf); Review Comment: warning: parameter 10 is const-qualified in the function declaration; const-qualification of parameters only has an effect in function definitions [readability-avoid-const-params-in-decls] ```suggestion std::map<std::string, std::string>& hadoop_conf); ``` ########## be/src/vec/sink/writer/vhive_table_writer.cpp: ########## @@ -0,0 +1,432 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vhive_table_writer.h" + +#include "runtime/runtime_state.h" +#include "vec/core/block.h" +#include "vec/core/column_with_type_and_name.h" +#include "vec/exprs/vexpr.h" +#include "vec/exprs/vexpr_context.h" +#include "vec/sink/writer/vhive_partition_writer.h" +#include "vec/sink/writer/vhive_utils.h" + +namespace doris { +namespace vectorized { Review Comment: warning: nested namespaces can be concatenated [modernize-concat-nested-namespaces] ```suggestion namespace doris::vectorized { ``` be/src/vec/sink/writer/vhive_table_writer.cpp:430: ```diff - } // namespace vectorized - } // namespace doris + } // namespace doris ``` ########## be/src/vec/sink/writer/vhive_table_writer.cpp: ########## @@ -0,0 +1,432 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vhive_table_writer.h" + +#include "runtime/runtime_state.h" +#include "vec/core/block.h" +#include "vec/core/column_with_type_and_name.h" +#include "vec/exprs/vexpr.h" +#include "vec/exprs/vexpr_context.h" +#include "vec/sink/writer/vhive_partition_writer.h" +#include "vec/sink/writer/vhive_utils.h" + +namespace doris { +namespace vectorized { + +VHiveTableWriter::VHiveTableWriter(const TDataSink& t_sink, + const VExprContextSPtrs& output_expr_ctxs) + : AsyncResultWriter(output_expr_ctxs), _t_sink(t_sink) { + DCHECK(_t_sink.__isset.hive_table_sink); +} + +Status VHiveTableWriter::init_properties(ObjectPool* pool) { + return Status::OK(); +} + +Status VHiveTableWriter::open(RuntimeState* state, RuntimeProfile* profile) { + _state = state; + _profile = profile; + + for (int i = 0; i < _t_sink.hive_table_sink.columns.size(); ++i) { + if (_t_sink.hive_table_sink.columns[i].column_type == THiveColumnType::PARTITION_KEY) { + _partition_columns_input_index.emplace_back(i); + } + } + return Status::OK(); +} + +Status VHiveTableWriter::write(vectorized::Block& block) { + std::unordered_map<std::shared_ptr<VHivePartitionWriter>, IColumn::Filter> writer_positions; + + auto& hive_table_sink = _t_sink.hive_table_sink; + + if (_partition_columns_input_index.empty()) { + auto writer_iter = _partitions_to_writers.find(""); + if (writer_iter == _partitions_to_writers.end()) { + try { + std::shared_ptr<VHivePartitionWriter> writer = _create_partition_writer(block, -1); + _partitions_to_writers.insert({"", writer}); + RETURN_IF_ERROR(writer->open(_state, _profile)); + RETURN_IF_ERROR(writer->write(block)); + } catch (doris::Exception& e) { + return e.to_status(); + } + return Status::OK(); + } else { + std::shared_ptr<VHivePartitionWriter> writer; + if (writer_iter->second->written_len() > config::hive_sink_max_file_size) { + static_cast<void>(writer_iter->second->close(Status::OK())); + _partitions_to_writers.erase(writer_iter); + try { + writer = _create_partition_writer(block, -1); + _partitions_to_writers.insert({"", writer}); + RETURN_IF_ERROR(writer->open(_state, _profile)); + RETURN_IF_ERROR(writer->write(block)); + } catch (doris::Exception& e) { + return e.to_status(); + } + } else { + writer = writer_iter->second; + } + RETURN_IF_ERROR(writer->write(block)); + return Status::OK(); + } + } + + for (int i = 0; i < block.rows(); ++i) { + std::vector<std::string> partition_values; + try { + partition_values = _create_partition_values(block, i); + } catch (doris::Exception& e) { + return e.to_status(); + } + std::string partition_name = VHiveUtils::make_partition_name( + hive_table_sink.columns, _partition_columns_input_index, partition_values); + + auto create_and_open_writer = + [&](const std::string& partition_name, int position, + std::shared_ptr<VHivePartitionWriter>& writer_ptr) -> Status { + try { + auto writer = _create_partition_writer(block, position); + RETURN_IF_ERROR(writer->open(_state, _profile)); + IColumn::Filter filter(block.rows(), 0); + filter[position] = 1; + writer_positions.insert({writer, std::move(filter)}); + _partitions_to_writers.insert({partition_name, writer}); + writer_ptr = writer; + } catch (doris::Exception& e) { + return e.to_status(); + } + return Status::OK(); + }; + + auto writer_iter = _partitions_to_writers.find(partition_name); + if (writer_iter == _partitions_to_writers.end()) { + std::shared_ptr<VHivePartitionWriter> writer; + RETURN_IF_ERROR(create_and_open_writer(partition_name, i, writer)); + } else { + std::shared_ptr<VHivePartitionWriter> writer; + if (writer_iter->second->written_len() > config::hive_sink_max_file_size) { + static_cast<void>(writer_iter->second->close(Status::OK())); + writer_positions.erase(writer_iter->second); + _partitions_to_writers.erase(writer_iter); + RETURN_IF_ERROR(create_and_open_writer(partition_name, i, writer)); + } else { + writer = writer_iter->second; + } + auto writer_pos_iter = writer_positions.find(writer); + if (writer_pos_iter == writer_positions.end()) { + IColumn::Filter filter(block.rows(), 0); + filter[i] = 1; + writer_positions.insert({writer, std::move(filter)}); + } else { + writer_pos_iter->second[i] = 1; + } + } + } + + for (auto it = writer_positions.begin(); it != writer_positions.end(); ++it) { + RETURN_IF_ERROR(it->first->write(block, &it->second)); + } + return Status::OK(); +} + +Status VHiveTableWriter::close(Status status) { + for (const auto& pair : _partitions_to_writers) { + Status st = pair.second->close(status); + if (st != Status::OK()) { + LOG(WARNING) << fmt::format("Unsupported type for partition {}", st.to_string()); + continue; + } + } + _partitions_to_writers.clear(); + return Status::OK(); +} + +std::shared_ptr<VHivePartitionWriter> VHiveTableWriter::_create_partition_writer( Review Comment: warning: method '_create_partition_writer' can be made static [readability-convert-member-functions-to-static] be/src/vec/sink/writer/vhive_table_writer.h:52: ```diff - std::shared_ptr<VHivePartitionWriter> _create_partition_writer(vectorized::Block& block, + static std::shared_ptr<VHivePartitionWriter> _create_partition_writer(vectorized::Block& block, ``` ########## be/src/vec/sink/writer/vhive_table_writer.cpp: ########## @@ -0,0 +1,432 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vhive_table_writer.h" + +#include "runtime/runtime_state.h" +#include "vec/core/block.h" +#include "vec/core/column_with_type_and_name.h" +#include "vec/exprs/vexpr.h" +#include "vec/exprs/vexpr_context.h" +#include "vec/sink/writer/vhive_partition_writer.h" +#include "vec/sink/writer/vhive_utils.h" + +namespace doris { +namespace vectorized { + +VHiveTableWriter::VHiveTableWriter(const TDataSink& t_sink, + const VExprContextSPtrs& output_expr_ctxs) + : AsyncResultWriter(output_expr_ctxs), _t_sink(t_sink) { + DCHECK(_t_sink.__isset.hive_table_sink); +} + +Status VHiveTableWriter::init_properties(ObjectPool* pool) { Review Comment: warning: method 'init_properties' can be made static [readability-convert-member-functions-to-static] be/src/vec/sink/writer/vhive_table_writer.h:43: ```diff - Status init_properties(ObjectPool* pool); + static Status init_properties(ObjectPool* pool); ``` ########## be/test/vec/exec/skewed_partition_rebalancer_test.cpp: ########## @@ -0,0 +1,318 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is porting from +// https://github.com/trinodb/trino/blob/master/core/trino-main/src/test/java/io/trino/operator/output/TestSkewedPartitionRebalancer.java +// to cpp and modified by Doris + +#include "vec/exec/skewed_partition_rebalancer.h" + +#include <gtest/gtest.h> Review Comment: warning: 'gtest/gtest.h' file not found [clang-diagnostic-error] ```cpp #include <gtest/gtest.h> ^ ``` ########## be/src/vec/sink/writer/vhive_utils.cpp: ########## @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vhive_utils.h" + +#include <algorithm> +#include <regex> +#include <sstream> + +namespace doris { +namespace vectorized { Review Comment: warning: nested namespaces can be concatenated [modernize-concat-nested-namespaces] ```suggestion namespace doris::vectorized { ``` be/src/vec/sink/writer/vhive_utils.cpp:76: ```diff - } // namespace vectorized - } // namespace doris + } // namespace doris ``` ########## be/src/vec/sink/writer/vhive_table_writer.cpp: ########## @@ -0,0 +1,432 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vhive_table_writer.h" + +#include "runtime/runtime_state.h" +#include "vec/core/block.h" +#include "vec/core/column_with_type_and_name.h" +#include "vec/exprs/vexpr.h" +#include "vec/exprs/vexpr_context.h" +#include "vec/sink/writer/vhive_partition_writer.h" +#include "vec/sink/writer/vhive_utils.h" + +namespace doris { +namespace vectorized { + +VHiveTableWriter::VHiveTableWriter(const TDataSink& t_sink, + const VExprContextSPtrs& output_expr_ctxs) + : AsyncResultWriter(output_expr_ctxs), _t_sink(t_sink) { + DCHECK(_t_sink.__isset.hive_table_sink); +} + +Status VHiveTableWriter::init_properties(ObjectPool* pool) { + return Status::OK(); +} + +Status VHiveTableWriter::open(RuntimeState* state, RuntimeProfile* profile) { + _state = state; + _profile = profile; + + for (int i = 0; i < _t_sink.hive_table_sink.columns.size(); ++i) { + if (_t_sink.hive_table_sink.columns[i].column_type == THiveColumnType::PARTITION_KEY) { + _partition_columns_input_index.emplace_back(i); + } + } + return Status::OK(); +} + +Status VHiveTableWriter::write(vectorized::Block& block) { + std::unordered_map<std::shared_ptr<VHivePartitionWriter>, IColumn::Filter> writer_positions; + + auto& hive_table_sink = _t_sink.hive_table_sink; + + if (_partition_columns_input_index.empty()) { + auto writer_iter = _partitions_to_writers.find(""); + if (writer_iter == _partitions_to_writers.end()) { + try { + std::shared_ptr<VHivePartitionWriter> writer = _create_partition_writer(block, -1); + _partitions_to_writers.insert({"", writer}); + RETURN_IF_ERROR(writer->open(_state, _profile)); + RETURN_IF_ERROR(writer->write(block)); + } catch (doris::Exception& e) { + return e.to_status(); + } + return Status::OK(); + } else { + std::shared_ptr<VHivePartitionWriter> writer; + if (writer_iter->second->written_len() > config::hive_sink_max_file_size) { + static_cast<void>(writer_iter->second->close(Status::OK())); + _partitions_to_writers.erase(writer_iter); + try { + writer = _create_partition_writer(block, -1); + _partitions_to_writers.insert({"", writer}); + RETURN_IF_ERROR(writer->open(_state, _profile)); + RETURN_IF_ERROR(writer->write(block)); + } catch (doris::Exception& e) { + return e.to_status(); + } + } else { + writer = writer_iter->second; + } + RETURN_IF_ERROR(writer->write(block)); + return Status::OK(); + } + } + + for (int i = 0; i < block.rows(); ++i) { + std::vector<std::string> partition_values; + try { + partition_values = _create_partition_values(block, i); + } catch (doris::Exception& e) { + return e.to_status(); + } + std::string partition_name = VHiveUtils::make_partition_name( + hive_table_sink.columns, _partition_columns_input_index, partition_values); + + auto create_and_open_writer = + [&](const std::string& partition_name, int position, + std::shared_ptr<VHivePartitionWriter>& writer_ptr) -> Status { + try { + auto writer = _create_partition_writer(block, position); + RETURN_IF_ERROR(writer->open(_state, _profile)); + IColumn::Filter filter(block.rows(), 0); + filter[position] = 1; + writer_positions.insert({writer, std::move(filter)}); + _partitions_to_writers.insert({partition_name, writer}); + writer_ptr = writer; + } catch (doris::Exception& e) { + return e.to_status(); + } + return Status::OK(); + }; + + auto writer_iter = _partitions_to_writers.find(partition_name); + if (writer_iter == _partitions_to_writers.end()) { + std::shared_ptr<VHivePartitionWriter> writer; + RETURN_IF_ERROR(create_and_open_writer(partition_name, i, writer)); + } else { + std::shared_ptr<VHivePartitionWriter> writer; + if (writer_iter->second->written_len() > config::hive_sink_max_file_size) { + static_cast<void>(writer_iter->second->close(Status::OK())); + writer_positions.erase(writer_iter->second); + _partitions_to_writers.erase(writer_iter); + RETURN_IF_ERROR(create_and_open_writer(partition_name, i, writer)); + } else { + writer = writer_iter->second; + } + auto writer_pos_iter = writer_positions.find(writer); + if (writer_pos_iter == writer_positions.end()) { + IColumn::Filter filter(block.rows(), 0); + filter[i] = 1; + writer_positions.insert({writer, std::move(filter)}); + } else { + writer_pos_iter->second[i] = 1; + } + } + } + + for (auto it = writer_positions.begin(); it != writer_positions.end(); ++it) { + RETURN_IF_ERROR(it->first->write(block, &it->second)); + } + return Status::OK(); +} + +Status VHiveTableWriter::close(Status status) { + for (const auto& pair : _partitions_to_writers) { + Status st = pair.second->close(status); + if (st != Status::OK()) { + LOG(WARNING) << fmt::format("Unsupported type for partition {}", st.to_string()); + continue; + } + } + _partitions_to_writers.clear(); + return Status::OK(); +} + +std::shared_ptr<VHivePartitionWriter> VHiveTableWriter::_create_partition_writer( + vectorized::Block& block, int position) { + auto& hive_table_sink = _t_sink.hive_table_sink; + std::vector<std::string> partition_values; + std::string partition_name; + if (!_partition_columns_input_index.empty()) { + partition_values = _create_partition_values(block, position); + partition_name = VHiveUtils::make_partition_name( + hive_table_sink.columns, _partition_columns_input_index, partition_values); + } + const std::vector<THivePartition>& partitions = hive_table_sink.partitions; + const THiveLocationParams& write_location = hive_table_sink.location; + const THivePartition* existing_partition = nullptr; + bool existing_table = true; + for (const auto& partition : partitions) { + if (partition_values == partition.values) { + existing_partition = &partition; + break; + } + } + TUpdateMode::type update_mode; + VHivePartitionWriter::WriteInfo write_info; + TFileFormatType::type file_format_type; + TFileCompressType::type write_compress_type; + if (existing_partition == nullptr) { // new partition + if (existing_table == false) { // new table + update_mode = TUpdateMode::NEW; + if (_partition_columns_input_index.empty()) { // new unpartitioned table + write_info = {write_location.write_path, write_location.target_path, + write_location.file_type}; + } else { // a new partition in a new partitioned table + auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name); + auto target_path = fmt::format("{}/{}", write_location.target_path, partition_name); + write_info = {std::move(write_path), std::move(target_path), + write_location.file_type}; + } + } else { // a new partition in an existing partitioned table, or an existing unpartitioned table + if (_partition_columns_input_index.empty()) { // an existing unpartitioned table + update_mode = + !hive_table_sink.overwrite ? TUpdateMode::APPEND : TUpdateMode::OVERWRITE; + write_info = {write_location.write_path, write_location.target_path, + write_location.file_type}; + } else { // a new partition in an existing partitioned table + update_mode = TUpdateMode::NEW; + auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name); + auto target_path = fmt::format("{}/{}", write_location.target_path, partition_name); + write_info = {std::move(write_path), std::move(target_path), + write_location.file_type}; + } + // need to get schema from existing table ? + } + file_format_type = hive_table_sink.file_format; + write_compress_type = hive_table_sink.compression_type; + } else { // existing partition + if (!hive_table_sink.overwrite) { + update_mode = TUpdateMode::APPEND; + auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name); + auto target_path = fmt::format("{}", existing_partition->location.target_path); + write_info = {std::move(write_path), std::move(target_path), + existing_partition->location.file_type}; + file_format_type = existing_partition->file_format; + write_compress_type = hive_table_sink.compression_type; + } else { + update_mode = TUpdateMode::OVERWRITE; + auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name); + auto target_path = fmt::format("{}/{}", write_location.target_path, partition_name); + write_info = {std::move(write_path), std::move(target_path), write_location.file_type}; + file_format_type = hive_table_sink.file_format; + write_compress_type = hive_table_sink.compression_type; + // need to get schema from existing table ? + } + } + + return std::make_shared<VHivePartitionWriter>( + _t_sink, std::move(partition_name), update_mode, _vec_output_expr_ctxs, + hive_table_sink.columns, std::move(write_info), + fmt::format("{}{}", _compute_file_name(), + _get_file_extension(file_format_type, write_compress_type)), + file_format_type, write_compress_type, hive_table_sink.hadoop_config); +} + +std::vector<std::string> VHiveTableWriter::_create_partition_values(vectorized::Block& block, + int position) { + std::vector<std::string> partition_values; + for (int i = 0; i < _partition_columns_input_index.size(); ++i) { + int partition_column_idx = _partition_columns_input_index[i]; + vectorized::ColumnWithTypeAndName partition_column = + block.get_by_position(partition_column_idx); + std::string value = + _to_partition_value(_vec_output_expr_ctxs[partition_column_idx]->root()->type(), + partition_column, position); + + // Check if value contains only printable ASCII characters + bool isValid = true; + for (char c : value) { + if (c < 0x20 || c > 0x7E) { + isValid = false; + break; + } + } + + if (!isValid) { + // Encode value using Base16 encoding with space separator + std::stringstream encoded; + for (unsigned char c : value) { + encoded << std::hex << std::setw(2) << std::setfill('0') << (int)c; + encoded << " "; + } + throw doris::Exception( + doris::ErrorCode::INTERNAL_ERROR, + "Hive partition values can only contain printable ASCII characters (0x20 - " + "0x7E). Invalid value: {}", + encoded.str()); + } + + partition_values.emplace_back(value); + } + + return partition_values; +} + +std::string VHiveTableWriter::_to_partition_value(const TypeDescriptor& type_desc, Review Comment: warning: function '_to_partition_value' exceeds recommended size/complexity thresholds [readability-function-size] ```cpp std::string VHiveTableWriter::_to_partition_value(const TypeDescriptor& type_desc, ^ ``` <details> <summary>Additional context</summary> **be/src/vec/sink/writer/vhive_table_writer.cpp:281:** 97 lines including whitespace and comments (threshold 80) ```cpp std::string VHiveTableWriter::_to_partition_value(const TypeDescriptor& type_desc, ^ ``` </details> ########## be/src/vec/sink/writer/vhive_table_writer.cpp: ########## @@ -0,0 +1,432 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vhive_table_writer.h" + +#include "runtime/runtime_state.h" +#include "vec/core/block.h" +#include "vec/core/column_with_type_and_name.h" +#include "vec/exprs/vexpr.h" +#include "vec/exprs/vexpr_context.h" +#include "vec/sink/writer/vhive_partition_writer.h" +#include "vec/sink/writer/vhive_utils.h" + +namespace doris { +namespace vectorized { + +VHiveTableWriter::VHiveTableWriter(const TDataSink& t_sink, + const VExprContextSPtrs& output_expr_ctxs) + : AsyncResultWriter(output_expr_ctxs), _t_sink(t_sink) { + DCHECK(_t_sink.__isset.hive_table_sink); +} + +Status VHiveTableWriter::init_properties(ObjectPool* pool) { + return Status::OK(); +} + +Status VHiveTableWriter::open(RuntimeState* state, RuntimeProfile* profile) { + _state = state; + _profile = profile; + + for (int i = 0; i < _t_sink.hive_table_sink.columns.size(); ++i) { + if (_t_sink.hive_table_sink.columns[i].column_type == THiveColumnType::PARTITION_KEY) { + _partition_columns_input_index.emplace_back(i); + } + } + return Status::OK(); +} + +Status VHiveTableWriter::write(vectorized::Block& block) { Review Comment: warning: function 'write' has cognitive complexity of 94 (threshold 50) [readability-function-cognitive-complexity] ```cpp Status VHiveTableWriter::write(vectorized::Block& block) { ^ ``` <details> <summary>Additional context</summary> **be/src/vec/sink/writer/vhive_table_writer.cpp:57:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp if (_partition_columns_input_index.empty()) { ^ ``` **be/src/vec/sink/writer/vhive_table_writer.cpp:59:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if (writer_iter == _partitions_to_writers.end()) { ^ ``` **be/src/vec/sink/writer/vhive_table_writer.cpp:63:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(writer->open(_state, _profile)); ^ ``` **be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/vec/sink/writer/vhive_table_writer.cpp:63:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp RETURN_IF_ERROR(writer->open(_state, _profile)); ^ ``` **be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/vec/sink/writer/vhive_table_writer.cpp:64:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(writer->write(block)); ^ ``` **be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/vec/sink/writer/vhive_table_writer.cpp:64:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp RETURN_IF_ERROR(writer->write(block)); ^ ``` **be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/vec/sink/writer/vhive_table_writer.cpp:65:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp } catch (doris::Exception& e) { ^ ``` **be/src/vec/sink/writer/vhive_table_writer.cpp:69:** +1, nesting level increased to 2 ```cpp } else { ^ ``` **be/src/vec/sink/writer/vhive_table_writer.cpp:71:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp if (writer_iter->second->written_len() > config::hive_sink_max_file_size) { ^ ``` **be/src/vec/sink/writer/vhive_table_writer.cpp:77:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp RETURN_IF_ERROR(writer->open(_state, _profile)); ^ ``` **be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/vec/sink/writer/vhive_table_writer.cpp:77:** +5, including nesting penalty of 4, nesting level increased to 5 ```cpp RETURN_IF_ERROR(writer->open(_state, _profile)); ^ ``` **be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/vec/sink/writer/vhive_table_writer.cpp:78:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp RETURN_IF_ERROR(writer->write(block)); ^ ``` **be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/vec/sink/writer/vhive_table_writer.cpp:78:** +5, including nesting penalty of 4, nesting level increased to 5 ```cpp RETURN_IF_ERROR(writer->write(block)); ^ ``` **be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/vec/sink/writer/vhive_table_writer.cpp:79:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp } catch (doris::Exception& e) { ^ ``` **be/src/vec/sink/writer/vhive_table_writer.cpp:82:** +1, nesting level increased to 3 ```cpp } else { ^ ``` **be/src/vec/sink/writer/vhive_table_writer.cpp:85:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(writer->write(block)); ^ ``` **be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/vec/sink/writer/vhive_table_writer.cpp:85:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp RETURN_IF_ERROR(writer->write(block)); ^ ``` **be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/vec/sink/writer/vhive_table_writer.cpp:90:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp for (int i = 0; i < block.rows(); ++i) { ^ ``` **be/src/vec/sink/writer/vhive_table_writer.cpp:94:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp } catch (doris::Exception& e) { ^ ``` **be/src/vec/sink/writer/vhive_table_writer.cpp:101:** nesting level increased to 2 ```cpp [&](const std::string& partition_name, int position, ^ ``` **be/src/vec/sink/writer/vhive_table_writer.cpp:105:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(writer->open(_state, _profile)); ^ ``` **be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/vec/sink/writer/vhive_table_writer.cpp:105:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp RETURN_IF_ERROR(writer->open(_state, _profile)); ^ ``` **be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/vec/sink/writer/vhive_table_writer.cpp:111:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp } catch (doris::Exception& e) { ^ ``` **be/src/vec/sink/writer/vhive_table_writer.cpp:118:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if (writer_iter == _partitions_to_writers.end()) { ^ ``` **be/src/vec/sink/writer/vhive_table_writer.cpp:120:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(create_and_open_writer(partition_name, i, writer)); ^ ``` **be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/vec/sink/writer/vhive_table_writer.cpp:120:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp RETURN_IF_ERROR(create_and_open_writer(partition_name, i, writer)); ^ ``` **be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/vec/sink/writer/vhive_table_writer.cpp:121:** +1, nesting level increased to 2 ```cpp } else { ^ ``` **be/src/vec/sink/writer/vhive_table_writer.cpp:123:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp if (writer_iter->second->written_len() > config::hive_sink_max_file_size) { ^ ``` **be/src/vec/sink/writer/vhive_table_writer.cpp:127:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp RETURN_IF_ERROR(create_and_open_writer(partition_name, i, writer)); ^ ``` **be/src/common/status.h:541:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/vec/sink/writer/vhive_table_writer.cpp:127:** +5, including nesting penalty of 4, nesting level increased to 5 ```cpp RETURN_IF_ERROR(create_and_open_writer(partition_name, i, writer)); ^ ``` **be/src/common/status.h:543:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/vec/sink/writer/vhive_table_writer.cpp:128:** +1, nesting level increased to 3 ```cpp } else { ^ ``` **be/src/vec/sink/writer/vhive_table_writer.cpp:132:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp if (writer_pos_iter == writer_positions.end()) { ^ ``` **be/src/vec/sink/writer/vhive_table_writer.cpp:136:** +1, nesting level increased to 3 ```cpp } else { ^ ``` </details> ########## be/src/vec/sink/writer/vhive_utils.h: ########## @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <gen_cpp/DataSinks_types.h> Review Comment: warning: 'gen_cpp/DataSinks_types.h' file not found [clang-diagnostic-error] ```cpp #include <gen_cpp/DataSinks_types.h> ^ ``` ########## be/src/vec/sink/writer/vhive_table_writer.h: ########## @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <gen_cpp/DataSinks_types.h> + +#include "vec/exprs/vexpr_fwd.h" +#include "vec/sink/writer/async_result_writer.h" + +namespace doris { + +class ObjectPool; +class RuntimeState; +class RuntimeProfile; +struct TypeDescriptor; + +namespace vectorized { + +class Block; +class VHivePartitionWriter; +struct ColumnWithTypeAndName; + +class VHiveTableWriter final : public AsyncResultWriter { +public: + VHiveTableWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs); + + ~VHiveTableWriter() = default; Review Comment: warning: annotate this function with 'override' or (rarely) 'final' [modernize-use-override] ```suggestion ~VHiveTableWriter() override = default; ``` ########## be/src/vec/sink/writer/vhive_partition_writer.h: ########## @@ -0,0 +1,99 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <gen_cpp/PlanNodes_types.h> + +#include "io/fs/file_writer.h" +#include "vec/columns/column.h" +#include "vec/exprs/vexpr_fwd.h" +#include "vec/runtime/vfile_format_transformer.h" + +namespace doris { + +class ObjectPool; +class RuntimeState; +class RuntimeProfile; + +namespace vectorized { + +class Block; +class VFileFormatTransformer; + +class VHivePartitionWriter { +public: + struct WriteInfo { + std::string write_path; + std::string target_path; + TFileType::type file_type; + }; + + VHivePartitionWriter(const TDataSink& t_sink, const std::string partition_name, + TUpdateMode::type update_mode, const VExprContextSPtrs& output_expr_ctxs, + const std::vector<THiveColumn>& columns, WriteInfo write_info, + const std::string file_name, TFileFormatType::type file_format_type, + TFileCompressType::type hive_compress_type, + const std::map<std::string, std::string>& hadoop_conf); + + Status init_properties(ObjectPool* pool) { return Status::OK(); } Review Comment: warning: method 'init_properties' can be made static [readability-convert-member-functions-to-static] ```suggestion static Status init_properties(ObjectPool* pool) { return Status::OK(); } ``` ########## be/src/vec/sink/writer/vhive_table_writer.h: ########## @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <gen_cpp/DataSinks_types.h> Review Comment: warning: 'gen_cpp/DataSinks_types.h' file not found [clang-diagnostic-error] ```cpp #include <gen_cpp/DataSinks_types.h> ^ ``` ########## be/src/vec/sink/writer/vhive_table_writer.cpp: ########## @@ -0,0 +1,432 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vhive_table_writer.h" + +#include "runtime/runtime_state.h" +#include "vec/core/block.h" +#include "vec/core/column_with_type_and_name.h" +#include "vec/exprs/vexpr.h" +#include "vec/exprs/vexpr_context.h" +#include "vec/sink/writer/vhive_partition_writer.h" +#include "vec/sink/writer/vhive_utils.h" + +namespace doris { +namespace vectorized { + +VHiveTableWriter::VHiveTableWriter(const TDataSink& t_sink, + const VExprContextSPtrs& output_expr_ctxs) + : AsyncResultWriter(output_expr_ctxs), _t_sink(t_sink) { + DCHECK(_t_sink.__isset.hive_table_sink); +} + +Status VHiveTableWriter::init_properties(ObjectPool* pool) { + return Status::OK(); +} + +Status VHiveTableWriter::open(RuntimeState* state, RuntimeProfile* profile) { + _state = state; + _profile = profile; + + for (int i = 0; i < _t_sink.hive_table_sink.columns.size(); ++i) { + if (_t_sink.hive_table_sink.columns[i].column_type == THiveColumnType::PARTITION_KEY) { + _partition_columns_input_index.emplace_back(i); + } + } + return Status::OK(); +} + +Status VHiveTableWriter::write(vectorized::Block& block) { + std::unordered_map<std::shared_ptr<VHivePartitionWriter>, IColumn::Filter> writer_positions; + + auto& hive_table_sink = _t_sink.hive_table_sink; + + if (_partition_columns_input_index.empty()) { + auto writer_iter = _partitions_to_writers.find(""); + if (writer_iter == _partitions_to_writers.end()) { + try { + std::shared_ptr<VHivePartitionWriter> writer = _create_partition_writer(block, -1); + _partitions_to_writers.insert({"", writer}); + RETURN_IF_ERROR(writer->open(_state, _profile)); + RETURN_IF_ERROR(writer->write(block)); + } catch (doris::Exception& e) { + return e.to_status(); + } + return Status::OK(); + } else { + std::shared_ptr<VHivePartitionWriter> writer; + if (writer_iter->second->written_len() > config::hive_sink_max_file_size) { + static_cast<void>(writer_iter->second->close(Status::OK())); + _partitions_to_writers.erase(writer_iter); + try { + writer = _create_partition_writer(block, -1); + _partitions_to_writers.insert({"", writer}); + RETURN_IF_ERROR(writer->open(_state, _profile)); + RETURN_IF_ERROR(writer->write(block)); + } catch (doris::Exception& e) { + return e.to_status(); + } + } else { + writer = writer_iter->second; + } + RETURN_IF_ERROR(writer->write(block)); + return Status::OK(); + } + } + + for (int i = 0; i < block.rows(); ++i) { + std::vector<std::string> partition_values; + try { + partition_values = _create_partition_values(block, i); + } catch (doris::Exception& e) { + return e.to_status(); + } + std::string partition_name = VHiveUtils::make_partition_name( + hive_table_sink.columns, _partition_columns_input_index, partition_values); + + auto create_and_open_writer = + [&](const std::string& partition_name, int position, + std::shared_ptr<VHivePartitionWriter>& writer_ptr) -> Status { + try { + auto writer = _create_partition_writer(block, position); + RETURN_IF_ERROR(writer->open(_state, _profile)); + IColumn::Filter filter(block.rows(), 0); + filter[position] = 1; + writer_positions.insert({writer, std::move(filter)}); + _partitions_to_writers.insert({partition_name, writer}); + writer_ptr = writer; + } catch (doris::Exception& e) { + return e.to_status(); + } + return Status::OK(); + }; + + auto writer_iter = _partitions_to_writers.find(partition_name); + if (writer_iter == _partitions_to_writers.end()) { + std::shared_ptr<VHivePartitionWriter> writer; + RETURN_IF_ERROR(create_and_open_writer(partition_name, i, writer)); + } else { + std::shared_ptr<VHivePartitionWriter> writer; + if (writer_iter->second->written_len() > config::hive_sink_max_file_size) { + static_cast<void>(writer_iter->second->close(Status::OK())); + writer_positions.erase(writer_iter->second); + _partitions_to_writers.erase(writer_iter); + RETURN_IF_ERROR(create_and_open_writer(partition_name, i, writer)); + } else { + writer = writer_iter->second; + } + auto writer_pos_iter = writer_positions.find(writer); + if (writer_pos_iter == writer_positions.end()) { + IColumn::Filter filter(block.rows(), 0); + filter[i] = 1; + writer_positions.insert({writer, std::move(filter)}); + } else { + writer_pos_iter->second[i] = 1; + } + } + } + + for (auto it = writer_positions.begin(); it != writer_positions.end(); ++it) { + RETURN_IF_ERROR(it->first->write(block, &it->second)); + } + return Status::OK(); +} + +Status VHiveTableWriter::close(Status status) { + for (const auto& pair : _partitions_to_writers) { + Status st = pair.second->close(status); + if (st != Status::OK()) { + LOG(WARNING) << fmt::format("Unsupported type for partition {}", st.to_string()); + continue; + } + } + _partitions_to_writers.clear(); + return Status::OK(); +} + +std::shared_ptr<VHivePartitionWriter> VHiveTableWriter::_create_partition_writer( + vectorized::Block& block, int position) { + auto& hive_table_sink = _t_sink.hive_table_sink; + std::vector<std::string> partition_values; + std::string partition_name; + if (!_partition_columns_input_index.empty()) { + partition_values = _create_partition_values(block, position); + partition_name = VHiveUtils::make_partition_name( + hive_table_sink.columns, _partition_columns_input_index, partition_values); + } + const std::vector<THivePartition>& partitions = hive_table_sink.partitions; + const THiveLocationParams& write_location = hive_table_sink.location; + const THivePartition* existing_partition = nullptr; + bool existing_table = true; + for (const auto& partition : partitions) { + if (partition_values == partition.values) { + existing_partition = &partition; + break; + } + } + TUpdateMode::type update_mode; + VHivePartitionWriter::WriteInfo write_info; + TFileFormatType::type file_format_type; + TFileCompressType::type write_compress_type; + if (existing_partition == nullptr) { // new partition + if (existing_table == false) { // new table Review Comment: warning: redundant boolean literal supplied to boolean operator [readability-simplify-boolean-expr] ```suggestion if (!existing_table) { // new table ``` ########## be/src/vec/sink/writer/vhive_table_writer.cpp: ########## @@ -0,0 +1,432 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vhive_table_writer.h" + +#include "runtime/runtime_state.h" +#include "vec/core/block.h" +#include "vec/core/column_with_type_and_name.h" +#include "vec/exprs/vexpr.h" +#include "vec/exprs/vexpr_context.h" +#include "vec/sink/writer/vhive_partition_writer.h" +#include "vec/sink/writer/vhive_utils.h" + +namespace doris { +namespace vectorized { + +VHiveTableWriter::VHiveTableWriter(const TDataSink& t_sink, + const VExprContextSPtrs& output_expr_ctxs) + : AsyncResultWriter(output_expr_ctxs), _t_sink(t_sink) { + DCHECK(_t_sink.__isset.hive_table_sink); +} + +Status VHiveTableWriter::init_properties(ObjectPool* pool) { + return Status::OK(); +} + +Status VHiveTableWriter::open(RuntimeState* state, RuntimeProfile* profile) { + _state = state; + _profile = profile; + + for (int i = 0; i < _t_sink.hive_table_sink.columns.size(); ++i) { + if (_t_sink.hive_table_sink.columns[i].column_type == THiveColumnType::PARTITION_KEY) { + _partition_columns_input_index.emplace_back(i); + } + } + return Status::OK(); +} + +Status VHiveTableWriter::write(vectorized::Block& block) { Review Comment: warning: function 'write' exceeds recommended size/complexity thresholds [readability-function-size] ```cpp Status VHiveTableWriter::write(vectorized::Block& block) { ^ ``` <details> <summary>Additional context</summary> **be/src/vec/sink/writer/vhive_table_writer.cpp:52:** 94 lines including whitespace and comments (threshold 80) ```cpp Status VHiveTableWriter::write(vectorized::Block& block) { ^ ``` </details> ########## be/test/util/indexed_priority_queue_test.cpp: ########## @@ -0,0 +1,104 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "util/indexed_priority_queue.hpp" Review Comment: warning: 'util/indexed_priority_queue.hpp' file not found [clang-diagnostic-error] ```cpp #include "util/indexed_priority_queue.hpp" ^ ``` ########## be/test/vec/exec/skewed_partition_rebalancer_test.cpp: ########## @@ -0,0 +1,318 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is porting from +// https://github.com/trinodb/trino/blob/master/core/trino-main/src/test/java/io/trino/operator/output/TestSkewedPartitionRebalancer.java +// to cpp and modified by Doris + +#include "vec/exec/skewed_partition_rebalancer.h" + +#include <gtest/gtest.h> + +#include <list> + +namespace doris::vectorized { + +class SkewedPartitionRebalancerTest : public testing::Test { +public: + SkewedPartitionRebalancerTest() = default; + virtual ~SkewedPartitionRebalancerTest() = default; + +private: + std::vector<std::list<int>> _get_partition_positions( + std::unique_ptr<SkewedPartitionRebalancer>& rebalancer, + std::vector<long>& partition_row_count, int partition_count, int max_position) { + std::vector<std::list<int>> partitionPositions(rebalancer->get_task_count()); + + for (int partition = 0; partition < rebalancer->get_task_count(); partition++) { + partitionPositions[partition] = std::list<int>(); + } + + for (int position = 0; position < max_position; position++) { + int partition = position % partition_count; + partition = rebalancer->get_task_id(partition, partition_row_count[partition]++); + partitionPositions[partition].push_back(position); + } + + return partitionPositions; + } + + bool _vectors_equal(const std::vector<std::list<int>>& vec1, Review Comment: warning: method '_vectors_equal' can be made static [readability-convert-member-functions-to-static] ```suggestion static bool _vectors_equal(const std::vector<std::list<int>>& vec1, ``` ########## be/src/vec/sink/writer/vhive_utils.h: ########## @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <gen_cpp/DataSinks_types.h> + +#include <algorithm> +#include <iostream> +#include <regex> +#include <sstream> +#include <string> +#include <vector> + +namespace doris { +namespace vectorized { Review Comment: warning: nested namespaces can be concatenated [modernize-concat-nested-namespaces] ```suggestion namespace doris::vectorized { ``` be/src/vec/sink/writer/vhive_utils.h:43: ```diff - } // namespace vectorized - } // namespace doris + } // namespace doris ``` ########## be/test/vec/exec/skewed_partition_rebalancer_test.cpp: ########## @@ -0,0 +1,318 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is porting from +// https://github.com/trinodb/trino/blob/master/core/trino-main/src/test/java/io/trino/operator/output/TestSkewedPartitionRebalancer.java +// to cpp and modified by Doris + +#include "vec/exec/skewed_partition_rebalancer.h" + +#include <gtest/gtest.h> + +#include <list> + +namespace doris::vectorized { + +class SkewedPartitionRebalancerTest : public testing::Test { +public: + SkewedPartitionRebalancerTest() = default; + virtual ~SkewedPartitionRebalancerTest() = default; + +private: + std::vector<std::list<int>> _get_partition_positions( + std::unique_ptr<SkewedPartitionRebalancer>& rebalancer, + std::vector<long>& partition_row_count, int partition_count, int max_position) { + std::vector<std::list<int>> partitionPositions(rebalancer->get_task_count()); + + for (int partition = 0; partition < rebalancer->get_task_count(); partition++) { + partitionPositions[partition] = std::list<int>(); + } + + for (int position = 0; position < max_position; position++) { + int partition = position % partition_count; + partition = rebalancer->get_task_id(partition, partition_row_count[partition]++); + partitionPositions[partition].push_back(position); + } + + return partitionPositions; + } + + bool _vectors_equal(const std::vector<std::list<int>>& vec1, + const std::vector<std::list<int>>& vec2) { + if (vec1.size() != vec2.size()) { + return false; + } + for (size_t i = 0; i < vec1.size(); i++) { + if (vec1[i] != vec2[i]) { + return false; + } + } + return true; + } + + bool _compare_vector_of_lists(const std::vector<std::list<int>>& expected, Review Comment: warning: method '_compare_vector_of_lists' can be made static [readability-convert-member-functions-to-static] ```suggestion static bool _compare_vector_of_lists(const std::vector<std::list<int>>& expected, ``` ########## be/src/vec/sink/writer/vhive_partition_writer.cpp: ########## @@ -0,0 +1,282 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vhive_partition_writer.h" + +#include "io/file_factory.h" +#include "io/fs/file_system.h" +#include "runtime/runtime_state.h" +#include "vec/core/materialize_block.h" +#include "vec/runtime/vorc_transformer.h" +#include "vec/runtime/vparquet_transformer.h" + +namespace doris { +namespace vectorized { + +VHivePartitionWriter::VHivePartitionWriter( + const TDataSink& t_sink, const std::string partition_name, TUpdateMode::type update_mode, + const VExprContextSPtrs& output_expr_ctxs, const std::vector<THiveColumn>& columns, + WriteInfo write_info, const std::string file_name, TFileFormatType::type file_format_type, + TFileCompressType::type hive_compress_type, + const std::map<std::string, std::string>& hadoop_conf) + : _partition_name(std::move(partition_name)), + _update_mode(update_mode), + _vec_output_expr_ctxs(output_expr_ctxs), + _columns(columns), + _write_info(std::move(write_info)), + _file_name(std::move(file_name)), + _file_format_type(file_format_type), + _hive_compress_type(hive_compress_type), + _hadoop_conf(hadoop_conf) + +{} + +Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile) { + _state = state; + + std::vector<TNetworkAddress> broker_addresses; + RETURN_IF_ERROR(FileFactory::create_file_writer( + _write_info.file_type, state->exec_env(), broker_addresses, _hadoop_conf, + fmt::format("{}/{}", _write_info.write_path, _file_name), 0, _file_writer_impl)); + + switch (_file_format_type) { + case TFileFormatType::FORMAT_PARQUET: { + bool parquet_disable_dictionary = false; + TParquetCompressionType::type parquet_compression_type; + switch (_hive_compress_type) { + case TFileCompressType::PLAIN: { + parquet_compression_type = TParquetCompressionType::UNCOMPRESSED; + break; + } + case TFileCompressType::SNAPPYBLOCK: { + parquet_compression_type = TParquetCompressionType::SNAPPY; + break; + } + case TFileCompressType::ZSTD: { + parquet_compression_type = TParquetCompressionType::ZSTD; + break; + } + default: { + return Status::InternalError("Unsupported hive compress type {} with parquet", + to_string(_hive_compress_type)); + } + } + std::vector<TParquetSchema> parquet_schemas; + for (int i = 0; i < _columns.size(); i++) { + VExprSPtr column_expr = _vec_output_expr_ctxs[i]->root(); + TParquetSchema parquet_schema; + parquet_schema.schema_column_name = _columns[i].name; + parquet_schemas.emplace_back(std::move(parquet_schema)); + } + _vfile_writer.reset(new VParquetTransformer( + state, _file_writer_impl.get(), _vec_output_expr_ctxs, parquet_schemas, + parquet_compression_type, parquet_disable_dictionary, TParquetVersion::PARQUET_1_0, + false)); + return _vfile_writer->open(); + } + case TFileFormatType::FORMAT_ORC: { + orc::CompressionKind orc_compression_type; + switch (_hive_compress_type) { + case TFileCompressType::PLAIN: { + orc_compression_type = orc::CompressionKind::CompressionKind_NONE; + break; + } + case TFileCompressType::SNAPPYBLOCK: { + orc_compression_type = orc::CompressionKind::CompressionKind_SNAPPY; + break; + } + case TFileCompressType::ZLIB: { + orc_compression_type = orc::CompressionKind::CompressionKind_ZLIB; + break; + } + case TFileCompressType::ZSTD: { + orc_compression_type = orc::CompressionKind::CompressionKind_ZSTD; + break; + } + default: { + return Status::InternalError("Unsupported type {} with orc", _hive_compress_type); + } + } + orc_compression_type = orc::CompressionKind::CompressionKind_ZLIB; + + std::unique_ptr<orc::Type> root_schema = orc::createStructType(); + for (int i = 0; i < _columns.size(); i++) { + VExprSPtr column_expr = _vec_output_expr_ctxs[i]->root(); + try { + root_schema->addStructField(_columns[i].name, _build_orc_type(column_expr->type())); + } catch (doris::Exception& e) { + return e.to_status(); + } + } + + _vfile_writer.reset(new VOrcTransformer(state, _file_writer_impl.get(), + _vec_output_expr_ctxs, std::move(root_schema), + false, orc_compression_type)); + return _vfile_writer->open(); + } + default: { + return Status::InternalError("Unsupported file format type {}", + to_string(_file_format_type)); + } + } +} + +Status VHivePartitionWriter::close(Status status) { + if (_vfile_writer != nullptr) { + Status st = _vfile_writer->close(); + if (st != Status::OK()) { + LOG(WARNING) << fmt::format("_vfile_writer close failed, reason: {}", st.to_string()); + } + } + if (status != Status::OK()) { + auto path = fmt::format("{}/{}", _write_info.write_path, _file_name); + Status st = _file_writer_impl->fs()->delete_file(path); + if (st != Status::OK()) { + LOG(WARNING) << fmt::format("Delete file {} failed, reason: {}", path, st.to_string()); + } + } + _state->hive_partition_updates().emplace_back(_build_partition_update()); + return Status::OK(); +} + +Status VHivePartitionWriter::write(vectorized::Block& block, vectorized::IColumn::Filter* filter) { + Block output_block; + RETURN_IF_ERROR(_projection_and_filter_block(block, filter, &output_block)); + RETURN_IF_ERROR(_vfile_writer->write(output_block)); + _row_count += output_block.rows(); + _input_size_in_bytes += output_block.bytes(); + return Status::OK(); +} + +std::unique_ptr<orc::Type> VHivePartitionWriter::_build_orc_type( + const TypeDescriptor& type_descriptor) { + std::pair<Status, std::unique_ptr<orc::Type>> result; + switch (type_descriptor.type) { + case TYPE_BOOLEAN: { + return orc::createPrimitiveType(orc::BOOLEAN); + } + case TYPE_TINYINT: { + return orc::createPrimitiveType(orc::BYTE); + } + case TYPE_SMALLINT: { + return orc::createPrimitiveType(orc::SHORT); + } + case TYPE_INT: { + return orc::createPrimitiveType(orc::INT); + } + case TYPE_BIGINT: { + return orc::createPrimitiveType(orc::LONG); + } + case TYPE_FLOAT: { + return orc::createPrimitiveType(orc::FLOAT); + } + case TYPE_DOUBLE: { + return orc::createPrimitiveType(orc::DOUBLE); + } + case TYPE_CHAR: { + return orc::createCharType(orc::CHAR, type_descriptor.len); + } + case TYPE_VARCHAR: { + return orc::createCharType(orc::VARCHAR, type_descriptor.len); + } + case TYPE_STRING: { + return orc::createPrimitiveType(orc::STRING); + } + case TYPE_BINARY: { + return orc::createPrimitiveType(orc::STRING); + } + case TYPE_DATEV2: { + return orc::createPrimitiveType(orc::DATE); + } + case TYPE_DATETIMEV2: { + return orc::createPrimitiveType(orc::TIMESTAMP); + } + case TYPE_DECIMAL32: { + return orc::createDecimalType(type_descriptor.precision, type_descriptor.scale); + } + case TYPE_DECIMAL64: { + return orc::createDecimalType(type_descriptor.precision, type_descriptor.scale); + } + case TYPE_DECIMAL128I: { + return orc::createDecimalType(type_descriptor.precision, type_descriptor.scale); + } + case TYPE_STRUCT: { + std::unique_ptr<orc::Type> struct_type = orc::createStructType(); + for (int j = 0; j < type_descriptor.children.size(); ++j) { + struct_type->addStructField(type_descriptor.field_names[j], + _build_orc_type(type_descriptor.children[j])); + } + return struct_type; + } + case TYPE_ARRAY: { + return orc::createListType(_build_orc_type(type_descriptor.children[0])); + } + case TYPE_MAP: { + return orc::createMapType(_build_orc_type(type_descriptor.children[0]), + _build_orc_type(type_descriptor.children[1])); + } + default: { + throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR, + "Unsupported type {} to build orc type", + type_descriptor.debug_string()); + } + } +} + +Status VHivePartitionWriter::_projection_and_filter_block(doris::vectorized::Block& input_block, Review Comment: warning: method '_projection_and_filter_block' can be made static [readability-convert-member-functions-to-static] ```suggestion static Status VHivePartitionWriter::_projection_and_filter_block(doris::vectorized::Block& input_block, ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
