morningman commented on code in PR #32306:
URL: https://github.com/apache/doris/pull/32306#discussion_r1526306296


##########
be/src/pipeline/exec/exchange_sink_operator.cpp:
##########
@@ -492,7 +517,46 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block
         }
         RETURN_IF_ERROR(channel_add_rows_with_idx(state, local_state.channels, 
num_channels,
                                                   channel2rows, 
convert_block.get(), eos));
+    } else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
+        {
+            SCOPED_TIMER(local_state._split_block_hash_compute_timer);
+            RETURN_IF_ERROR(
+                    local_state._partitioner->do_partitioning(state, block, 
_mem_tracker.get()));
+        }
+        std::vector<std::vector<uint32>> assignments =
+                local_state.scale_writer_partitioning_exchanger->accept(block);
+        RETURN_IF_ERROR(channel_add_rows_with_idx(
+                state, local_state.channels, local_state.channels.size(), 
assignments, block, eos));
 
+    } else if (_part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) {
+        // 1. select channel
+        vectorized::PipChannel<ExchangeSinkLocalState>* current_channel =
+                local_state.channels[local_state.current_channel_idx];
+        if (!current_channel->is_receiver_eof()) {
+            // 2. serialize, send and rollover block
+            if (current_channel->is_local()) {
+                auto status = current_channel->send_local_block(block);
+                HANDLE_CHANNEL_STATUS(state, current_channel, status);
+            } else {
+                SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+                RETURN_IF_ERROR(local_state._serializer.serialize_block(
+                        block, current_channel->ch_cur_pb_block()));
+                auto status =
+                        
current_channel->send_remote_block(current_channel->ch_cur_pb_block(), eos);
+                HANDLE_CHANNEL_STATUS(state, current_channel, status);
+                current_channel->ch_roll_pb_block();
+            }
+            _data_processed += block->bytes();
+        }
+

Review Comment:
   Add a comment to explain this logic.



##########
be/src/vec/sink/writer/vhive_partition_writer.cpp:
##########
@@ -0,0 +1,282 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vhive_partition_writer.h"
+
+#include "io/file_factory.h"
+#include "io/fs/file_system.h"
+#include "runtime/runtime_state.h"
+#include "vec/core/materialize_block.h"
+#include "vec/runtime/vorc_transformer.h"
+#include "vec/runtime/vparquet_transformer.h"
+
+namespace doris {
+namespace vectorized {
+
+VHivePartitionWriter::VHivePartitionWriter(
+        const TDataSink& t_sink, const std::string partition_name, 
TUpdateMode::type update_mode,
+        const VExprContextSPtrs& output_expr_ctxs, const 
std::vector<THiveColumn>& columns,
+        WriteInfo write_info, const std::string file_name, 
TFileFormatType::type file_format_type,

Review Comment:
   ```suggestion
           WriteInfo write_info, const std::string& file_name, 
TFileFormatType::type file_format_type,
   ```



##########
be/src/vec/sink/writer/vhive_table_writer.cpp:
##########
@@ -0,0 +1,432 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vhive_table_writer.h"
+
+#include "runtime/runtime_state.h"
+#include "vec/core/block.h"
+#include "vec/core/column_with_type_and_name.h"
+#include "vec/exprs/vexpr.h"
+#include "vec/exprs/vexpr_context.h"
+#include "vec/sink/writer/vhive_partition_writer.h"
+#include "vec/sink/writer/vhive_utils.h"
+
+namespace doris {
+namespace vectorized {
+
// Writer that fans incoming blocks out to one VHivePartitionWriter per Hive
// partition. `t_sink` is copied and must carry a hive_table_sink descriptor
// (checked below); `output_expr_ctxs` are the output expressions handed to
// the base AsyncResultWriter.
VHiveTableWriter::VHiveTableWriter(const TDataSink& t_sink,
                                   const VExprContextSPtrs& output_expr_ctxs)
        : AsyncResultWriter(output_expr_ctxs), _t_sink(t_sink) {
    DCHECK(_t_sink.__isset.hive_table_sink);
}
+
// No sink-specific properties need initialization for the Hive table writer;
// the override exists only to satisfy the AsyncResultWriter interface.
Status VHiveTableWriter::init_properties(ObjectPool* pool) {
    return Status::OK();
}
+
+Status VHiveTableWriter::open(RuntimeState* state, RuntimeProfile* profile) {
+    _state = state;
+    _profile = profile;
+
+    for (int i = 0; i < _t_sink.hive_table_sink.columns.size(); ++i) {
+        if (_t_sink.hive_table_sink.columns[i].column_type == 
THiveColumnType::PARTITION_KEY) {
+            _partition_columns_input_index.emplace_back(i);
+        }
+    }
+    return Status::OK();
+}
+
+Status VHiveTableWriter::write(vectorized::Block& block) {
+    std::unordered_map<std::shared_ptr<VHivePartitionWriter>, IColumn::Filter> 
writer_positions;
+
+    auto& hive_table_sink = _t_sink.hive_table_sink;
+
+    if (_partition_columns_input_index.empty()) {
+        auto writer_iter = _partitions_to_writers.find("");
+        if (writer_iter == _partitions_to_writers.end()) {
+            try {
+                std::shared_ptr<VHivePartitionWriter> writer = 
_create_partition_writer(block, -1);
+                _partitions_to_writers.insert({"", writer});
+                RETURN_IF_ERROR(writer->open(_state, _profile));
+                RETURN_IF_ERROR(writer->write(block));
+            } catch (doris::Exception& e) {
+                return e.to_status();
+            }
+            return Status::OK();
+        } else {
+            std::shared_ptr<VHivePartitionWriter> writer;
+            if (writer_iter->second->written_len() > 
config::hive_sink_max_file_size) {
+                static_cast<void>(writer_iter->second->close(Status::OK()));
+                _partitions_to_writers.erase(writer_iter);
+                try {
+                    writer = _create_partition_writer(block, -1);
+                    _partitions_to_writers.insert({"", writer});
+                    RETURN_IF_ERROR(writer->open(_state, _profile));
+                    RETURN_IF_ERROR(writer->write(block));
+                } catch (doris::Exception& e) {
+                    return e.to_status();
+                }
+            } else {
+                writer = writer_iter->second;
+            }
+            RETURN_IF_ERROR(writer->write(block));
+            return Status::OK();
+        }
+    }
+
+    for (int i = 0; i < block.rows(); ++i) {
+        std::vector<std::string> partition_values;
+        try {
+            partition_values = _create_partition_values(block, i);
+        } catch (doris::Exception& e) {
+            return e.to_status();
+        }
+        std::string partition_name = VHiveUtils::make_partition_name(
+                hive_table_sink.columns, _partition_columns_input_index, 
partition_values);
+
+        auto create_and_open_writer =
+                [&](const std::string& partition_name, int position,
+                    std::shared_ptr<VHivePartitionWriter>& writer_ptr) -> 
Status {
+            try {
+                auto writer = _create_partition_writer(block, position);
+                RETURN_IF_ERROR(writer->open(_state, _profile));
+                IColumn::Filter filter(block.rows(), 0);
+                filter[position] = 1;
+                writer_positions.insert({writer, std::move(filter)});
+                _partitions_to_writers.insert({partition_name, writer});
+                writer_ptr = writer;
+            } catch (doris::Exception& e) {
+                return e.to_status();
+            }
+            return Status::OK();
+        };
+
+        auto writer_iter = _partitions_to_writers.find(partition_name);
+        if (writer_iter == _partitions_to_writers.end()) {
+            std::shared_ptr<VHivePartitionWriter> writer;
+            RETURN_IF_ERROR(create_and_open_writer(partition_name, i, writer));
+        } else {
+            std::shared_ptr<VHivePartitionWriter> writer;
+            if (writer_iter->second->written_len() > 
config::hive_sink_max_file_size) {
+                static_cast<void>(writer_iter->second->close(Status::OK()));
+                writer_positions.erase(writer_iter->second);
+                _partitions_to_writers.erase(writer_iter);
+                RETURN_IF_ERROR(create_and_open_writer(partition_name, i, 
writer));
+            } else {
+                writer = writer_iter->second;
+            }
+            auto writer_pos_iter = writer_positions.find(writer);
+            if (writer_pos_iter == writer_positions.end()) {
+                IColumn::Filter filter(block.rows(), 0);
+                filter[i] = 1;
+                writer_positions.insert({writer, std::move(filter)});
+            } else {
+                writer_pos_iter->second[i] = 1;
+            }
+        }
+    }
+
+    for (auto it = writer_positions.begin(); it != writer_positions.end(); 
++it) {
+        RETURN_IF_ERROR(it->first->write(block, &it->second));
+    }
+    return Status::OK();
+}
+
+Status VHiveTableWriter::close(Status status) {
+    for (const auto& pair : _partitions_to_writers) {
+        Status st = pair.second->close(status);
+        if (st != Status::OK()) {
+            LOG(WARNING) << fmt::format("Unsupported type for partition {}", 
st.to_string());
+            continue;
+        }
+    }
+    _partitions_to_writers.clear();
+    return Status::OK();
+}
+
// Builds a VHivePartitionWriter for the partition that row `position` of
// `block` belongs to. With no partition columns (unpartitioned table,
// callers pass position == -1) the partition name stays empty. The writer's
// update mode, write/target paths, file format and compression are selected
// from (a) whether the partition already exists in the sink descriptor and
// (b) whether this sink is an INSERT OVERWRITE.
std::shared_ptr<VHivePartitionWriter> VHiveTableWriter::_create_partition_writer(
        vectorized::Block& block, int position) {
    auto& hive_table_sink = _t_sink.hive_table_sink;
    std::vector<std::string> partition_values;
    std::string partition_name;
    if (!_partition_columns_input_index.empty()) {
        // Partition values are extracted from the row, then joined into a
        // name — presumably Hive-style "col=value/..." (verify against
        // VHiveUtils::make_partition_name).
        partition_values = _create_partition_values(block, position);
        partition_name = VHiveUtils::make_partition_name(
                hive_table_sink.columns, _partition_columns_input_index, partition_values);
    }
    const std::vector<THivePartition>& partitions = hive_table_sink.partitions;
    const THiveLocationParams& write_location = hive_table_sink.location;
    // Look up whether this partition is already known to the sink descriptor.
    const THivePartition* existing_partition = nullptr;
    // NOTE(review): hard-coded true, so the "new table" branch below is
    // currently dead code — confirm whether CTAS/new-table support is planned.
    bool existing_table = true;
    for (const auto& partition : partitions) {
        if (partition_values == partition.values) {
            existing_partition = &partition;
            break;
        }
    }
    TUpdateMode::type update_mode;
    VHivePartitionWriter::WriteInfo write_info;
    TFileFormatType::type file_format_type;
    TFileCompressType::type write_compress_type;
    if (existing_partition == nullptr) { // new partition
        if (existing_table == false) {   // new table (currently unreachable, see note above)
            update_mode = TUpdateMode::NEW;
            if (_partition_columns_input_index.empty()) { // new unpartitioned table
                // Write directly at the table's locations.
                write_info = {write_location.write_path, write_location.target_path,
                              write_location.file_type};
            } else { // a new partition in a new partitioned table
                // Both paths gain a per-partition subdirectory.
                auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name);
                auto target_path = fmt::format("{}/{}", write_location.target_path, partition_name);
                write_info = {std::move(write_path), std::move(target_path),
                              write_location.file_type};
            }
        } else { // a new partition in an existing partitioned table, or an existing unpartitioned table
            if (_partition_columns_input_index.empty()) { // an existing unpartitioned table
                // APPEND for plain INSERT, OVERWRITE for INSERT OVERWRITE.
                update_mode =
                        !hive_table_sink.overwrite ? TUpdateMode::APPEND : TUpdateMode::OVERWRITE;
                write_info = {write_location.write_path, write_location.target_path,
                              write_location.file_type};
            } else { // a new partition in an existing partitioned table
                update_mode = TUpdateMode::NEW;
                auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name);
                auto target_path = fmt::format("{}/{}", write_location.target_path, partition_name);
                write_info = {std::move(write_path), std::move(target_path),
                              write_location.file_type};
            }
            // need to get schema from existing table ?
        }
        // New partitions always use the sink's own format settings.
        file_format_type = hive_table_sink.file_format;
        write_compress_type = hive_table_sink.compression_type;
    } else { // existing partition
        if (!hive_table_sink.overwrite) {
            // Append: stage under the sink's write path but target the
            // partition's own location, keeping the partition's existing
            // file type and format.
            update_mode = TUpdateMode::APPEND;
            auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name);
            auto target_path = fmt::format("{}", existing_partition->location.target_path);
            write_info = {std::move(write_path), std::move(target_path),
                          existing_partition->location.file_type};
            file_format_type = existing_partition->file_format;
            write_compress_type = hive_table_sink.compression_type;
        } else {
            // INSERT OVERWRITE: rewrite the partition under the table's
            // target path with the sink's own format settings.
            update_mode = TUpdateMode::OVERWRITE;
            auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name);
            auto target_path = fmt::format("{}/{}", write_location.target_path, partition_name);
            write_info = {std::move(write_path), std::move(target_path), write_location.file_type};
            file_format_type = hive_table_sink.file_format;
            write_compress_type = hive_table_sink.compression_type;
            // need to get schema from existing table ?
        }
    }

    // File name = generated base name + extension derived from format and
    // compression (see _get_file_extension).
    return std::make_shared<VHivePartitionWriter>(
            _t_sink, std::move(partition_name), update_mode, _vec_output_expr_ctxs,
            hive_table_sink.columns, std::move(write_info),
            fmt::format("{}{}", _compute_file_name(),
                        _get_file_extension(file_format_type, write_compress_type)),
            file_format_type, write_compress_type, hive_table_sink.hadoop_config);
}
+
+std::vector<std::string> 
VHiveTableWriter::_create_partition_values(vectorized::Block& block,

Review Comment:
   Please give some examples of this converter's input and output.



##########
be/src/vec/exec/indexed_priority_queue.hpp:
##########
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <functional>
+#include <iostream>
+#include <map>
+#include <optional>
+#include <set>
+
+namespace doris {
+namespace vectorized {
+
// One element of an IndexedPriorityQueue: the stored value, the priority used
// for ordering, and an insertion generation that breaks priority ties
// (earlier insertions order first, giving FIFO behavior among equals).
template <typename T>
struct IndexedPriorityQueueEntry {
    T value;
    long priority;
    long generation;

    IndexedPriorityQueueEntry(T val, long prio, long gen)
            : value(std::move(val)), priority(prio), generation(gen) {}
};

// Direction in which priorities are ordered by the comparator below.
enum class IndexedPriorityQueuePriorityOrdering { LOW_TO_HIGH, HIGH_TO_LOW };

// Strict weak ordering over queue entries: the primary key is the priority in
// the requested direction; equal priorities fall back to insertion generation
// so that ordering among ties is deterministic and FIFO.
template <typename T, IndexedPriorityQueuePriorityOrdering priority_ordering>
struct IndexedPriorityQueueComparator {
    bool operator()(const IndexedPriorityQueueEntry<T>& lhs,
                    const IndexedPriorityQueueEntry<T>& rhs) const {
        // Tie-break first: identical for both orderings.
        if (lhs.priority == rhs.priority) {
            return lhs.generation < rhs.generation;
        }
        if constexpr (priority_ordering == IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH) {
            return lhs.priority < rhs.priority;
        } else {
            return lhs.priority > rhs.priority;
        }
    }
};
+
+template <typename T, IndexedPriorityQueuePriorityOrdering priority_ordering =
+                              
IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>
+class IndexedPriorityQueue {

Review Comment:
   Add a comment to this class explaining its purpose, ordering semantics, and intended usage.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to