This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new ef2151ae667 [Feature-WIP](multi-catalog) Add Hive sink on BE side. (#32306) (#32364)
ef2151ae667 is described below
commit ef2151ae667a923cae8c28d82be50d377430c3e0
Author: Mingyu Chen <[email protected]>
AuthorDate: Mon Mar 18 11:23:01 2024 +0800
[Feature-WIP](multi-catalog) Add Hive sink on BE side. (#32306) (#32364)
bp #32306
Co-authored-by: Qi Chen <[email protected]>
---
be/src/common/config.cpp | 15 +
be/src/common/config.h | 13 +
be/src/exec/data_sink.cpp | 16 +
be/src/pipeline/exec/exchange_sink_operator.cpp | 73 +++-
be/src/pipeline/exec/exchange_sink_operator.h | 28 ++
be/src/pipeline/exec/hive_table_sink_operator.cpp | 50 +++
be/src/pipeline/exec/hive_table_sink_operator.h | 115 ++++++
be/src/pipeline/pipeline_fragment_context.cpp | 11 +-
be/src/pipeline/pipeline_x/operator.cpp | 3 +
.../pipeline_x/pipeline_x_fragment_context.cpp | 9 +
be/src/runtime/fragment_mgr.cpp | 18 +
be/src/runtime/runtime_state.h | 4 +
be/src/util/indexed_priority_queue.hpp | 182 +++++++++
be/src/vec/exec/skewed_partition_rebalancer.cpp | 302 ++++++++++++++
be/src/vec/exec/skewed_partition_rebalancer.h | 132 +++++++
be/src/vec/runtime/vorc_transformer.cpp | 23 +-
be/src/vec/runtime/vorc_transformer.h | 6 +-
.../sink/scale_writer_partitioning_exchanger.hpp | 92 +++++
be/src/vec/sink/vhive_table_sink.cpp | 48 +++
be/src/vec/sink/vhive_table_sink.h | 52 +++
be/src/vec/sink/writer/vhive_partition_writer.cpp | 282 ++++++++++++++
be/src/vec/sink/writer/vhive_partition_writer.h | 99 +++++
be/src/vec/sink/writer/vhive_table_writer.cpp | 432 +++++++++++++++++++++
be/src/vec/sink/writer/vhive_table_writer.h | 74 ++++
be/src/vec/sink/writer/vhive_utils.cpp | 78 ++++
be/src/vec/sink/writer/vhive_utils.h | 45 +++
be/test/util/indexed_priority_queue_test.cpp | 104 +++++
.../vec/exec/skewed_partition_rebalancer_test.cpp | 318 +++++++++++++++
be/test/vec/exec/vhive_utils_test.cpp | 70 ++++
29 files changed, 2681 insertions(+), 13 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index dc31d2c621e..960d8f1c19a 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1197,6 +1197,21 @@ DEFINE_mString(ca_cert_file_paths,
"/etc/pki/tls/certs/ca-bundle.crt;/etc/ssl/certs/ca-certificates.crt;"
"/etc/ssl/ca-bundle.pem");
+/** Table sink configurations (currently only for external table types) **/
+// Minimum data processed before scaling up writers for non-partitioned writes
+DEFINE_mInt64(table_sink_non_partition_write_scaling_data_processed_threshold,
+              "125829120"); // 120MB
+// Minimum data processed before starting to rebalance in exchange for partitioned writes
+DEFINE_mInt64(table_sink_partition_write_data_processed_threshold, "209715200"); // 200MB
+// Minimum data processed before triggering skewed-partition rebalancing in exchange for partitioned writes
+DEFINE_mInt64(table_sink_partition_write_skewed_data_processed_rebalance_threshold,
+              "209715200"); // 200MB
+// Maximum number of partitions processed per writer for partitioned writes
+DEFINE_mInt32(table_sink_partition_write_max_partition_nums_per_writer, "128");
+
+/** Hive sink configurations **/
+DEFINE_mInt64(hive_sink_max_file_size, "1073741824"); // 1GB
+
// clang-format off
#ifdef BE_TEST
// test s3
diff --git a/be/src/common/config.h b/be/src/common/config.h
index abb833f42e8..85db11b9e61 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1271,6 +1271,19 @@ DECLARE_String(tmp_file_dir);
// the file paths (one or more) of CA certs, split using ";"; the aws s3 lib uses it to init its s3 client
DECLARE_mString(ca_cert_file_paths);
+/** Table sink configurations (currently only for external table types) **/
+// Minimum data processed before scaling up writers for non-partitioned writes
+DECLARE_mInt64(table_sink_non_partition_write_scaling_data_processed_threshold);
+// Minimum data processed before starting to rebalance in exchange for partitioned writes
+DECLARE_mInt64(table_sink_partition_write_data_processed_threshold);
+// Minimum data processed before triggering skewed-partition rebalancing in exchange for partitioned writes
+DECLARE_mInt64(table_sink_partition_write_skewed_data_processed_rebalance_threshold);
+// Maximum number of partitions processed per writer for partitioned writes
+DECLARE_mInt32(table_sink_partition_write_max_partition_nums_per_writer);
+
+/** Hive sink configurations **/
+DECLARE_mInt64(hive_sink_max_file_size); // default 1GB
+
#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
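The DEFINE_m*/DECLARE_m* pairs above create runtime-mutable globals in the doris::config namespace, so these thresholds can be tuned without restarting the BE. A minimal sketch of how a sink might consult the new Hive knob (the rollover helper below is illustrative, not part of this patch):

    #include "common/config.h"

    // Illustrative helper: decide whether the current output file should be
    // rolled over. hive_sink_max_file_size defaults to 1073741824 (1GB) and,
    // being declared with DECLARE_mInt64, can be changed at runtime.
    bool should_roll_file(int64_t written_bytes) {
        return written_bytes >= doris::config::hive_sink_max_file_size;
    }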
diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp
index 5047ae8fc78..d373ddd2fcd 100644
--- a/be/src/exec/data_sink.cpp
+++ b/be/src/exec/data_sink.cpp
@@ -36,6 +36,7 @@
#include "vec/sink/group_commit_block_sink.h"
#include "vec/sink/multi_cast_data_stream_sink.h"
#include "vec/sink/vdata_stream_sender.h"
+#include "vec/sink/vhive_table_sink.h"
#include "vec/sink/vmemory_scratch_sink.h"
#include "vec/sink/volap_table_sink.h"
#include "vec/sink/volap_table_sink_v2.h"
@@ -157,6 +158,13 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
        }
        break;
    }
+    case TDataSinkType::HIVE_TABLE_SINK: {
+        if (!thrift_sink.__isset.hive_table_sink) {
+            return Status::InternalError("Missing hive table sink.");
+        }
+        sink->reset(new vectorized::VHiveTableSink(pool, row_desc, output_exprs));
+        break;
+    }
case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
Status status = Status::OK();
DCHECK(thrift_sink.__isset.olap_table_sink);
@@ -298,6 +306,13 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
        }
        break;
    }
+    case TDataSinkType::HIVE_TABLE_SINK: {
+        if (!thrift_sink.__isset.hive_table_sink) {
+            return Status::InternalError("Missing hive table sink.");
+        }
+        sink->reset(new vectorized::VHiveTableSink(pool, row_desc, output_exprs));
+        break;
+    }
case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: {
DCHECK(thrift_sink.__isset.multi_cast_stream_sink);
DCHECK_GT(thrift_sink.multi_cast_stream_sink.sinks.size(), 0);
@@ -313,6 +328,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
RETURN_IF_ERROR(status);
break;
}
+
default: {
std::stringstream error_msg;
std::map<int, const char*>::const_iterator i =
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 333abdbcc01..9c3d68c472f 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -157,7 +157,8 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
local_size++;
}
}
-    if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM) {
+    if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM ||
+        _part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) {
std::random_device rd;
std::mt19937 g(rd());
shuffle(channels.begin(), channels.end(), g);
@@ -249,6 +250,21 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
                        .schema = _schema,
                        .caller = (void*)this,
                        .create_partition_callback = &ExchangeSinkLocalState::empty_callback_function});
+    } else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
+        _partition_count =
+                channels.size() * config::table_sink_partition_write_max_partition_nums_per_writer;
+        _partitioner.reset(
+                new vectorized::Crc32HashPartitioner<LocalExchangeChannelIds>(_partition_count));
+        _partition_function.reset(new HashPartitionFunction(_partitioner.get()));
+        scale_writer_partitioning_exchanger.reset(new vectorized::ScaleWriterPartitioningExchanger<
+                                                  HashPartitionFunction>(
+                channels.size(), *_partition_function, _partition_count, channels.size(), 1,
+                config::table_sink_partition_write_data_processed_threshold,
+                config::table_sink_partition_write_skewed_data_processed_rebalance_threshold));
+        RETURN_IF_ERROR(_partitioner->init(p._texprs));
+        RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
+        _profile->add_info_string("Partitioner",
+                                  fmt::format("Crc32HashPartitioner({})", _partition_count));
}
_finish_dependency->block();
@@ -259,7 +281,8 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
Status ExchangeSinkLocalState::open(RuntimeState* state) {
RETURN_IF_ERROR(Base::open(state));
if (_part_type == TPartitionType::HASH_PARTITIONED ||
- _part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
+ _part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED ||
+ _part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
RETURN_IF_ERROR(_partitioner->open(state));
} else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
RETURN_IF_ERROR(_row_distribution.open(_tablet_sink_row_desc));
@@ -320,7 +343,9 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
sink.output_partition.type == TPartitionType::RANDOM ||
sink.output_partition.type == TPartitionType::RANGE_PARTITIONED ||
           sink.output_partition.type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED ||
-           sink.output_partition.type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED);
+           sink.output_partition.type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED ||
+           sink.output_partition.type == TPartitionType::TABLE_SINK_HASH_PARTITIONED ||
+           sink.output_partition.type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED);
_name = "ExchangeSinkOperatorX";
_pool = std::make_shared<ObjectPool>();
}
@@ -492,7 +517,47 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
        }
        RETURN_IF_ERROR(channel_add_rows_with_idx(state, local_state.channels, num_channels,
                                                  channel2rows, convert_block.get(), eos));
+    } else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
+        {
+            SCOPED_TIMER(local_state._split_block_hash_compute_timer);
+            RETURN_IF_ERROR(
+                    local_state._partitioner->do_partitioning(state, block, _mem_tracker.get()));
+        }
+        std::vector<std::vector<uint32>> assignments =
+                local_state.scale_writer_partitioning_exchanger->accept(block);
+        RETURN_IF_ERROR(channel_add_rows_with_idx(
+                state, local_state.channels, local_state.channels.size(), assignments, block, eos));
+    } else if (_part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) {
+        // Control the number of channels according to the flow, thereby controlling the number of table sink writers.
+        // 1. select channel
+        vectorized::PipChannel<ExchangeSinkLocalState>* current_channel =
+                local_state.channels[local_state.current_channel_idx];
+        if (!current_channel->is_receiver_eof()) {
+            // 2. serialize, send and rollover block
+            if (current_channel->is_local()) {
+                auto status = current_channel->send_local_block(block);
+                HANDLE_CHANNEL_STATUS(state, current_channel, status);
+            } else {
+                SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+                RETURN_IF_ERROR(local_state._serializer.serialize_block(
+                        block, current_channel->ch_cur_pb_block()));
+                auto status =
+                        current_channel->send_remote_block(current_channel->ch_cur_pb_block(), eos);
+                HANDLE_CHANNEL_STATUS(state, current_channel, status);
+                current_channel->ch_roll_pb_block();
+            }
+            _data_processed += block->bytes();
+        }
+
+        if (_writer_count < local_state.channels.size()) {
+            if (_data_processed >=
+                _writer_count *
+                        config::table_sink_non_partition_write_scaling_data_processed_threshold) {
+                _writer_count++;
+            }
+        }
+        local_state.current_channel_idx = (local_state.current_channel_idx + 1) % _writer_count;
} else {
// Range partition
// 1. calculate range
@@ -581,7 +646,6 @@ Status ExchangeSinkOperatorX::channel_add_rows_with_idx(
channel2rows[i].clear();
}
}
-
if (eos) {
for (int i = 0; i < num_channels; ++i) {
if (!channels[i]->is_receiver_eof()) {
@@ -590,7 +654,6 @@ Status ExchangeSinkOperatorX::channel_add_rows_with_idx(
}
}
}
-
return Status::OK();
}
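The TABLE_SINK_RANDOM_PARTITIONED branch above scales writers with throughput: blocks go round-robin over the first _writer_count channels, and one more writer is enabled each time the bytes sent cross _writer_count * table_sink_non_partition_write_scaling_data_processed_threshold. A standalone sketch of that rule, with illustrative names only:

    #include <cstddef>

    // Mirrors the scaling logic in ExchangeSinkOperatorX::sink() for the
    // non-partitioned table sink case; this struct is illustrative, not
    // part of the patch.
    struct WriterScaler {
        size_t data_processed = 0;
        size_t writer_count = 1;

        size_t next_channel(size_t current_idx, size_t channel_count,
                            size_t block_bytes, size_t scaling_threshold) {
            data_processed += block_bytes;
            if (writer_count < channel_count &&
                data_processed >= writer_count * scaling_threshold) {
                writer_count++; // enable one more writer
            }
            return (current_idx + 1) % writer_count;
        }
    };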
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h
index 9384adbe5f3..17878fc6ead 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -25,6 +25,7 @@
#include "exchange_sink_buffer.h"
#include "operator.h"
#include "pipeline/pipeline_x/operator.h"
+#include "vec/sink/scale_writer_partitioning_exchanger.hpp"
#include "vec/sink/vdata_stream_sender.h"
namespace doris {
@@ -68,6 +69,21 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
ENABLE_FACTORY_CREATOR(ExchangeSinkLocalState);
using Base = PipelineXSinkLocalState<>;
+private:
+    class HashPartitionFunction {
+    public:
+        HashPartitionFunction(vectorized::PartitionerBase* partitioner)
+                : _partitioner(partitioner) {}
+
+        int get_partition(vectorized::Block* block, int position) {
+            uint32_t* partition_ids = (uint32_t*)_partitioner->get_channel_ids();
+            return partition_ids[position];
+        }
+
+    private:
+        vectorized::PartitionerBase* _partitioner;
+    };
+
public:
ExchangeSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
: Base(parent, state),
@@ -132,6 +148,10 @@ public:
    int current_channel_idx; // index of current channel to send to if _random == true
    bool only_local_exchange;
+    // for external table sink hash partition
+    std::unique_ptr<vectorized::ScaleWriterPartitioningExchanger<HashPartitionFunction>>
+            scale_writer_partitioning_exchanger;
+
private:
friend class ExchangeSinkOperatorX;
friend class vectorized::Channel<ExchangeSinkLocalState>;
@@ -209,6 +229,9 @@ private:
std::vector<vectorized::RowPartTabletIds> _row_part_tablet_ids;
int64_t _number_input_rows = 0;
TPartitionType::type _part_type;
+
+ // for external table sink hash partition
+ std::unique_ptr<HashPartitionFunction> _partition_function = nullptr;
};
class ExchangeSinkOperatorX final : public DataSinkOperatorX<ExchangeSinkLocalState> {
@@ -274,6 +297,11 @@ private:
const TTupleId& _tablet_sink_tuple_id;
int64_t _tablet_sink_txn_id = -1;
std::shared_ptr<ObjectPool> _pool;
+
+    // for external table sink random partition
+    // Control the number of channels according to the flow, thereby controlling the number of table sink writers.
+ size_t _data_processed = 0;
+ int _writer_count = 1;
};
} // namespace pipeline
diff --git a/be/src/pipeline/exec/hive_table_sink_operator.cpp b/be/src/pipeline/exec/hive_table_sink_operator.cpp
new file mode 100644
index 00000000000..6b8eaa8c91e
--- /dev/null
+++ b/be/src/pipeline/exec/hive_table_sink_operator.cpp
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "hive_table_sink_operator.h"
+
+#include "common/status.h"
+
+namespace doris::pipeline {
+
+OperatorPtr HiveTableSinkOperatorBuilder::build_operator() {
+ return std::make_shared<HiveTableSinkOperator>(this, _sink);
+}
+
+Status HiveTableSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
+ RETURN_IF_ERROR(Base::init(state, info));
+ SCOPED_TIMER(exec_time_counter());
+ SCOPED_TIMER(_open_timer);
+ auto& p = _parent->cast<Parent>();
+ RETURN_IF_ERROR(_writer->init_properties(p._pool));
+ return Status::OK();
+}
+
+Status HiveTableSinkLocalState::close(RuntimeState* state, Status exec_status) {
+ if (Base::_closed) {
+ return Status::OK();
+ }
+ SCOPED_TIMER(_close_timer);
+ SCOPED_TIMER(exec_time_counter());
+ if (_closed) {
+ return _close_status;
+ }
+ _close_status = Base::close(state, exec_status);
+ return _close_status;
+}
+
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/hive_table_sink_operator.h b/be/src/pipeline/exec/hive_table_sink_operator.h
new file mode 100644
index 00000000000..39b5df36567
--- /dev/null
+++ b/be/src/pipeline/exec/hive_table_sink_operator.h
@@ -0,0 +1,115 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "operator.h"
+#include "pipeline/pipeline_x/operator.h"
+#include "vec/sink/vhive_table_sink.h"
+
+namespace doris {
+
+namespace pipeline {
+
+class HiveTableSinkOperatorBuilder final
+        : public DataSinkOperatorBuilder<vectorized::VHiveTableSink> {
+public:
+    HiveTableSinkOperatorBuilder(int32_t id, DataSink* sink)
+            : DataSinkOperatorBuilder(id, "HiveTableSinkOperator", sink) {}
+
+    OperatorPtr build_operator() override;
+};
+
+class HiveTableSinkOperator final : public DataSinkOperator<vectorized::VHiveTableSink> {
+public:
+    HiveTableSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink)
+            : DataSinkOperator(operator_builder, sink) {}
+
+    bool can_write() override { return _sink->can_write(); }
+};
+
+class HiveTableSinkOperatorX;
+
+class HiveTableSinkLocalState final
+        : public AsyncWriterSink<vectorized::VHiveTableWriter, HiveTableSinkOperatorX> {
+public:
+    using Base = AsyncWriterSink<vectorized::VHiveTableWriter, HiveTableSinkOperatorX>;
+ using Parent = HiveTableSinkOperatorX;
+ ENABLE_FACTORY_CREATOR(HiveTableSinkLocalState);
+ HiveTableSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
+ : Base(parent, state) {};
+ Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
+ Status open(RuntimeState* state) override {
+ SCOPED_TIMER(exec_time_counter());
+ SCOPED_TIMER(_open_timer);
+ return Base::open(state);
+ }
+
+ Status close(RuntimeState* state, Status exec_status) override;
+ friend class HiveTableSinkOperatorX;
+
+private:
+ Status _close_status = Status::OK();
+};
+
+class HiveTableSinkOperatorX final : public DataSinkOperatorX<HiveTableSinkLocalState> {
+public:
+    using Base = DataSinkOperatorX<HiveTableSinkLocalState>;
+    HiveTableSinkOperatorX(ObjectPool* pool, int operator_id, const RowDescriptor& row_desc,
+                           const std::vector<TExpr>& t_output_expr)
+            : Base(operator_id, 0),
+              _row_desc(row_desc),
+              _t_output_expr(t_output_expr),
+              _pool(pool) {};
+
+    Status init(const TDataSink& thrift_sink) override {
+        RETURN_IF_ERROR(Base::init(thrift_sink));
+        // From the thrift expressions create the real exprs.
+        RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_output_expr, _output_vexpr_ctxs));
+        return Status::OK();
+    }
+
+    Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(Base::prepare(state));
+        return vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc);
+    }
+
+    Status open(RuntimeState* state) override {
+        RETURN_IF_ERROR(Base::open(state));
+        return vectorized::VExpr::open(_output_vexpr_ctxs, state);
+    }
+
+    Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override {
+        auto& local_state = get_local_state(state);
+        SCOPED_TIMER(local_state.exec_time_counter());
+        COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
+ return local_state.sink(state, in_block, eos);
+ }
+
+private:
+ friend class HiveTableSinkLocalState;
+ template <typename Writer, typename Parent>
+ requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>)
+ friend class AsyncWriterSink;
+ const RowDescriptor& _row_desc;
+ vectorized::VExprContextSPtrs _output_vexpr_ctxs;
+ const std::vector<TExpr>& _t_output_expr;
+ ObjectPool* _pool = nullptr;
+};
+
+} // namespace pipeline
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index d1800299559..c0e075d2bfa 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -59,6 +59,7 @@
#include "pipeline/exec/group_commit_block_sink_operator.h"
#include "pipeline/exec/hashjoin_build_sink.h"
#include "pipeline/exec/hashjoin_probe_operator.h"
+#include "pipeline/exec/hive_table_sink_operator.h"
#include "pipeline/exec/multi_cast_data_stream_sink.h"
#include "pipeline/exec/multi_cast_data_stream_source.h"
#include "pipeline/exec/mysql_scan_operator.h" // IWYU pragma: keep
@@ -821,12 +822,14 @@ Status PipelineFragmentContext::_create_sink(int sender_id, const TDataSink& thr
_sink.get());
break;
}
- case TDataSinkType::MYSQL_TABLE_SINK:
- case TDataSinkType::JDBC_TABLE_SINK:
- case TDataSinkType::ODBC_TABLE_SINK: {
-        sink_ = std::make_shared<TableSinkOperatorBuilder>(next_operator_builder_id(), _sink.get());
+    case TDataSinkType::HIVE_TABLE_SINK: {
+        sink_ = std::make_shared<HiveTableSinkOperatorBuilder>(next_operator_builder_id(),
+                                                               _sink.get());
break;
}
+ case TDataSinkType::MYSQL_TABLE_SINK:
+ case TDataSinkType::JDBC_TABLE_SINK:
+ case TDataSinkType::ODBC_TABLE_SINK:
case TDataSinkType::RESULT_FILE_SINK: {
sink_ = std::make_shared<ResultFileSinkOperatorBuilder>(
thrift_sink.result_file_sink.dest_node_id, _sink.get());
diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp
index 0c890b83041..f9a30c7e3ac 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -37,6 +37,7 @@
#include "pipeline/exec/file_scan_operator.h"
#include "pipeline/exec/hashjoin_build_sink.h"
#include "pipeline/exec/hashjoin_probe_operator.h"
+#include "pipeline/exec/hive_table_sink_operator.h"
#include "pipeline/exec/jdbc_scan_operator.h"
#include "pipeline/exec/jdbc_table_sink_operator.h"
#include "pipeline/exec/meta_scan_operator.h"
@@ -541,6 +542,7 @@ DECLARE_OPERATOR_X(JdbcTableSinkLocalState)
DECLARE_OPERATOR_X(ResultFileSinkLocalState)
DECLARE_OPERATOR_X(OlapTableSinkLocalState)
DECLARE_OPERATOR_X(OlapTableSinkV2LocalState)
+DECLARE_OPERATOR_X(HiveTableSinkLocalState)
DECLARE_OPERATOR_X(AnalyticSinkLocalState)
DECLARE_OPERATOR_X(SortSinkLocalState)
DECLARE_OPERATOR_X(SpillSortSinkLocalState)
@@ -637,5 +639,6 @@ template class AsyncWriterSink<doris::vectorized::VFileResultWriter, ResultFileS
template class AsyncWriterSink<doris::vectorized::VJdbcTableWriter, JdbcTableSinkOperatorX>;
template class AsyncWriterSink<doris::vectorized::VTabletWriter, OlapTableSinkOperatorX>;
template class AsyncWriterSink<doris::vectorized::VTabletWriterV2, OlapTableSinkV2OperatorX>;
+template class AsyncWriterSink<doris::vectorized::VHiveTableWriter, HiveTableSinkOperatorX>;
} // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 58fc3cb843b..13ad0789e31 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -51,6 +51,7 @@
#include "pipeline/exec/file_scan_operator.h"
#include "pipeline/exec/hashjoin_build_sink.h"
#include "pipeline/exec/hashjoin_probe_operator.h"
+#include "pipeline/exec/hive_table_sink_operator.h"
#include "pipeline/exec/jdbc_scan_operator.h"
#include "pipeline/exec/jdbc_table_sink_operator.h"
#include "pipeline/exec/meta_scan_operator.h"
@@ -372,6 +373,14 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
}
break;
}
+ case TDataSinkType::HIVE_TABLE_SINK: {
+ if (!thrift_sink.__isset.hive_table_sink) {
+ return Status::InternalError("Missing hive table sink.");
+ }
+        _sink.reset(
+                new HiveTableSinkOperatorX(pool, next_sink_operator_id(), row_desc, output_exprs));
+ break;
+ }
case TDataSinkType::JDBC_TABLE_SINK: {
if (!thrift_sink.__isset.jdbc_table_sink) {
return Status::InternalError("Missing data jdbc sink.");
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index f6552f67fca..4657c341e81 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -373,6 +373,24 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
}
}
+    if (!req.runtime_state->hive_partition_updates().empty()) {
+        params.__isset.hive_partition_updates = true;
+        params.hive_partition_updates.reserve(
+                req.runtime_state->hive_partition_updates().size());
+        for (auto& hive_partition_update : req.runtime_state->hive_partition_updates()) {
+            params.hive_partition_updates.push_back(hive_partition_update);
+        }
+    } else if (!req.runtime_states.empty()) {
+        for (auto* rs : req.runtime_states) {
+            if (!rs->hive_partition_updates().empty()) {
+                params.__isset.hive_partition_updates = true;
+                params.hive_partition_updates.insert(params.hive_partition_updates.end(),
+                                                     rs->hive_partition_updates().begin(),
+                                                     rs->hive_partition_updates().end());
+            }
+        }
+    }
+
// Send new errors to coordinator
req.runtime_state->get_unreported_errors(&(params.error_log));
params.__isset.error_log = (params.error_log.size() > 0);
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index e14721d4859..d031f36fbed 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -452,6 +452,8 @@ public:
    std::vector<TTabletCommitInfo>& tablet_commit_infos() { return _tablet_commit_infos; }
+    std::vector<THivePartitionUpdate>& hive_partition_updates() { return _hive_partition_updates; }
+
    const std::vector<TErrorTabletInfo>& error_tablet_infos() const { return _error_tablet_infos; }
    std::vector<TErrorTabletInfo>& error_tablet_infos() { return _error_tablet_infos; }
@@ -726,6 +728,8 @@ private:
int _max_operator_id = 0;
int _task_id = -1;
+ std::vector<THivePartitionUpdate> _hive_partition_updates;
+
    std::vector<std::unique_ptr<doris::pipeline::PipelineXLocalStateBase>> _op_id_to_local_state;
    std::unique_ptr<doris::pipeline::PipelineXSinkLocalStateBase> _sink_local_state;
diff --git a/be/src/util/indexed_priority_queue.hpp b/be/src/util/indexed_priority_queue.hpp
new file mode 100644
index 00000000000..5869d575231
--- /dev/null
+++ b/be/src/util/indexed_priority_queue.hpp
@@ -0,0 +1,182 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is ported from
+// https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/resourcegroups/IndexedPriorityQueue.java
+// to cpp and modified by Doris
+
+#pragma once
+
+#include <functional>
+#include <iostream>
+#include <map>
+#include <optional>
+#include <set>
+
+/**
+ * A priority queue with constant time contains(E) and log time remove(E)
+ * Ties are broken by insertion order.
+ * LOW_TO_HIGH is the priority order from low to high,
+ * HIGH_TO_LOW is the priority order from high to low.
+ * Those with the same priority are arranged in order of insertion.
+ */
+
+namespace doris {
+
+template <typename T>
+struct IndexedPriorityQueueEntry {
+ T value;
+ long priority;
+ long generation;
+
+ IndexedPriorityQueueEntry(T val, long prio, long gen)
+ : value(std::move(val)), priority(prio), generation(gen) {}
+};
+
+enum class IndexedPriorityQueuePriorityOrdering { LOW_TO_HIGH, HIGH_TO_LOW };
+
+template <typename T, IndexedPriorityQueuePriorityOrdering priority_ordering>
+struct IndexedPriorityQueueComparator {
+    bool operator()(const IndexedPriorityQueueEntry<T>& lhs,
+                    const IndexedPriorityQueueEntry<T>& rhs) const {
+        if constexpr (priority_ordering == IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH) {
+            if (lhs.priority != rhs.priority) {
+                return lhs.priority < rhs.priority;
+            }
+            return lhs.generation < rhs.generation;
+        } else {
+            if (lhs.priority != rhs.priority) {
+                return lhs.priority > rhs.priority;
+            }
+            return lhs.generation < rhs.generation;
+        }
+    }
+};
+
+template <typename T, IndexedPriorityQueuePriorityOrdering priority_ordering =
+                              IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>
+class IndexedPriorityQueue {
+public:
+ struct Prioritized {
+ T value;
+ long priority;
+ };
+
+ IndexedPriorityQueue() = default;
+
+    bool add_or_update(T element, long priority) {
+        auto it = _index.find(element);
+        if (it != _index.end()) {
+            if (it->second.priority == priority) {
+                return false;
+            }
+            // Drop the stale entry from both containers before re-inserting
+            // with the new priority.
+            _queue.erase(it->second);
+            _index.erase(it);
+        }
+        IndexedPriorityQueueEntry<T> entry {std::move(element), priority, generation++};
+        _queue.insert(entry); // keep a copy; `entry` is moved into the index below
+        _index.insert({entry.value, std::move(entry)});
+        return true;
+    }
+
+    bool contains(const T& element) const { return _index.find(element) != _index.end(); }
+
+ bool remove(const T& element) {
+ auto it = _index.find(element);
+ if (it != _index.end()) {
+ _queue.erase(it->second);
+ _index.erase(it);
+ return true;
+ }
+ return false;
+ }
+
+ std::optional<T> poll() {
+ if (_queue.empty()) {
+ return std::nullopt;
+ }
+ T value = _queue.begin()->value;
+ _index.erase(value);
+ _queue.erase(_queue.begin());
+ return value;
+ }
+
+ std::optional<Prioritized> peek() const {
+ if (_queue.empty()) {
+ return std::nullopt;
+ }
+ const IndexedPriorityQueueEntry<T>& entry = *_queue.begin();
+ return Prioritized {entry.value, entry.priority};
+ }
+
+ int size() const { return _queue.size(); }
+
+ bool is_empty() const { return _queue.empty(); }
+
+ class Iterator {
+ public:
+ using iterator_category = std::forward_iterator_tag;
+ using value_type = T;
+ using difference_type = std::ptrdiff_t;
+ using pointer = T*;
+ using reference = T&;
+
+ Iterator() : _iter() {}
+        explicit Iterator(
+                typename std::set<
+                        IndexedPriorityQueueEntry<T>,
+                        IndexedPriorityQueueComparator<T, priority_ordering>>::const_iterator iter)
+                : _iter(iter) {}
+
+ const T& operator*() const { return _iter->value; }
+
+ const T* operator->() const { return &(_iter->value); }
+
+ Iterator& operator++() {
+ ++_iter;
+ return *this;
+ }
+
+ Iterator operator++(int) {
+ Iterator tmp = *this;
+ ++(*this);
+ return tmp;
+ }
+
+        bool operator==(const Iterator& other) const { return _iter == other._iter; }
+
+        bool operator!=(const Iterator& other) const { return !(*this == other); }
+
+    private:
+        typename std::set<IndexedPriorityQueueEntry<T>,
+                          IndexedPriorityQueueComparator<T, priority_ordering>>::const_iterator
+                _iter;
+    };
+
+ Iterator begin() const { return Iterator(_queue.begin()); }
+
+ Iterator end() const { return Iterator(_queue.end()); }
+
+private:
+    std::map<T, IndexedPriorityQueueEntry<T>> _index;
+    std::set<IndexedPriorityQueueEntry<T>, IndexedPriorityQueueComparator<T, priority_ordering>>
+            _queue;
+
+ long generation = 0;
+};
+
+} // namespace doris
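A quick usage sketch of the queue above; HIGH_TO_LOW is the default ordering, so poll() returns the highest-priority element first, with ties broken by insertion order:

    #include "util/indexed_priority_queue.hpp"

    void indexed_priority_queue_example() {
        doris::IndexedPriorityQueue<int> queue; // HIGH_TO_LOW by default
        queue.add_or_update(7, 10);
        queue.add_or_update(8, 30);
        queue.add_or_update(9, 20);
        queue.add_or_update(9, 40); // re-prioritizes element 9 in place
        queue.remove(7);            // log-time removal by value
        auto top = queue.poll();    // yields 9 (priority 40); 8 would be next
    }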
diff --git a/be/src/vec/exec/skewed_partition_rebalancer.cpp b/be/src/vec/exec/skewed_partition_rebalancer.cpp
new file mode 100644
index 00000000000..ae12d365f05
--- /dev/null
+++ b/be/src/vec/exec/skewed_partition_rebalancer.cpp
@@ -0,0 +1,302 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is ported from
+// https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/operator/output/SkewedPartitionRebalancer.java
+// to cpp and modified by Doris
+
+#include "vec/exec/skewed_partition_rebalancer.h"
+
+#include <cmath>
+#include <list>
+
+namespace doris::vectorized {
+
+SkewedPartitionRebalancer::SkewedPartitionRebalancer(
+        int partition_count, int task_count, int task_bucket_count,
+        long min_partition_data_processed_rebalance_threshold,
+        long min_data_processed_rebalance_threshold)
+        : _partition_count(partition_count),
+          _task_count(task_count),
+          _task_bucket_count(task_bucket_count),
+          _min_partition_data_processed_rebalance_threshold(
+                  min_partition_data_processed_rebalance_threshold),
+          _min_data_processed_rebalance_threshold(
+                  std::max(min_partition_data_processed_rebalance_threshold,
+                           min_data_processed_rebalance_threshold)),
+          _partition_row_count(partition_count, 0),
+          _data_processed(0),
+          _data_processed_at_last_rebalance(0),
+          _partition_data_size(partition_count, 0),
+          _partition_data_size_at_last_rebalance(partition_count, 0),
+          _partition_data_size_since_last_rebalance_per_task(partition_count, 0),
+          _estimated_task_bucket_data_size_since_last_rebalance(task_count * task_bucket_count, 0),
+          _partition_assignments(partition_count) {
+    std::vector<int> task_bucket_ids(task_count, 0);
+
+    for (int partition = 0; partition < partition_count; partition++) {
+        int task_id = partition % task_count;
+        int bucket_id = task_bucket_ids[task_id]++ % task_bucket_count;
+        TaskBucket task_bucket(task_id, bucket_id, task_bucket_count);
+        _partition_assignments[partition].emplace_back(std::move(task_bucket));
+    }
+}
+
+std::vector<std::list<int>> SkewedPartitionRebalancer::get_partition_assignments() {
+    std::vector<std::list<int>> assigned_tasks;
+
+    for (const auto& partition_assignment : _partition_assignments) {
+        std::list<int> tasks;
+        std::transform(partition_assignment.begin(), partition_assignment.end(),
+                       std::back_inserter(tasks),
+                       [](const TaskBucket& task_bucket) { return task_bucket.task_id; });
+        assigned_tasks.push_back(tasks);
+    }
+
+    return assigned_tasks;
+}
+
+int SkewedPartitionRebalancer::get_task_count() {
+ return _task_count;
+}
+
+int SkewedPartitionRebalancer::get_task_id(int partition_id, int64_t index) {
+    const std::vector<TaskBucket>& task_ids = _partition_assignments[partition_id];
+
+    int task_id_index = (index % task_ids.size() + task_ids.size()) % task_ids.size();
+
+    return task_ids[task_id_index].task_id;
+}
+
+void SkewedPartitionRebalancer::add_data_processed(long data_size) {
+ _data_processed += data_size;
+}
+
+void SkewedPartitionRebalancer::add_partition_row_count(int partition, long row_count) {
+    _partition_row_count[partition] += row_count;
+}
+
+void SkewedPartitionRebalancer::rebalance() {
+ long current_data_processed = _data_processed;
+ if (_should_rebalance(current_data_processed)) {
+ _rebalance_partitions(current_data_processed);
+ }
+}
+
+void SkewedPartitionRebalancer::_calculate_partition_data_size(long data_processed) {
+    long total_partition_row_count = 0;
+    for (int partition = 0; partition < _partition_count; partition++) {
+        total_partition_row_count += _partition_row_count[partition];
+    }
+
+    for (int partition = 0; partition < _partition_count; partition++) {
+        _partition_data_size[partition] = std::max(
+                (_partition_row_count[partition] * data_processed) / total_partition_row_count,
+                _partition_data_size[partition]);
+    }
+}
+
+long SkewedPartitionRebalancer::_calculate_task_bucket_data_size_since_last_rebalance(
+        IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>&
+                max_partitions) {
+    long estimated_data_size_since_last_rebalance = 0;
+    for (auto& elem : max_partitions) {
+        estimated_data_size_since_last_rebalance +=
+                _partition_data_size_since_last_rebalance_per_task[elem];
+    }
+    return estimated_data_size_since_last_rebalance;
+}
+
+void SkewedPartitionRebalancer::_rebalance_based_on_task_bucket_skewness(
+        IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>&
+                max_task_buckets,
+        IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>&
+                min_task_buckets,
+        std::vector<IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>>&
+                task_bucket_max_partitions) {
+    std::vector<int> scaled_partitions;
+    while (true) {
+        std::optional<TaskBucket> max_task_bucket = max_task_buckets.poll();
+        if (!max_task_bucket.has_value()) {
+            break;
+        }
+
+        IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>&
+                max_partitions = task_bucket_max_partitions[max_task_bucket->id];
+        if (max_partitions.is_empty()) {
+            continue;
+        }
+
+        std::vector<TaskBucket> min_skewed_task_buckets =
+                _find_skewed_min_task_buckets(max_task_bucket.value(), min_task_buckets);
+        if (min_skewed_task_buckets.empty()) {
+            break;
+        }
+
+        while (true) {
+            std::optional<int> max_partition = max_partitions.poll();
+            if (!max_partition.has_value()) {
+                break;
+            }
+            int max_partition_value = max_partition.value();
+
+            if (std::find(scaled_partitions.begin(), scaled_partitions.end(),
+                          max_partition_value) != scaled_partitions.end()) {
+                continue;
+            }
+
+            int total_assigned_tasks = _partition_assignments[max_partition_value].size();
+            if (_partition_data_size[max_partition_value] >=
+                (_min_partition_data_processed_rebalance_threshold * total_assigned_tasks)) {
+                for (const TaskBucket& min_task_bucket : min_skewed_task_buckets) {
+                    if (_rebalance_partition(max_partition_value, min_task_bucket, max_task_buckets,
+                                             min_task_buckets)) {
+                        scaled_partitions.push_back(max_partition_value);
+                        break;
+                    }
+                }
+            } else {
+                break;
+            }
+        }
+    }
+}
+
+std::vector<SkewedPartitionRebalancer::TaskBucket>
+SkewedPartitionRebalancer::_find_skewed_min_task_buckets(
+        const TaskBucket& max_task_bucket,
+        const IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>&
+                min_task_buckets) {
+    std::vector<TaskBucket> min_skewed_task_buckets;
+
+    for (const auto& min_task_bucket : min_task_buckets) {
+        double skewness =
+                static_cast<double>(
+                        _estimated_task_bucket_data_size_since_last_rebalance[max_task_bucket.id] -
+                        _estimated_task_bucket_data_size_since_last_rebalance[min_task_bucket.id]) /
+                _estimated_task_bucket_data_size_since_last_rebalance[max_task_bucket.id];
+        if (skewness <= TASK_BUCKET_SKEWNESS_THRESHOLD || std::isnan(skewness)) {
+            break;
+        }
+        if (max_task_bucket.task_id != min_task_bucket.task_id) {
+            min_skewed_task_buckets.push_back(min_task_bucket);
+        }
+    }
+    return min_skewed_task_buckets;
+}
+
+bool SkewedPartitionRebalancer::_rebalance_partition(
+        int partition_id, const TaskBucket& to_task_bucket,
+        IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>&
+                max_task_buckets,
+        IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>&
+                min_task_buckets) {
+    std::vector<TaskBucket>& assignments = _partition_assignments[partition_id];
+    if (std::any_of(assignments.begin(), assignments.end(),
+                    [&to_task_bucket](const TaskBucket& task_bucket) {
+                        return task_bucket.task_id == to_task_bucket.task_id;
+                    })) {
+        return false;
+    }
+
+    assignments.push_back(to_task_bucket);
+
+    int new_task_count = assignments.size();
+    int old_task_count = new_task_count - 1;
+    for (const TaskBucket& task_bucket : assignments) {
+        if (task_bucket == to_task_bucket) {
+            _estimated_task_bucket_data_size_since_last_rebalance[task_bucket.id] +=
+                    (_partition_data_size_since_last_rebalance_per_task[partition_id] *
+                     old_task_count) /
+                    new_task_count;
+        } else {
+            _estimated_task_bucket_data_size_since_last_rebalance[task_bucket.id] -=
+                    _partition_data_size_since_last_rebalance_per_task[partition_id] /
+                    new_task_count;
+        }
+        max_task_buckets.add_or_update(
+                task_bucket, _estimated_task_bucket_data_size_since_last_rebalance[task_bucket.id]);
+        min_task_buckets.add_or_update(
+                task_bucket, _estimated_task_bucket_data_size_since_last_rebalance[task_bucket.id]);
+    }
+
+    return true;
+}
+
+bool SkewedPartitionRebalancer::_should_rebalance(long data_processed) {
+    return (data_processed - _data_processed_at_last_rebalance) >=
+           _min_data_processed_rebalance_threshold;
+}
+
+void SkewedPartitionRebalancer::_rebalance_partitions(long data_processed) {
+    if (!_should_rebalance(data_processed)) {
+        return;
+    }
+
+    _calculate_partition_data_size(data_processed);
+
+    for (int partition = 0; partition < _partition_count; partition++) {
+        int total_assigned_tasks = _partition_assignments[partition].size();
+        long data_size = _partition_data_size[partition];
+        _partition_data_size_since_last_rebalance_per_task[partition] =
+                (data_size - _partition_data_size_at_last_rebalance[partition]) /
+                total_assigned_tasks;
+        _partition_data_size_at_last_rebalance[partition] = data_size;
+    }
+
+    std::vector<IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>>
+            task_bucket_max_partitions;
+
+    for (int i = 0; i < _task_count * _task_bucket_count; ++i) {
+        task_bucket_max_partitions.push_back(
+                IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>());
+    }
+
+    for (int partition = 0; partition < _partition_count; partition++) {
+        auto& taskAssignments = _partition_assignments[partition];
+        for (const auto& taskBucket : taskAssignments) {
+            auto& queue = task_bucket_max_partitions[taskBucket.id];
+            queue.add_or_update(partition,
+                                _partition_data_size_since_last_rebalance_per_task[partition]);
+        }
+    }
+
+    IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>
+            max_task_buckets;
+    IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>
+            min_task_buckets;
+
+    for (int taskId = 0; taskId < _task_count; taskId++) {
+        for (int bucketId = 0; bucketId < _task_bucket_count; bucketId++) {
+            TaskBucket task_bucket1(taskId, bucketId, _task_bucket_count);
+            TaskBucket task_bucket2(taskId, bucketId, _task_bucket_count);
+            _estimated_task_bucket_data_size_since_last_rebalance[task_bucket1.id] =
+                    _calculate_task_bucket_data_size_since_last_rebalance(
+                            task_bucket_max_partitions[task_bucket1.id]);
+            max_task_buckets.add_or_update(
+                    std::move(task_bucket1),
+                    _estimated_task_bucket_data_size_since_last_rebalance[task_bucket1.id]);
+            min_task_buckets.add_or_update(
+                    std::move(task_bucket2),
+                    _estimated_task_bucket_data_size_since_last_rebalance[task_bucket2.id]);
+        }
+    }
+
+    _rebalance_based_on_task_bucket_skewness(max_task_buckets, min_task_buckets,
+                                             task_bucket_max_partitions);
+    _data_processed_at_last_rebalance = data_processed;
+}
+} // namespace doris::vectorized
\ No newline at end of file
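A minimal usage sketch of the rebalancer, mirroring how the exchanger drives it; the counts and thresholds below are illustrative:

    #include "vec/exec/skewed_partition_rebalancer.h"

    void rebalancer_example() {
        using doris::vectorized::SkewedPartitionRebalancer;
        // 4 partitions over 2 tasks with 1 bucket per task; rebalance once a
        // partition (resp. the whole stream) has processed 1MB (resp. 2MB).
        SkewedPartitionRebalancer rebalancer(4, 2, 1, 1L << 20, 2L << 20);

        rebalancer.add_partition_row_count(0, 10000); // rows seen per partition
        rebalancer.add_data_processed(4L << 20);      // total bytes seen
        rebalancer.rebalance(); // may scale partition 0 onto another task

        // Round-robin over the (possibly grown) task list of partition 0.
        int task = rebalancer.get_task_id(/*partition_id*/ 0, /*index*/ 0);
    }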
diff --git a/be/src/vec/exec/skewed_partition_rebalancer.h b/be/src/vec/exec/skewed_partition_rebalancer.h
new file mode 100644
index 00000000000..814ebc1d465
--- /dev/null
+++ b/be/src/vec/exec/skewed_partition_rebalancer.h
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is ported from
+// https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/operator/output/SkewedPartitionRebalancer.java
+// to cpp and modified by Doris
+
+/**
+ * Helps in distributing big or skewed partitions across available tasks to improve the
+ * performance of partitioned writes.
+ * <p>
+ * This rebalancer initializes a set of buckets for each task based on a given taskBucketCount
+ * and then tries to uniformly distribute partitions across those buckets. This helps to
+ * mitigate two problems:
+ * 1. Skewness across tasks.
+ * 2. Scaling a few big partitions across tasks even if there's no skewness among them. This
+ *    essentially speeds up local scaling without much impact on overall resource utilization.
+ * <p>
+ * Example:
+ * <p>
+ * Before: 3 tasks, 3 buckets per task, and 2 skewed partitions
+ * Task1                Task2               Task3
+ * Bucket1 (Part 1)     Bucket1 (Part 2)    Bucket1
+ * Bucket2              Bucket2             Bucket2
+ * Bucket3              Bucket3             Bucket3
+ * <p>
+ * After rebalancing:
+ * Task1                Task2               Task3
+ * Bucket1 (Part 1)     Bucket1 (Part 2)    Bucket1 (Part 1)
+ * Bucket2 (Part 2)     Bucket2 (Part 1)    Bucket2 (Part 2)
+ * Bucket3              Bucket3             Bucket3
+ */
+
+#pragma once
+
+#include <algorithm>
+#include <iostream>
+#include <list>
+#include <optional>
+#include <vector>
+
+#include "util/indexed_priority_queue.hpp"
+
+namespace doris::vectorized {
+
+class SkewedPartitionRebalancer {
+private:
+    struct TaskBucket {
+        int task_id;
+        int id;
+
+        TaskBucket(int task_id_, int bucket_id_, int task_bucket_count_)
+                : task_id(task_id_), id(task_id_ * task_bucket_count_ + bucket_id_) {}
+
+        bool operator==(const TaskBucket& other) const { return id == other.id; }
+
+        bool operator<(const TaskBucket& other) const { return id < other.id; }
+
+        bool operator>(const TaskBucket& other) const { return id > other.id; }
+    };
+
+public:
+    SkewedPartitionRebalancer(int partition_count, int task_count, int task_bucket_count,
+                              long min_partition_data_processed_rebalance_threshold,
+                              long min_data_processed_rebalance_threshold);
+
+ std::vector<std::list<int>> get_partition_assignments();
+ int get_task_count();
+ int get_task_id(int partition_id, int64_t index);
+ void add_data_processed(long data_size);
+ void add_partition_row_count(int partition, long row_count);
+ void rebalance();
+
+private:
+    void _calculate_partition_data_size(long data_processed);
+    long _calculate_task_bucket_data_size_since_last_rebalance(
+            IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>&
+                    max_partitions);
+    void _rebalance_based_on_task_bucket_skewness(
+            IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>&
+                    max_task_buckets,
+            IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>&
+                    min_task_buckets,
+            std::vector<
+                    IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>>&
+                    task_bucket_max_partitions);
+    std::vector<TaskBucket> _find_skewed_min_task_buckets(
+            const TaskBucket& max_task_bucket,
+            const IndexedPriorityQueue<TaskBucket,
+                                       IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>&
+                    min_task_buckets);
+    bool _rebalance_partition(
+            int partition_id, const TaskBucket& to_task_bucket,
+            IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW>&
+                    max_task_buckets,
+            IndexedPriorityQueue<TaskBucket, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH>&
+                    min_task_buckets);
+
+    bool _should_rebalance(long data_processed);
+    void _rebalance_partitions(long data_processed);
+
+private:
+ static constexpr double TASK_BUCKET_SKEWNESS_THRESHOLD = 0.7;
+
+ int _partition_count;
+ int _task_count;
+ int _task_bucket_count;
+ long _min_partition_data_processed_rebalance_threshold;
+ long _min_data_processed_rebalance_threshold;
+ std::vector<long> _partition_row_count;
+ long _data_processed;
+ long _data_processed_at_last_rebalance;
+ std::vector<long> _partition_data_size;
+ std::vector<long> _partition_data_size_at_last_rebalance;
+ std::vector<long> _partition_data_size_since_last_rebalance_per_task;
+ std::vector<long> _estimated_task_bucket_data_size_since_last_rebalance;
+
+ std::vector<std::vector<TaskBucket>> _partition_assignments;
+};
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/runtime/vorc_transformer.cpp b/be/src/vec/runtime/vorc_transformer.cpp
index be52b672aa8..764a97ae5bc 100644
--- a/be/src/vec/runtime/vorc_transformer.cpp
+++ b/be/src/vec/runtime/vorc_transformer.cpp
@@ -99,16 +99,33 @@ VOrcTransformer::VOrcTransformer(RuntimeState* state, doris::io::FileWriter* fil
: VFileFormatTransformer(state, output_vexpr_ctxs, output_object_data),
_file_writer(file_writer),
_write_options(new orc::WriterOptions()),
- _schema_str(schema) {
+ _schema_str(&schema),
+ _schema(nullptr) {
_write_options->setTimezoneName(_state->timezone());
_write_options->setUseTightNumericVector(true);
}
+VOrcTransformer::VOrcTransformer(RuntimeState* state, doris::io::FileWriter* file_writer,
+                                 const VExprContextSPtrs& output_vexpr_ctxs,
+                                 std::unique_ptr<orc::Type> schema, bool output_object_data,
+                                 orc::CompressionKind compression)
+        : VFileFormatTransformer(state, output_vexpr_ctxs, output_object_data),
+          _file_writer(file_writer),
+          _write_options(new orc::WriterOptions()),
+          _schema_str(nullptr),
+          _schema(std::move(schema)) {
+    _write_options->setTimezoneName(_state->timezone());
+    _write_options->setUseTightNumericVector(true);
+    _write_options->setCompression(compression);
+}
+
Status VOrcTransformer::open() {
try {
- _schema = orc::Type::buildTypeFromString(_schema_str);
+ if (_schema == nullptr && _schema_str != nullptr) {
+ _schema = orc::Type::buildTypeFromString(*_schema_str);
+ }
} catch (const std::exception& e) {
- return Status::InternalError("Orc build schema from \"{}\" failed:
{}", _schema_str,
+ return Status::InternalError("Orc build schema from \"{}\" failed:
{}", *_schema_str,
e.what());
}
_output_stream = std::make_unique<VOrcOutputStream>(_file_writer);
diff --git a/be/src/vec/runtime/vorc_transformer.h b/be/src/vec/runtime/vorc_transformer.h
index 4b8ea178ca9..8cfc956c0cd 100644
--- a/be/src/vec/runtime/vorc_transformer.h
+++ b/be/src/vec/runtime/vorc_transformer.h
@@ -78,6 +78,10 @@ public:
                    const VExprContextSPtrs& output_vexpr_ctxs, const std::string& schema,
                    bool output_object_data);
+    VOrcTransformer(RuntimeState* state, doris::io::FileWriter* file_writer,
+                    const VExprContextSPtrs& output_vexpr_ctxs, std::unique_ptr<orc::Type> schema,
+                    bool output_object_data, orc::CompressionKind compression);
+
~VOrcTransformer() = default;
Status open() override;
@@ -99,7 +103,7 @@ private:
doris::io::FileWriter* _file_writer = nullptr;
std::unique_ptr<orc::OutputStream> _output_stream;
std::unique_ptr<orc::WriterOptions> _write_options;
- const std::string& _schema_str;
+ const std::string* _schema_str;
std::unique_ptr<orc::Type> _schema;
std::unique_ptr<orc::Writer> _writer;
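For reference, the new overload takes a prebuilt orc::Type plus an explicit compression kind instead of a schema string. A hedged construction sketch (state, file writer, and expression contexts are assumed to come from the surrounding sink; the schema string is illustrative):

    #include "vec/runtime/vorc_transformer.h"

    std::unique_ptr<doris::vectorized::VOrcTransformer> make_orc_transformer(
            doris::RuntimeState* state, doris::io::FileWriter* file_writer,
            const doris::vectorized::VExprContextSPtrs& output_vexpr_ctxs) {
        // Build the ORC schema up front rather than deferring to open().
        std::unique_ptr<orc::Type> schema =
                orc::Type::buildTypeFromString("struct<id:int,name:string>");
        return std::make_unique<doris::vectorized::VOrcTransformer>(
                state, file_writer, output_vexpr_ctxs, std::move(schema),
                /*output_object_data*/ false, orc::CompressionKind_ZLIB);
    }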
diff --git a/be/src/vec/sink/scale_writer_partitioning_exchanger.hpp b/be/src/vec/sink/scale_writer_partitioning_exchanger.hpp
new file mode 100644
index 00000000000..f7435249c20
--- /dev/null
+++ b/be/src/vec/sink/scale_writer_partitioning_exchanger.hpp
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <algorithm>
+#include <functional>
+#include <iostream>
+#include <vector>
+
+#include "vec/core/block.h"
+#include "vec/exec/skewed_partition_rebalancer.h"
+
+namespace doris::vectorized {
+
+template <typename PartitionFunction>
+class ScaleWriterPartitioningExchanger {
+public:
+    ScaleWriterPartitioningExchanger(int channel_size, PartitionFunction& partition_function,
+                                     int partition_count, int task_count, int task_bucket_count,
+                                     long min_partition_data_processed_rebalance_threshold,
+                                     long min_data_processed_rebalance_threshold)
+            : _channel_size(channel_size),
+              _partition_function(partition_function),
+              _partition_rebalancer(partition_count, task_count, task_bucket_count,
+                                    min_partition_data_processed_rebalance_threshold,
+                                    min_data_processed_rebalance_threshold),
+              _partition_row_counts(partition_count, 0),
+              _partition_writer_ids(partition_count, -1),
+              _partition_writer_indexes(partition_count, 0) {}
+
+    std::vector<std::vector<uint32_t>> accept(Block* block) {
+        std::vector<std::vector<uint32_t>> writerAssignments(_channel_size,
+                                                             std::vector<uint32_t>());
+        for (int partition_id = 0; partition_id < _partition_row_counts.size(); partition_id++) {
+            _partition_row_counts[partition_id] = 0;
+            _partition_writer_ids[partition_id] = -1;
+        }
+
+        _partition_rebalancer.rebalance();
+
+        for (int position = 0; position < block->rows(); position++) {
+            int partition_id = _partition_function.get_partition(block, position);
+            _partition_row_counts[partition_id] += 1;
+
+            // Get writer id for this partition by looking at the scaling state
+            int writer_id = _partition_writer_ids[partition_id];
+            if (writer_id == -1) {
+                writer_id = get_next_writer_id(partition_id);
+                _partition_writer_ids[partition_id] = writer_id;
+            }
+            writerAssignments[writer_id].push_back(position);
+        }
+
+        for (int partition_id = 0; partition_id < _partition_row_counts.size(); partition_id++) {
+            _partition_rebalancer.add_partition_row_count(partition_id,
+                                                          _partition_row_counts[partition_id]);
+        }
+        _partition_rebalancer.add_data_processed(block->bytes());
+
+        return writerAssignments;
+    }
+
+    int get_next_writer_id(int partition_id) {
+        return _partition_rebalancer.get_task_id(partition_id,
+                                                 _partition_writer_indexes[partition_id]++);
+    }
+
+private:
+    int _channel_size;
+    PartitionFunction& _partition_function;
+    SkewedPartitionRebalancer _partition_rebalancer;
+    std::vector<int> _partition_row_counts;
+    std::vector<int> _partition_writer_ids;
+    std::vector<int> _partition_writer_indexes;
+};
+
+} // namespace doris::vectorized
\ No newline at end of file
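accept() returns, for each writer channel, the row positions assigned to it; the exchange sink then feeds those index lists to channel_add_rows_with_idx. A sketch with a stub partition function (the real sink uses the Crc32HashPartitioner-backed HashPartitionFunction from exchange_sink_operator.h):

    #include "vec/sink/scale_writer_partitioning_exchanger.hpp"

    // Stub for illustration; anything exposing get_partition(Block*, int)
    // works as the PartitionFunction template argument.
    struct ModPartitionFunction {
        int partition_count;
        int get_partition(doris::vectorized::Block* block, int position) {
            return position % partition_count; // real code hashes the row
        }
    };

    void exchanger_example(doris::vectorized::Block* block) {
        ModPartitionFunction fn {8};
        doris::vectorized::ScaleWriterPartitioningExchanger<ModPartitionFunction> exchanger(
                /*channel_size*/ 4, fn, /*partition_count*/ 8, /*task_count*/ 4,
                /*task_bucket_count*/ 1, /*min_partition_threshold*/ 1L << 20,
                /*min_data_threshold*/ 2L << 20);
        // assignments[w] holds the row positions for writer channel w.
        auto assignments = exchanger.accept(block);
    }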
diff --git a/be/src/vec/sink/vhive_table_sink.cpp b/be/src/vec/sink/vhive_table_sink.cpp
new file mode 100644
index 00000000000..0fba50cef69
--- /dev/null
+++ b/be/src/vec/sink/vhive_table_sink.cpp
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/sink/vhive_table_sink.h"
+
+namespace doris {
+class TExpr;
+
+namespace vectorized {
+
+VHiveTableSink::VHiveTableSink(ObjectPool* pool, const RowDescriptor& row_desc,
+                               const std::vector<TExpr>& texprs)
+        : AsyncWriterSink<VHiveTableWriter, VHIVE_TABLE_SINK>(row_desc, texprs), _pool(pool) {}
+
+VHiveTableSink::~VHiveTableSink() = default;
+
+Status VHiveTableSink::init(const TDataSink& t_sink) {
+ RETURN_IF_ERROR(AsyncWriterSink::init(t_sink));
+ RETURN_IF_ERROR(_writer->init_properties(_pool));
+ return Status::OK();
+}
+
+Status VHiveTableSink::close(RuntimeState* state, Status exec_status) {
+ SCOPED_TIMER(_exec_timer);
+ if (_closed) {
+ return _close_status;
+ }
+ RETURN_IF_ERROR(DataSink::close(state, exec_status));
+ _close_status = AsyncWriterSink::close(state, exec_status);
+ return _close_status;
+}
+
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/sink/vhive_table_sink.h
b/be/src/vec/sink/vhive_table_sink.h
new file mode 100644
index 00000000000..d7b9c3fc856
--- /dev/null
+++ b/be/src/vec/sink/vhive_table_sink.h
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "common/status.h"
+#include "vec/sink/async_writer_sink.h"
+#include "vec/sink/writer/vhive_table_writer.h"
+
+namespace doris {
+
+class ObjectPool;
+class RowDescriptor;
+
+namespace vectorized {
+
+inline constexpr char VHIVE_TABLE_SINK[] = "VHiveTableSink";
+
+class VHiveTableSink final : public AsyncWriterSink<VHiveTableWriter, VHIVE_TABLE_SINK> {
+public:
+    // Construct from thrift struct which is generated by FE.
+    VHiveTableSink(ObjectPool* pool, const RowDescriptor& row_desc,
+                   const std::vector<TExpr>& texprs);
+
+ ~VHiveTableSink() override;
+
+ Status init(const TDataSink& sink) override;
+
+ Status close(RuntimeState* state, Status exec_status) override;
+
+private:
+ ObjectPool* _pool = nullptr;
+
+ Status _close_status = Status::OK();
+};
+
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/sink/writer/vhive_partition_writer.cpp
b/be/src/vec/sink/writer/vhive_partition_writer.cpp
new file mode 100644
index 00000000000..38668abb3fc
--- /dev/null
+++ b/be/src/vec/sink/writer/vhive_partition_writer.cpp
@@ -0,0 +1,282 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vhive_partition_writer.h"
+
+#include "io/file_factory.h"
+#include "io/fs/file_system.h"
+#include "runtime/runtime_state.h"
+#include "vec/core/materialize_block.h"
+#include "vec/runtime/vorc_transformer.h"
+#include "vec/runtime/vparquet_transformer.h"
+
+namespace doris {
+namespace vectorized {
+
+VHivePartitionWriter::VHivePartitionWriter(
+        const TDataSink& t_sink, std::string partition_name, TUpdateMode::type update_mode,
+        const VExprContextSPtrs& output_expr_ctxs, const std::vector<THiveColumn>& columns,
+        WriteInfo write_info, std::string file_name, TFileFormatType::type file_format_type,
+        TFileCompressType::type hive_compress_type,
+        const std::map<std::string, std::string>& hadoop_conf)
+        : _partition_name(std::move(partition_name)),
+          _update_mode(update_mode),
+          _vec_output_expr_ctxs(output_expr_ctxs),
+          _columns(columns),
+          _write_info(std::move(write_info)),
+          _file_name(std::move(file_name)),
+          _file_format_type(file_format_type),
+          _hive_compress_type(hive_compress_type),
+          _hadoop_conf(hadoop_conf) {}
+
+Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile) {
+    _state = state;
+
+    std::vector<TNetworkAddress> broker_addresses;
+    RETURN_IF_ERROR(FileFactory::create_file_writer(
+            _write_info.file_type, state->exec_env(), broker_addresses, _hadoop_conf,
+            fmt::format("{}/{}", _write_info.write_path, _file_name), 0, _file_writer_impl));
+
+    switch (_file_format_type) {
+    case TFileFormatType::FORMAT_PARQUET: {
+        bool parquet_disable_dictionary = false;
+        TParquetCompressionType::type parquet_compression_type;
+        switch (_hive_compress_type) {
+        case TFileCompressType::PLAIN: {
+            parquet_compression_type = TParquetCompressionType::UNCOMPRESSED;
+            break;
+        }
+        case TFileCompressType::SNAPPYBLOCK: {
+            parquet_compression_type = TParquetCompressionType::SNAPPY;
+            break;
+        }
+        case TFileCompressType::ZSTD: {
+            parquet_compression_type = TParquetCompressionType::ZSTD;
+            break;
+        }
+        default: {
+            return Status::InternalError("Unsupported hive compress type {} with parquet",
+                                         to_string(_hive_compress_type));
+        }
+        }
+        std::vector<TParquetSchema> parquet_schemas;
+        for (int i = 0; i < _columns.size(); i++) {
+            TParquetSchema parquet_schema;
+            parquet_schema.schema_column_name = _columns[i].name;
+            parquet_schemas.emplace_back(std::move(parquet_schema));
+        }
+        _vfile_writer.reset(new VParquetTransformer(
+                state, _file_writer_impl.get(), _vec_output_expr_ctxs, parquet_schemas,
+                parquet_compression_type, parquet_disable_dictionary, TParquetVersion::PARQUET_1_0,
+                false));
+        return _vfile_writer->open();
+    }
+ }
+    case TFileFormatType::FORMAT_ORC: {
+        orc::CompressionKind orc_compression_type;
+        switch (_hive_compress_type) {
+        case TFileCompressType::PLAIN: {
+            orc_compression_type = orc::CompressionKind::CompressionKind_NONE;
+            break;
+        }
+        case TFileCompressType::SNAPPYBLOCK: {
+            orc_compression_type = orc::CompressionKind::CompressionKind_SNAPPY;
+            break;
+        }
+        case TFileCompressType::ZLIB: {
+            orc_compression_type = orc::CompressionKind::CompressionKind_ZLIB;
+            break;
+        }
+        case TFileCompressType::ZSTD: {
+            orc_compression_type = orc::CompressionKind::CompressionKind_ZSTD;
+            break;
+        }
+        default: {
+            return Status::InternalError("Unsupported hive compress type {} with orc",
+                                         to_string(_hive_compress_type));
+        }
+        }
+
+        std::unique_ptr<orc::Type> root_schema = orc::createStructType();
+        for (int i = 0; i < _columns.size(); i++) {
+            VExprSPtr column_expr = _vec_output_expr_ctxs[i]->root();
+            try {
+                root_schema->addStructField(_columns[i].name, _build_orc_type(column_expr->type()));
+            } catch (doris::Exception& e) {
+                return e.to_status();
+            }
+        }
+
+        _vfile_writer.reset(new VOrcTransformer(state, _file_writer_impl.get(),
+                                                _vec_output_expr_ctxs, std::move(root_schema),
+                                                false, orc_compression_type));
+        return _vfile_writer->open();
+    }
+ default: {
+ return Status::InternalError("Unsupported file format type {}",
+ to_string(_file_format_type));
+ }
+ }
+}
+
+Status VHivePartitionWriter::close(Status status) {
+    if (_vfile_writer != nullptr) {
+        Status st = _vfile_writer->close();
+        if (st != Status::OK()) {
+            LOG(WARNING) << fmt::format("_vfile_writer close failed, reason: {}", st.to_string());
+        }
+    }
+    if (status != Status::OK()) {
+        auto path = fmt::format("{}/{}", _write_info.write_path, _file_name);
+        Status st = _file_writer_impl->fs()->delete_file(path);
+        if (st != Status::OK()) {
+            LOG(WARNING) << fmt::format("Delete file {} failed, reason: {}", path, st.to_string());
+        }
+    }
+    _state->hive_partition_updates().emplace_back(_build_partition_update());
+    return Status::OK();
+}
+
+Status VHivePartitionWriter::write(vectorized::Block& block, vectorized::IColumn::Filter* filter) {
+    Block output_block;
+    RETURN_IF_ERROR(_projection_and_filter_block(block, filter, &output_block));
+ RETURN_IF_ERROR(_vfile_writer->write(output_block));
+ _row_count += output_block.rows();
+ _input_size_in_bytes += output_block.bytes();
+ return Status::OK();
+}
+
+std::unique_ptr<orc::Type> VHivePartitionWriter::_build_orc_type(
+ const TypeDescriptor& type_descriptor) {
+ switch (type_descriptor.type) {
+ case TYPE_BOOLEAN: {
+ return orc::createPrimitiveType(orc::BOOLEAN);
+ }
+ case TYPE_TINYINT: {
+ return orc::createPrimitiveType(orc::BYTE);
+ }
+ case TYPE_SMALLINT: {
+ return orc::createPrimitiveType(orc::SHORT);
+ }
+ case TYPE_INT: {
+ return orc::createPrimitiveType(orc::INT);
+ }
+ case TYPE_BIGINT: {
+ return orc::createPrimitiveType(orc::LONG);
+ }
+ case TYPE_FLOAT: {
+ return orc::createPrimitiveType(orc::FLOAT);
+ }
+ case TYPE_DOUBLE: {
+ return orc::createPrimitiveType(orc::DOUBLE);
+ }
+ case TYPE_CHAR: {
+ return orc::createCharType(orc::CHAR, type_descriptor.len);
+ }
+ case TYPE_VARCHAR: {
+ return orc::createCharType(orc::VARCHAR, type_descriptor.len);
+ }
+ case TYPE_STRING: {
+ return orc::createPrimitiveType(orc::STRING);
+ }
+ case TYPE_BINARY: {
+ return orc::createPrimitiveType(orc::STRING);
+ }
+ case TYPE_DATEV2: {
+ return orc::createPrimitiveType(orc::DATE);
+ }
+ case TYPE_DATETIMEV2: {
+ return orc::createPrimitiveType(orc::TIMESTAMP);
+ }
+    case TYPE_DECIMAL32: {
+        return orc::createDecimalType(type_descriptor.precision, type_descriptor.scale);
+    }
+    case TYPE_DECIMAL64: {
+        return orc::createDecimalType(type_descriptor.precision, type_descriptor.scale);
+    }
+    case TYPE_DECIMAL128I: {
+        return orc::createDecimalType(type_descriptor.precision, type_descriptor.scale);
+    }
+    case TYPE_STRUCT: {
+        std::unique_ptr<orc::Type> struct_type = orc::createStructType();
+        for (int j = 0; j < type_descriptor.children.size(); ++j) {
+            struct_type->addStructField(type_descriptor.field_names[j],
+                                        _build_orc_type(type_descriptor.children[j]));
+        }
+        return struct_type;
+    }
+    case TYPE_ARRAY: {
+        return orc::createListType(_build_orc_type(type_descriptor.children[0]));
+    }
+    case TYPE_MAP: {
+        return orc::createMapType(_build_orc_type(type_descriptor.children[0]),
+                                  _build_orc_type(type_descriptor.children[1]));
+    }
+ default: {
+ throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
+ "Unsupported type {} to build orc type",
+ type_descriptor.debug_string());
+ }
+ }
+}
+
+Status VHivePartitionWriter::_projection_and_filter_block(doris::vectorized::Block& input_block,
+                                                          const vectorized::IColumn::Filter* filter,
+                                                          doris::vectorized::Block* output_block) {
+    Status status = Status::OK();
+    if (input_block.rows() == 0) {
+        return status;
+    }
+    RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs(
+            _vec_output_expr_ctxs, input_block, output_block));
+ materialize_block_inplace(*output_block);
+
+ if (filter == nullptr) {
+ return status;
+ }
+
+ std::vector<uint32_t> columns_to_filter;
+ int column_to_keep = input_block.columns();
+ columns_to_filter.resize(column_to_keep);
+ for (uint32_t i = 0; i < column_to_keep; ++i) {
+ columns_to_filter[i] = i;
+ }
+
+ Block::filter_block_internal(output_block, columns_to_filter, *filter);
+
+ return status;
+}
+
+THivePartitionUpdate VHivePartitionWriter::_build_partition_update() {
+ THivePartitionUpdate hive_partition_update;
+ hive_partition_update.__set_name(_partition_name);
+ hive_partition_update.__set_update_mode(_update_mode);
+ THiveLocationParams location;
+ location.__set_write_path(_write_info.write_path);
+ location.__set_target_path(_write_info.target_path);
+ hive_partition_update.__set_location(location);
+ hive_partition_update.__set_file_names({_file_name});
+ hive_partition_update.__set_row_count(_row_count);
+ hive_partition_update.__set_file_size(_input_size_in_bytes);
+ return hive_partition_update;
+}
+
+} // namespace vectorized
+} // namespace doris
\ No newline at end of file
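A note on the lifecycle implemented above (a sketch under the assumption that
the table writer owns each partition writer, as vhive_table_writer.cpp below
does; the partition name and file name are made up for illustration):

    VHivePartitionWriter writer(t_sink, "dt=2024-03-18", TUpdateMode::APPEND,
                                output_expr_ctxs, columns, std::move(write_info),
                                "query_uuid.zstd.parquet", TFileFormatType::FORMAT_PARQUET,
                                TFileCompressType::ZSTD, hadoop_conf);
    RETURN_IF_ERROR(writer.open(state, profile));
    RETURN_IF_ERROR(writer.write(block));        // nullptr filter: write every row
    RETURN_IF_ERROR(writer.close(Status::OK())); // a non-OK status deletes the file instead

close() always appends a THivePartitionUpdate to the runtime state so the
result of the write can be reported back for the commit step.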
diff --git a/be/src/vec/sink/writer/vhive_partition_writer.h
b/be/src/vec/sink/writer/vhive_partition_writer.h
new file mode 100644
index 00000000000..afee81aa499
--- /dev/null
+++ b/be/src/vec/sink/writer/vhive_partition_writer.h
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <gen_cpp/DataSinks_types.h>
+
+#include "io/fs/file_writer.h"
+#include "vec/columns/column.h"
+#include "vec/exprs/vexpr_fwd.h"
+#include "vec/runtime/vfile_format_transformer.h"
+
+namespace doris {
+
+class ObjectPool;
+class RuntimeState;
+class RuntimeProfile;
+
+namespace vectorized {
+
+class Block;
+class VFileFormatTransformer;
+
+class VHivePartitionWriter {
+public:
+ struct WriteInfo {
+ std::string write_path;
+ std::string target_path;
+ TFileType::type file_type;
+ };
+
+    VHivePartitionWriter(const TDataSink& t_sink, std::string partition_name,
+                         TUpdateMode::type update_mode, const VExprContextSPtrs& output_expr_ctxs,
+                         const std::vector<THiveColumn>& columns, WriteInfo write_info,
+                         std::string file_name, TFileFormatType::type file_format_type,
+                         TFileCompressType::type hive_compress_type,
+                         const std::map<std::string, std::string>& hadoop_conf);
+
+ Status init_properties(ObjectPool* pool) { return Status::OK(); }
+
+ Status open(RuntimeState* state, RuntimeProfile* profile);
+
+ Status write(vectorized::Block& block, IColumn::Filter* filter = nullptr);
+
+ Status close(Status);
+
+ inline size_t written_len() { return _vfile_writer->written_len(); }
+
+private:
+    std::unique_ptr<orc::Type> _build_orc_type(const TypeDescriptor& type_descriptor);
+
+    Status _projection_and_filter_block(doris::vectorized::Block& input_block,
+                                        const vectorized::IColumn::Filter* filter,
+                                        doris::vectorized::Block* output_block);
+
+ THivePartitionUpdate _build_partition_update();
+
+ std::string _partition_name;
+
+ TUpdateMode::type _update_mode;
+
+ size_t _row_count = 0;
+ size_t _input_size_in_bytes = 0;
+
+ const VExprContextSPtrs& _vec_output_expr_ctxs;
+
+ const std::vector<THiveColumn>& _columns;
+ WriteInfo _write_info;
+ std::string _file_name;
+ TFileFormatType::type _file_format_type;
+ TFileCompressType::type _hive_compress_type;
+ const std::map<std::string, std::string>& _hadoop_conf;
+
+    // Underlying file writer for the target file; it must outlive _vfile_writer,
+    // which writes through a raw pointer to it.
+ std::unique_ptr<doris::io::FileWriter> _file_writer_impl = nullptr;
+ // convert block to parquet/orc/csv format
+ std::unique_ptr<VFileFormatTransformer> _vfile_writer = nullptr;
+
+    RuntimeState* _state = nullptr;
+};
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/sink/writer/vhive_table_writer.cpp
b/be/src/vec/sink/writer/vhive_table_writer.cpp
new file mode 100644
index 00000000000..1c123c92b3a
--- /dev/null
+++ b/be/src/vec/sink/writer/vhive_table_writer.cpp
@@ -0,0 +1,432 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vhive_table_writer.h"
+
+#include "runtime/runtime_state.h"
+#include "vec/core/block.h"
+#include "vec/core/column_with_type_and_name.h"
+#include "vec/exprs/vexpr.h"
+#include "vec/exprs/vexpr_context.h"
+#include "vec/sink/writer/vhive_partition_writer.h"
+#include "vec/sink/writer/vhive_utils.h"
+
+namespace doris {
+namespace vectorized {
+
+VHiveTableWriter::VHiveTableWriter(const TDataSink& t_sink,
+                                   const VExprContextSPtrs& output_expr_ctxs)
+        : AsyncResultWriter(output_expr_ctxs), _t_sink(t_sink) {
+    DCHECK(_t_sink.__isset.hive_table_sink);
+}
+
+Status VHiveTableWriter::init_properties(ObjectPool* pool) {
+ return Status::OK();
+}
+
+Status VHiveTableWriter::open(RuntimeState* state, RuntimeProfile* profile) {
+ _state = state;
+ _profile = profile;
+
+    for (int i = 0; i < _t_sink.hive_table_sink.columns.size(); ++i) {
+        if (_t_sink.hive_table_sink.columns[i].column_type == THiveColumnType::PARTITION_KEY) {
+            _partition_columns_input_index.emplace_back(i);
+        }
+    }
+ return Status::OK();
+}
+
+Status VHiveTableWriter::write(vectorized::Block& block) {
+    std::unordered_map<std::shared_ptr<VHivePartitionWriter>, IColumn::Filter> writer_positions;
+
+    auto& hive_table_sink = _t_sink.hive_table_sink;
+
+    if (_partition_columns_input_index.empty()) {
+        auto writer_iter = _partitions_to_writers.find("");
+        if (writer_iter == _partitions_to_writers.end()) {
+            try {
+                std::shared_ptr<VHivePartitionWriter> writer = _create_partition_writer(block, -1);
+                _partitions_to_writers.insert({"", writer});
+                RETURN_IF_ERROR(writer->open(_state, _profile));
+                RETURN_IF_ERROR(writer->write(block));
+            } catch (doris::Exception& e) {
+                return e.to_status();
+            }
+            return Status::OK();
+        } else {
+            std::shared_ptr<VHivePartitionWriter> writer;
+            if (writer_iter->second->written_len() > config::hive_sink_max_file_size) {
+                static_cast<void>(writer_iter->second->close(Status::OK()));
+                _partitions_to_writers.erase(writer_iter);
+                try {
+                    writer = _create_partition_writer(block, -1);
+                    _partitions_to_writers.insert({"", writer});
+                    RETURN_IF_ERROR(writer->open(_state, _profile));
+                } catch (doris::Exception& e) {
+                    return e.to_status();
+                }
+            } else {
+                writer = writer_iter->second;
+            }
+            // Write the block exactly once, whether the writer was rolled over or reused.
+            RETURN_IF_ERROR(writer->write(block));
+            return Status::OK();
+        }
+    }
+
+    for (int i = 0; i < block.rows(); ++i) {
+        std::vector<std::string> partition_values;
+        try {
+            partition_values = _create_partition_values(block, i);
+        } catch (doris::Exception& e) {
+            return e.to_status();
+        }
+        std::string partition_name = VHiveUtils::make_partition_name(
+                hive_table_sink.columns, _partition_columns_input_index, partition_values);
+
+        auto create_and_open_writer =
+                [&](const std::string& partition_name, int position,
+                    std::shared_ptr<VHivePartitionWriter>& writer_ptr) -> Status {
+            try {
+                auto writer = _create_partition_writer(block, position);
+                RETURN_IF_ERROR(writer->open(_state, _profile));
+                IColumn::Filter filter(block.rows(), 0);
+                filter[position] = 1;
+                writer_positions.insert({writer, std::move(filter)});
+                _partitions_to_writers.insert({partition_name, writer});
+                writer_ptr = writer;
+            } catch (doris::Exception& e) {
+                return e.to_status();
+            }
+            return Status::OK();
+        };
+
+        auto writer_iter = _partitions_to_writers.find(partition_name);
+        if (writer_iter == _partitions_to_writers.end()) {
+            std::shared_ptr<VHivePartitionWriter> writer;
+            RETURN_IF_ERROR(create_and_open_writer(partition_name, i, writer));
+        } else {
+            std::shared_ptr<VHivePartitionWriter> writer;
+            if (writer_iter->second->written_len() > config::hive_sink_max_file_size) {
+                static_cast<void>(writer_iter->second->close(Status::OK()));
+                writer_positions.erase(writer_iter->second);
+                _partitions_to_writers.erase(writer_iter);
+                RETURN_IF_ERROR(create_and_open_writer(partition_name, i, writer));
+            } else {
+                writer = writer_iter->second;
+            }
+            auto writer_pos_iter = writer_positions.find(writer);
+            if (writer_pos_iter == writer_positions.end()) {
+                IColumn::Filter filter(block.rows(), 0);
+                filter[i] = 1;
+                writer_positions.insert({writer, std::move(filter)});
+            } else {
+                writer_pos_iter->second[i] = 1;
+            }
+        }
+    }
+
+    for (auto it = writer_positions.begin(); it != writer_positions.end(); ++it) {
+        RETURN_IF_ERROR(it->first->write(block, &it->second));
+    }
+    return Status::OK();
+}
+
+Status VHiveTableWriter::close(Status status) {
+    for (const auto& pair : _partitions_to_writers) {
+        Status st = pair.second->close(status);
+        if (st != Status::OK()) {
+            LOG(WARNING) << fmt::format("Close partition writer {} failed, reason: {}", pair.first,
+                                        st.to_string());
+        }
+    }
+    _partitions_to_writers.clear();
+    return Status::OK();
+}
+
+std::shared_ptr<VHivePartitionWriter> VHiveTableWriter::_create_partition_writer(
+        vectorized::Block& block, int position) {
+    auto& hive_table_sink = _t_sink.hive_table_sink;
+    std::vector<std::string> partition_values;
+    std::string partition_name;
+    if (!_partition_columns_input_index.empty()) {
+        partition_values = _create_partition_values(block, position);
+        partition_name = VHiveUtils::make_partition_name(
+                hive_table_sink.columns, _partition_columns_input_index, partition_values);
+    }
+    const std::vector<THivePartition>& partitions = hive_table_sink.partitions;
+    const THiveLocationParams& write_location = hive_table_sink.location;
+    const THivePartition* existing_partition = nullptr;
+    bool existing_table = true;
+    for (const auto& partition : partitions) {
+        if (partition_values == partition.values) {
+            existing_partition = &partition;
+            break;
+        }
+    }
+    TUpdateMode::type update_mode;
+    VHivePartitionWriter::WriteInfo write_info;
+    TFileFormatType::type file_format_type;
+    TFileCompressType::type write_compress_type;
+    if (existing_partition == nullptr) { // new partition
+        if (!existing_table) {           // new table
+            update_mode = TUpdateMode::NEW;
+            if (_partition_columns_input_index.empty()) { // new unpartitioned table
+                write_info = {write_location.write_path, write_location.target_path,
+                              write_location.file_type};
+            } else { // a new partition in a new partitioned table
+                auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name);
+                auto target_path = fmt::format("{}/{}", write_location.target_path, partition_name);
+                write_info = {std::move(write_path), std::move(target_path),
+                              write_location.file_type};
+            }
+        } else { // a new partition in an existing partitioned table, or an existing unpartitioned table
+            if (_partition_columns_input_index.empty()) { // an existing unpartitioned table
+                update_mode =
+                        !hive_table_sink.overwrite ? TUpdateMode::APPEND : TUpdateMode::OVERWRITE;
+                write_info = {write_location.write_path, write_location.target_path,
+                              write_location.file_type};
+            } else { // a new partition in an existing partitioned table
+                update_mode = TUpdateMode::NEW;
+                auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name);
+                auto target_path = fmt::format("{}/{}", write_location.target_path, partition_name);
+                write_info = {std::move(write_path), std::move(target_path),
+                              write_location.file_type};
+            }
+            // need to get schema from existing table ?
+        }
+        file_format_type = hive_table_sink.file_format;
+        write_compress_type = hive_table_sink.compression_type;
+    } else { // existing partition
+        if (!hive_table_sink.overwrite) {
+            update_mode = TUpdateMode::APPEND;
+            auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name);
+            auto target_path = fmt::format("{}", existing_partition->location.target_path);
+            write_info = {std::move(write_path), std::move(target_path),
+                          existing_partition->location.file_type};
+            file_format_type = existing_partition->file_format;
+            write_compress_type = hive_table_sink.compression_type;
+        } else {
+            update_mode = TUpdateMode::OVERWRITE;
+            auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name);
+            auto target_path = fmt::format("{}/{}", write_location.target_path, partition_name);
+            write_info = {std::move(write_path), std::move(target_path), write_location.file_type};
+            file_format_type = hive_table_sink.file_format;
+            write_compress_type = hive_table_sink.compression_type;
+            // need to get schema from existing table ?
+        }
+    }
+
+    return std::make_shared<VHivePartitionWriter>(
+            _t_sink, std::move(partition_name), update_mode, _vec_output_expr_ctxs,
+            hive_table_sink.columns, std::move(write_info),
+            fmt::format("{}{}", _compute_file_name(),
+                        _get_file_extension(file_format_type, write_compress_type)),
+            file_format_type, write_compress_type, hive_table_sink.hadoop_config);
+}
+
+std::vector<std::string> VHiveTableWriter::_create_partition_values(vectorized::Block& block,
+                                                                    int position) {
+    std::vector<std::string> partition_values;
+    for (int i = 0; i < _partition_columns_input_index.size(); ++i) {
+        int partition_column_idx = _partition_columns_input_index[i];
+        vectorized::ColumnWithTypeAndName partition_column =
+                block.get_by_position(partition_column_idx);
+        std::string value = _to_partition_value(
+                _vec_output_expr_ctxs[partition_column_idx]->root()->type(), partition_column,
+                position);
+
+        // Check that the value contains only printable ASCII characters.
+        bool is_valid = true;
+        for (char c : value) {
+            if (c < 0x20 || c > 0x7E) {
+                is_valid = false;
+                break;
+            }
+        }
+
+        if (!is_valid) {
+            // Encode the value as space-separated Base16 for the error message.
+            std::stringstream encoded;
+            for (unsigned char c : value) {
+                encoded << std::hex << std::setw(2) << std::setfill('0') << (int)c << " ";
+            }
+            throw doris::Exception(
+                    doris::ErrorCode::INTERNAL_ERROR,
+                    "Hive partition values can only contain printable ASCII characters (0x20 - "
+                    "0x7E). Invalid value: {}",
+                    encoded.str());
+        }
+
+        partition_values.emplace_back(value);
+    }
+
+    return partition_values;
+}
+
+std::string VHiveTableWriter::_to_partition_value(const TypeDescriptor& type_desc,
+                                                  const ColumnWithTypeAndName& partition_column,
+                                                  int position) {
+    ColumnPtr column;
+    if (auto* nullable_column = check_and_get_column<ColumnNullable>(*partition_column.column)) {
+        auto* __restrict null_map_data = nullable_column->get_null_map_data().data();
+        if (null_map_data[position]) {
+            return "__HIVE_DEFAULT_PARTITION__";
+        }
+        column = nullable_column->get_nested_column_ptr();
+    } else {
+        column = partition_column.column;
+    }
+    auto [item, size] = column->get_data_at(position);
+    switch (type_desc.type) {
+    case TYPE_BOOLEAN: {
+        vectorized::Field field =
+                vectorized::check_and_get_column<const ColumnUInt8>(*column)->operator[](position);
+        return std::to_string(field.get<bool>());
+    }
+    case TYPE_TINYINT: {
+        return std::to_string(*reinterpret_cast<const Int8*>(item));
+    }
+    case TYPE_SMALLINT: {
+        return std::to_string(*reinterpret_cast<const Int16*>(item));
+    }
+    case TYPE_INT: {
+        return std::to_string(*reinterpret_cast<const Int32*>(item));
+    }
+    case TYPE_BIGINT: {
+        return std::to_string(*reinterpret_cast<const Int64*>(item));
+    }
+    case TYPE_FLOAT: {
+        return std::to_string(*reinterpret_cast<const Float32*>(item));
+    }
+    case TYPE_DOUBLE: {
+        return std::to_string(*reinterpret_cast<const Float64*>(item));
+    }
+    case TYPE_VARCHAR:
+    case TYPE_CHAR:
+    case TYPE_STRING: {
+        return std::string(item, size);
+    }
+    case TYPE_DATE: {
+        VecDateTimeValue value = binary_cast<int64_t, doris::VecDateTimeValue>(*(int64_t*)item);
+
+        char buf[64];
+        char* pos = value.to_string(buf);
+        return std::string(buf, pos - buf - 1);
+    }
+    case TYPE_DATETIME: {
+        VecDateTimeValue value = binary_cast<int64_t, doris::VecDateTimeValue>(*(int64_t*)item);
+
+        char buf[64];
+        char* pos = value.to_string(buf);
+        return std::string(buf, pos - buf - 1);
+    }
+    case TYPE_DATEV2: {
+        DateV2Value<DateV2ValueType> value =
+                binary_cast<uint32_t, DateV2Value<DateV2ValueType>>(*(int32_t*)item);
+
+        char buf[64];
+        char* pos = value.to_string(buf);
+        return std::string(buf, pos - buf - 1);
+    }
+    case TYPE_DATETIMEV2: {
+        DateV2Value<DateTimeV2ValueType> value =
+                binary_cast<uint64_t, DateV2Value<DateTimeV2ValueType>>(*(int64_t*)item);
+
+        char buf[64];
+        char* pos = value.to_string(buf, type_desc.scale);
+        return std::string(buf, pos - buf - 1);
+    }
+    case TYPE_DECIMALV2: {
+        Decimal128V2 value = *(Decimal128V2*)(item);
+        return value.to_string(type_desc.scale);
+    }
+    case TYPE_DECIMAL32: {
+        Decimal32 value = *(Decimal32*)(item);
+        return value.to_string(type_desc.scale);
+    }
+    case TYPE_DECIMAL64: {
+        Decimal64 value = *(Decimal64*)(item);
+        return value.to_string(type_desc.scale);
+    }
+    case TYPE_DECIMAL128I: {
+        Decimal128V3 value = *(Decimal128V3*)(item);
+        return value.to_string(type_desc.scale);
+    }
+    case TYPE_DECIMAL256: {
+        Decimal256 value = *(Decimal256*)(item);
+        return value.to_string(type_desc.scale);
+    }
+    default: {
+        throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
+                               "Unsupported type for partition {}", type_desc.debug_string());
+    }
+    }
+}
+
+std::string VHiveTableWriter::_get_file_extension(TFileFormatType::type file_format_type,
+                                                  TFileCompressType::type write_compress_type) {
+ std::string compress_name;
+ switch (write_compress_type) {
+ case TFileCompressType::SNAPPYBLOCK: {
+ compress_name = ".snappy";
+ break;
+ }
+ case TFileCompressType::ZLIB: {
+ compress_name = ".zlib";
+ break;
+ }
+ case TFileCompressType::ZSTD: {
+ compress_name = ".zstd";
+ break;
+ }
+ default: {
+ compress_name = "";
+ break;
+ }
+ }
+
+ std::string file_format_name;
+ switch (file_format_type) {
+ case TFileFormatType::FORMAT_PARQUET: {
+ file_format_name = ".parquet";
+ break;
+ }
+ case TFileFormatType::FORMAT_ORC: {
+ file_format_name = ".orc";
+ break;
+ }
+ default: {
+ file_format_name = "";
+ break;
+ }
+ }
+ return fmt::format("{}{}", compress_name, file_format_name);
+}
+
+std::string VHiveTableWriter::_compute_file_name() {
+ boost::uuids::uuid uuid = boost::uuids::random_generator()();
+
+ std::string uuid_str = boost::uuids::to_string(uuid);
+
+ return fmt::format("{}_{}", print_id(_state->query_id()), uuid_str);
+}
+
+} // namespace vectorized
+} // namespace doris
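For reference, the file name produced by _compute_file_name() plus
_get_file_extension() has the shape <query_id>_<uuid><compress><format>, so
the following suffixes fall out of the two switches above (worked examples
only, not additional code in this patch):

    // FORMAT_PARQUET + ZSTD        -> "<query_id>_<uuid>.zstd.parquet"
    // FORMAT_ORC     + SNAPPYBLOCK -> "<query_id>_<uuid>.snappy.orc"
    // FORMAT_ORC     + PLAIN       -> "<query_id>_<uuid>.orc"  (no compression suffix)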
diff --git a/be/src/vec/sink/writer/vhive_table_writer.h
b/be/src/vec/sink/writer/vhive_table_writer.h
new file mode 100644
index 00000000000..a4681b32e3f
--- /dev/null
+++ b/be/src/vec/sink/writer/vhive_table_writer.h
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <gen_cpp/DataSinks_types.h>
+
+#include "vec/exprs/vexpr_fwd.h"
+#include "vec/sink/writer/async_result_writer.h"
+
+namespace doris {
+
+class ObjectPool;
+class RuntimeState;
+class RuntimeProfile;
+struct TypeDescriptor;
+
+namespace vectorized {
+
+class Block;
+class VHivePartitionWriter;
+struct ColumnWithTypeAndName;
+
+class VHiveTableWriter final : public AsyncResultWriter {
+public:
+    VHiveTableWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs);
+
+ ~VHiveTableWriter() = default;
+
+ Status init_properties(ObjectPool* pool);
+
+ Status open(RuntimeState* state, RuntimeProfile* profile) override;
+
+ Status write(vectorized::Block& block) override;
+
+ Status close(Status) override;
+
+private:
+    std::shared_ptr<VHivePartitionWriter> _create_partition_writer(vectorized::Block& block,
+                                                                   int position);
+
+    std::vector<std::string> _create_partition_values(vectorized::Block& block, int position);
+
+    std::string _to_partition_value(const TypeDescriptor& type_desc,
+                                    const ColumnWithTypeAndName& partition_column, int position);
+
+    std::string _get_file_extension(TFileFormatType::type file_format_type,
+                                    TFileCompressType::type write_compress_type);
+
+    std::string _compute_file_name();
+
+    // _t_sink is currently copied from the TDataSink; moving it in from the sink
+    // would avoid the copy.
+    TDataSink _t_sink;
+    RuntimeState* _state = nullptr;
+    RuntimeProfile* _profile = nullptr;
+    std::vector<int> _partition_columns_input_index;
+    std::unordered_map<std::string, std::shared_ptr<VHivePartitionWriter>> _partitions_to_writers;
+};
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/sink/writer/vhive_utils.cpp
b/be/src/vec/sink/writer/vhive_utils.cpp
new file mode 100644
index 00000000000..9a97b893775
--- /dev/null
+++ b/be/src/vec/sink/writer/vhive_utils.cpp
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vhive_utils.h"
+
+#include <algorithm>
+#include <regex>
+#include <sstream>
+
+namespace doris {
+namespace vectorized {
+
+const std::regex VHiveUtils::PATH_CHAR_TO_ESCAPE("[\\x00-\\x1F\"#%'*/:=?\\\\\\x7F\\{\\[\\]\\^]");
+
+std::string VHiveUtils::make_partition_name(const std::vector<THiveColumn>& columns,
+                                            const std::vector<int>& partition_columns_input_index,
+                                            const std::vector<std::string>& values) {
+    std::stringstream partition_name_stream;
+
+    for (size_t i = 0; i < partition_columns_input_index.size(); i++) {
+        if (i > 0) {
+            partition_name_stream << '/';
+        }
+        std::string column = columns[partition_columns_input_index[i]].name;
+        std::string value = values[i];
+        std::transform(column.begin(), column.end(), column.begin(),
+                       [&](char c) { return std::tolower(c); });
+        partition_name_stream << escape_path_name(column) << '=' << escape_path_name(value);
+    }
+
+    return partition_name_stream.str();
+}
+
+std::string VHiveUtils::escape_path_name(const std::string& path) {
+ if (path.empty()) {
+ return "__HIVE_DEFAULT_PARTITION__";
+ }
+
+ std::smatch match;
+ if (!std::regex_search(path, match, PATH_CHAR_TO_ESCAPE)) {
+ return path;
+ }
+
+ std::stringstream ss;
+ size_t from_index = 0;
+ auto begin = path.begin();
+ auto end = path.end();
+    while (std::regex_search(begin + from_index, end, match, PATH_CHAR_TO_ESCAPE)) {
+ size_t escape_at_index = match.position() + from_index;
+ if (escape_at_index > from_index) {
+ ss << path.substr(from_index, escape_at_index - from_index);
+ }
+ char c = path[escape_at_index];
+ ss << '%' << std::hex << std::uppercase << static_cast<int>(c >> 4)
+ << static_cast<int>(c & 0xF);
+ from_index = escape_at_index + 1;
+ }
+ if (from_index < path.length()) {
+ ss << path.substr(from_index);
+ }
+ return ss.str();
+}
+} // namespace vectorized
+} // namespace doris
\ No newline at end of file
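A few worked examples of the escaping above, matching the unit tests added
later in this patch (every character matched by PATH_CHAR_TO_ESCAPE becomes
%XX, all other characters pass through, and an empty value maps to the Hive
default partition):

    // escape_path_name("")            -> "__HIVE_DEFAULT_PARTITION__"
    // escape_path_name("xyz")         -> "xyz"
    // escape_path_name("xyz/yyy=zzz") -> "xyz%2Fyyy%3Dzzz"   ('/' -> %2F, '=' -> %3D)
    // escape_path_name("abc:qqq")     -> "abc%3Aqqq"         (':' -> %3A)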
diff --git a/be/src/vec/sink/writer/vhive_utils.h
b/be/src/vec/sink/writer/vhive_utils.h
new file mode 100644
index 00000000000..49cd45dc3a8
--- /dev/null
+++ b/be/src/vec/sink/writer/vhive_utils.h
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <gen_cpp/DataSinks_types.h>
+
+#include <algorithm>
+#include <iostream>
+#include <regex>
+#include <sstream>
+#include <string>
+#include <vector>
+
+namespace doris {
+namespace vectorized {
+
+class VHiveUtils {
+public:
+ VHiveUtils() = delete;
+
+ static const std::regex PATH_CHAR_TO_ESCAPE;
+
+    static std::string make_partition_name(const std::vector<THiveColumn>& columns,
+                                           const std::vector<int>& partition_columns_input_index,
+                                           const std::vector<std::string>& values);
+
+ static std::string escape_path_name(const std::string& path);
+};
+} // namespace vectorized
+} // namespace doris
diff --git a/be/test/util/indexed_priority_queue_test.cpp
b/be/test/util/indexed_priority_queue_test.cpp
new file mode 100644
index 00000000000..54686fc0194
--- /dev/null
+++ b/be/test/util/indexed_priority_queue_test.cpp
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/indexed_priority_queue.hpp"
+
+#include <gtest/gtest.h>
+
+namespace doris {
+
+class IndexedPriorityQueueTest : public testing::Test {
+public:
+ IndexedPriorityQueueTest() = default;
+ virtual ~IndexedPriorityQueueTest() = default;
+};
+
+TEST_F(IndexedPriorityQueueTest, test_high_to_low) {
+    IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW> pq;
+
+ pq.add_or_update(3, 10);
+ pq.add_or_update(1, 5);
+ pq.add_or_update(4, 15);
+ pq.add_or_update(2, 8);
+ pq.add_or_update(5, 5);
+ pq.add_or_update(6, 5);
+
+ std::vector<int> expected_elements = {4, 3, 2, 1, 5, 6};
+ std::vector<int> actual_elements;
+ for (auto& elem : pq) {
+ actual_elements.push_back(elem);
+ }
+ EXPECT_EQ(expected_elements, actual_elements);
+
+ int removed = 2;
+ pq.remove(removed);
+
+ expected_elements = {4, 3, 1, 5, 6};
+ actual_elements.clear();
+ for (auto& elem : pq) {
+ actual_elements.push_back(elem);
+ }
+ EXPECT_EQ(expected_elements, actual_elements);
+
+ pq.add_or_update(4, 1);
+
+ expected_elements = {3, 1, 5, 6, 4};
+ actual_elements.clear();
+ for (auto& elem : pq) {
+ actual_elements.push_back(elem);
+ }
+ EXPECT_EQ(expected_elements, actual_elements);
+}
+
+TEST_F(IndexedPriorityQueueTest, test_low_to_high) {
+    IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::LOW_TO_HIGH> pq;
+
+ pq.add_or_update(3, 10);
+ pq.add_or_update(1, 5);
+ pq.add_or_update(4, 15);
+ pq.add_or_update(2, 8);
+ pq.add_or_update(5, 5);
+ pq.add_or_update(6, 5);
+
+ std::vector<int> expected_elements = {1, 5, 6, 2, 3, 4};
+ std::vector<int> actual_elements;
+ for (auto& elem : pq) {
+ actual_elements.push_back(elem);
+ }
+ EXPECT_EQ(expected_elements, actual_elements);
+
+ int removed = 2;
+ pq.remove(removed);
+
+ expected_elements = {1, 5, 6, 3, 4};
+ actual_elements.clear();
+ for (auto& elem : pq) {
+ actual_elements.push_back(elem);
+ }
+ EXPECT_EQ(expected_elements, actual_elements);
+
+ pq.add_or_update(4, 1);
+
+ expected_elements = {4, 1, 5, 6, 3};
+ actual_elements.clear();
+ for (auto& elem : pq) {
+ actual_elements.push_back(elem);
+ }
+ EXPECT_EQ(expected_elements, actual_elements);
+}
+
+} // namespace doris
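One property worth calling out from the expectations above: entries that share
a priority (1, 5, and 6, all added with priority 5) are iterated in insertion
order, so the queue is stable within a priority level under both orderings. A
minimal sketch:

    IndexedPriorityQueue<int, IndexedPriorityQueuePriorityOrdering::HIGH_TO_LOW> pq;
    pq.add_or_update(7, 5); // element 7, priority 5
    pq.add_or_update(8, 5); // same priority, inserted later
    pq.add_or_update(9, 6);
    // iteration order: 9, 7, 8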
diff --git a/be/test/vec/exec/skewed_partition_rebalancer_test.cpp
b/be/test/vec/exec/skewed_partition_rebalancer_test.cpp
new file mode 100644
index 00000000000..f5ce4d2bb8e
--- /dev/null
+++ b/be/test/vec/exec/skewed_partition_rebalancer_test.cpp
@@ -0,0 +1,318 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is ported from
+// https://github.com/trinodb/trino/blob/master/core/trino-main/src/test/java/io/trino/operator/output/TestSkewedPartitionRebalancer.java
+// to C++ and modified by Doris
+
+#include "vec/exec/skewed_partition_rebalancer.h"
+
+#include <gtest/gtest.h>
+
+#include <list>
+
+namespace doris::vectorized {
+
+class SkewedPartitionRebalancerTest : public testing::Test {
+public:
+ SkewedPartitionRebalancerTest() = default;
+ virtual ~SkewedPartitionRebalancerTest() = default;
+
+private:
+    std::vector<std::list<int>> _get_partition_positions(
+            std::unique_ptr<SkewedPartitionRebalancer>& rebalancer,
+            std::vector<long>& partition_row_count, int partition_count, int max_position) {
+        std::vector<std::list<int>> partitionPositions(rebalancer->get_task_count());
+
+        for (int partition = 0; partition < rebalancer->get_task_count(); partition++) {
+            partitionPositions[partition] = std::list<int>();
+        }
+
+        for (int position = 0; position < max_position; position++) {
+            int partition = position % partition_count;
+            partition = rebalancer->get_task_id(partition, partition_row_count[partition]++);
+            partitionPositions[partition].push_back(position);
+        }
+
+        return partitionPositions;
+    }
+
+    static bool _vectors_equal(const std::vector<std::list<int>>& vec1,
+                               const std::vector<std::list<int>>& vec2) {
+ if (vec1.size() != vec2.size()) {
+ return false;
+ }
+ for (size_t i = 0; i < vec1.size(); i++) {
+ if (vec1[i] != vec2[i]) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+    static bool _compare_vector_of_lists(const std::vector<std::list<int>>& expected,
+                                         const std::vector<std::list<int>>& actual) {
+ if (expected.size() != actual.size()) {
+ return false;
+ }
+
+ for (size_t i = 0; i < expected.size(); ++i) {
+ if (expected[i] != actual[i]) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+};
+
+TEST_F(SkewedPartitionRebalancerTest, test_rebalance_with_skewness) {
+    const int partitionCount = 3;
+    const int taskCount = 3;
+    const int taskBucketCount = 3;
+    const long MEGABYTE = 1024 * 1024;
+    const long MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD = 1 * MEGABYTE; // 1MB
+    const long MIN_DATA_PROCESSED_REBALANCE_THRESHOLD = 50 * MEGABYTE;          // 50MB
+
+    std::unique_ptr<SkewedPartitionRebalancer> rebalancer(
+            new SkewedPartitionRebalancer(partitionCount, taskCount, taskBucketCount,
+                                          MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD,
+                                          MIN_DATA_PROCESSED_REBALANCE_THRESHOLD));
+    rebalancer->add_partition_row_count(0, 1000);
+    rebalancer->add_partition_row_count(1, 1000);
+    rebalancer->add_partition_row_count(2, 1000);
+    rebalancer->add_data_processed(40 * MEGABYTE);
+    rebalancer->rebalance();
+
+    std::vector<long> partitionRowCount(partitionCount, 0);
+
+    ASSERT_TRUE(_vectors_equal(
+            {{0, 3, 6, 9, 12, 15}, {1, 4, 7, 10, 13, 16}, {2, 5, 8, 11, 14}},
+            _get_partition_positions(rebalancer, partitionRowCount, partitionCount, 17)));
+    EXPECT_TRUE(_compare_vector_of_lists({{0}, {1}, {2}}, rebalancer->get_partition_assignments()));
+
+    rebalancer->add_partition_row_count(0, 1000);
+    rebalancer->add_partition_row_count(1, 1000);
+    rebalancer->add_partition_row_count(2, 1000);
+    rebalancer->add_data_processed(20 * MEGABYTE);
+
+    // Rebalancing will happen since we crossed the data processed limit.
+    // Part0 -> Task1 (Bucket1), Part1 -> Task0 (Bucket1), Part2 -> Task0 (Bucket2)
+    rebalancer->rebalance();
+
+    ASSERT_TRUE(_vectors_equal(
+            {{0, 2, 4, 6, 8, 10, 12, 14, 16}, {1, 3, 7, 9, 13, 15}, {5, 11}},
+            _get_partition_positions(rebalancer, partitionRowCount, partitionCount, 17)));
+    EXPECT_TRUE(_compare_vector_of_lists({{0, 1}, {1, 0}, {2, 0}},
+                                         rebalancer->get_partition_assignments()));
+
+    rebalancer->add_partition_row_count(0, 1000);
+    rebalancer->add_partition_row_count(1, 1000);
+    rebalancer->add_partition_row_count(2, 1000);
+    rebalancer->add_data_processed(200 * MEGABYTE);
+
+    // Rebalancing will happen
+    // Part0 -> Task2 (Bucket1), Part1 -> Task2 (Bucket2), Part2 -> Task1 (Bucket2)
+    rebalancer->rebalance();
+
+    ASSERT_TRUE(_vectors_equal(
+            {{0, 2, 4, 9, 11, 13}, {1, 3, 5, 10, 12, 14}, {6, 7, 8, 15, 16}},
+            _get_partition_positions(rebalancer, partitionRowCount, partitionCount, 17)));
+    EXPECT_TRUE(_compare_vector_of_lists({{0, 1, 2}, {1, 0, 2}, {2, 0, 1}},
+                                         rebalancer->get_partition_assignments()));
+}
+
+TEST_F(SkewedPartitionRebalancerTest, test_rebalance_without_skewness) {
+    const int partitionCount = 6;
+    const int taskCount = 3;
+    const int taskBucketCount = 2;
+    const long MEGABYTE = 1024 * 1024;
+    const long MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD = 1 * MEGABYTE; // 1MB
+    const long MIN_DATA_PROCESSED_REBALANCE_THRESHOLD = 50 * MEGABYTE;          // 50MB
+
+    std::unique_ptr<SkewedPartitionRebalancer> rebalancer(
+            new SkewedPartitionRebalancer(partitionCount, taskCount, taskBucketCount,
+                                          MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD,
+                                          MIN_DATA_PROCESSED_REBALANCE_THRESHOLD));
+    rebalancer->add_partition_row_count(0, 1000);
+    rebalancer->add_partition_row_count(1, 700);
+    rebalancer->add_partition_row_count(2, 600);
+    rebalancer->add_partition_row_count(3, 1000);
+    rebalancer->add_partition_row_count(4, 700);
+    rebalancer->add_partition_row_count(5, 600);
+
+    rebalancer->add_data_processed(500 * MEGABYTE);
+    // No rebalancing will happen since there is no skewness across task buckets
+    rebalancer->rebalance();
+
+    std::vector<long> partitionRowCount(partitionCount, 0);
+
+    ASSERT_TRUE(_vectors_equal(
+            {{0, 3}, {1, 4}, {2, 5}},
+            _get_partition_positions(rebalancer, partitionRowCount, partitionCount, 6)));
+    EXPECT_TRUE(_compare_vector_of_lists({{0}, {1}, {2}, {0}, {1}, {2}},
+                                         rebalancer->get_partition_assignments()));
+}
+
+TEST_F(SkewedPartitionRebalancerTest,
+       test_no_rebalance_when_data_written_is_less_than_the_rebalance_limit) {
+    const int partitionCount = 3;
+    const int taskCount = 3;
+    const int taskBucketCount = 3;
+    const long MEGABYTE = 1024 * 1024;
+    const long MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD = 1 * MEGABYTE; // 1MB
+    const long MIN_DATA_PROCESSED_REBALANCE_THRESHOLD = 50 * MEGABYTE;          // 50MB
+
+    std::unique_ptr<SkewedPartitionRebalancer> rebalancer(
+            new SkewedPartitionRebalancer(partitionCount, taskCount, taskBucketCount,
+                                          MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD,
+                                          MIN_DATA_PROCESSED_REBALANCE_THRESHOLD));
+
+    rebalancer->add_partition_row_count(0, 1000);
+    rebalancer->add_partition_row_count(1, 0);
+    rebalancer->add_partition_row_count(2, 0);
+
+    rebalancer->add_data_processed(40 * MEGABYTE);
+    // No rebalancing will happen since we do not cross the max data processed limit of 50MB
+    rebalancer->rebalance();
+
+    std::vector<long> partitionRowCount(partitionCount, 0);
+
+    ASSERT_TRUE(_vectors_equal(
+            {{0, 3}, {1, 4}, {2, 5}},
+            _get_partition_positions(rebalancer, partitionRowCount, partitionCount, 6)));
+    EXPECT_TRUE(_compare_vector_of_lists({{0}, {1}, {2}}, rebalancer->get_partition_assignments()));
+}
+
+TEST_F(SkewedPartitionRebalancerTest,
+       test_no_rebalance_when_data_written_by_the_partition_is_less_than_writer_scaling_min_data_processed) {
+    const int partitionCount = 3;
+    const int taskCount = 3;
+    const int taskBucketCount = 3;
+    const long MEGABYTE = 1024 * 1024;
+    const long MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD = 50 * MEGABYTE; // 50MB
+    const long MIN_DATA_PROCESSED_REBALANCE_THRESHOLD = 50 * MEGABYTE;           // 50MB
+
+    std::unique_ptr<SkewedPartitionRebalancer> rebalancer(
+            new SkewedPartitionRebalancer(partitionCount, taskCount, taskBucketCount,
+                                          MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD,
+                                          MIN_DATA_PROCESSED_REBALANCE_THRESHOLD));
+
+    rebalancer->add_partition_row_count(0, 1000);
+    rebalancer->add_partition_row_count(1, 600);
+    rebalancer->add_partition_row_count(2, 0);
+
+    rebalancer->add_data_processed(60 * MEGABYTE);
+    // No rebalancing will happen since no partition has crossed the writerScalingMinDataProcessed limit of 50MB
+    rebalancer->rebalance();
+
+    std::vector<long> partitionRowCount(partitionCount, 0);
+
+    ASSERT_TRUE(_vectors_equal(
+            {{0, 3}, {1, 4}, {2, 5}},
+            _get_partition_positions(rebalancer, partitionRowCount, partitionCount, 6)));
+    EXPECT_TRUE(_compare_vector_of_lists({{0}, {1}, {2}}, rebalancer->get_partition_assignments()));
+}
+
+TEST_F(SkewedPartitionRebalancerTest,
+       test_rebalance_partition_to_single_task_in_a_rebalancing_loop) {
+    const int partitionCount = 3;
+    const int taskCount = 3;
+    const int taskBucketCount = 3;
+    const long MEGABYTE = 1024 * 1024;
+    const long MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD = 1 * MEGABYTE; // 1MB
+    const long MIN_DATA_PROCESSED_REBALANCE_THRESHOLD = 50 * MEGABYTE;          // 50MB
+
+    std::unique_ptr<SkewedPartitionRebalancer> rebalancer(
+            new SkewedPartitionRebalancer(partitionCount, taskCount, taskBucketCount,
+                                          MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD,
+                                          MIN_DATA_PROCESSED_REBALANCE_THRESHOLD));
+
+    rebalancer->add_partition_row_count(0, 1000);
+    rebalancer->add_partition_row_count(1, 0);
+    rebalancer->add_partition_row_count(2, 0);
+
+    rebalancer->add_data_processed(60 * MEGABYTE);
+    // Rebalancing will only happen to a single task even though two tasks are available
+    rebalancer->rebalance();
+
+    std::vector<long> partitionRowCount(partitionCount, 0);
+
+    ASSERT_TRUE(_vectors_equal(
+            {{0, 6, 12}, {1, 3, 4, 7, 9, 10, 13, 15, 16}, {2, 5, 8, 11, 14}},
+            _get_partition_positions(rebalancer, partitionRowCount, partitionCount, 17)));
+    EXPECT_TRUE(
+            _compare_vector_of_lists({{0, 1}, {1}, {2}}, rebalancer->get_partition_assignments()));
+
+    rebalancer->add_partition_row_count(0, 1000);
+    rebalancer->add_partition_row_count(1, 0);
+    rebalancer->add_partition_row_count(2, 0);
+    rebalancer->add_data_processed(60 * MEGABYTE);
+    rebalancer->rebalance();
+
+    ASSERT_TRUE(_vectors_equal(
+            {{0, 9}, {1, 3, 4, 7, 10, 12, 13, 16}, {2, 5, 6, 8, 11, 14, 15}},
+            _get_partition_positions(rebalancer, partitionRowCount, partitionCount, 17)));
+    EXPECT_TRUE(_compare_vector_of_lists({{0, 1, 2}, {1}, {2}},
+                                         rebalancer->get_partition_assignments()));
+}
+
+TEST_F(SkewedPartitionRebalancerTest, test_consider_skewed_partition_only_within_a_cycle) {
+    const int partitionCount = 3;
+    const int taskCount = 3;
+    const int taskBucketCount = 1;
+    const long MEGABYTE = 1024 * 1024;
+    const long MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD = 1 * MEGABYTE; // 1MB
+    const long MIN_DATA_PROCESSED_REBALANCE_THRESHOLD = 50 * MEGABYTE;          // 50MB
+
+    std::unique_ptr<SkewedPartitionRebalancer> rebalancer(
+            new SkewedPartitionRebalancer(partitionCount, taskCount, taskBucketCount,
+                                          MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD,
+                                          MIN_DATA_PROCESSED_REBALANCE_THRESHOLD));
+
+    rebalancer->add_partition_row_count(0, 1000);
+    rebalancer->add_partition_row_count(1, 800);
+    rebalancer->add_partition_row_count(2, 0);
+
+    rebalancer->add_data_processed(60 * MEGABYTE);
+    // Rebalancing will happen for partition 0 to task 2 since partition 0 is skewed.
+    rebalancer->rebalance();
+
+    std::vector<long> partitionRowCount(partitionCount, 0);
+
+    ASSERT_TRUE(_vectors_equal(
+            {{0, 6, 12}, {1, 4, 7, 10, 13, 16}, {2, 3, 5, 8, 9, 11, 14, 15}},
+            _get_partition_positions(rebalancer, partitionRowCount, partitionCount, 17)));
+    EXPECT_TRUE(
+            _compare_vector_of_lists({{0, 2}, {1}, {2}}, rebalancer->get_partition_assignments()));
+
+    rebalancer->add_partition_row_count(0, 0);
+    rebalancer->add_partition_row_count(1, 800);
+    rebalancer->add_partition_row_count(2, 1000);
+    // Rebalancing will happen for partition 2 to task 0 since partition 2 is skewed. Even though
+    // partition 1 has written more data since the start, it will not be considered because it is
+    // not the most skewed partition in this rebalancing cycle.
+    rebalancer->add_data_processed(60 * MEGABYTE);
+    rebalancer->rebalance();
+
+    ASSERT_TRUE(_vectors_equal(
+            {{0, 2, 6, 8, 12, 14}, {1, 4, 7, 10, 13, 16}, {3, 5, 9, 11, 15}},
+            _get_partition_positions(rebalancer, partitionRowCount, partitionCount, 17)));
+    EXPECT_TRUE(_compare_vector_of_lists({{0, 2}, {1}, {2, 0}},
+                                         rebalancer->get_partition_assignments()));
+}
+
+} // namespace doris::vectorized
diff --git a/be/test/vec/exec/vhive_utils_test.cpp
b/be/test/vec/exec/vhive_utils_test.cpp
new file mode 100644
index 00000000000..d14a004e50b
--- /dev/null
+++ b/be/test/vec/exec/vhive_utils_test.cpp
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/sink/writer/vhive_utils.h"
+
+#include <gtest/gtest.h>
+
+namespace doris::vectorized {
+
+class VHiveUtilsTest : public testing::Test {
+public:
+ VHiveUtilsTest() = default;
+ virtual ~VHiveUtilsTest() = default;
+};
+
+TEST_F(VHiveUtilsTest, test_make_partition_name) {
+ {
+ std::vector<THiveColumn> columns;
+ THiveColumn column1;
+ column1.name = "abc";
+ columns.emplace_back(std::move(column1));
+ std::vector<int> partition_columns_input_index = {0};
+ EXPECT_EQ("abc=xyz",
+ VHiveUtils::make_partition_name(columns,
partition_columns_input_index, {"xyz"}));
+ }
+
+ {
+ std::vector<THiveColumn> columns;
+ THiveColumn column1;
+ column1.name = "abc:qqq";
+ columns.emplace_back(std::move(column1));
+ std::vector<int> partition_columns_input_index = {0};
+ EXPECT_EQ("abc%3Aqqq=xyz%2Fyyy%3Dzzz",
+ VHiveUtils::make_partition_name(columns,
partition_columns_input_index,
+ {"xyz/yyy=zzz"}));
+ }
+
+ {
+ std::vector<THiveColumn> columns;
+ THiveColumn column1;
+ column1.name = "abc";
+ columns.emplace_back(std::move(column1));
+ THiveColumn column2;
+ column2.name = "def";
+ columns.emplace_back(std::move(column2));
+ THiveColumn column3;
+ column3.name = "xyz";
+ columns.emplace_back(std::move(column3));
+ std::vector<int> partition_columns_input_index = {0, 1, 2};
+ EXPECT_EQ("abc=qqq/def=rrr/xyz=sss",
+ VHiveUtils::make_partition_name(columns,
partition_columns_input_index,
+ {"qqq", "rrr", "sss"}));
+ }
+}
+
+} // namespace doris::vectorized
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]