This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 87110ad3e3 [chore](Sink)remove useless OlapTablePartitionParam-related
code (#15549)
87110ad3e3 is described below
commit 87110ad3e32d404147922f5852a0895a3b956447
Author: AlexYue <[email protected]>
AuthorDate: Mon Jan 2 22:47:16 2023 +0800
[chore](Sink)remove useless OlapTablePartitionParam-related code (#15549)
---
be/src/exec/tablet_info.cpp | 304 ---------------------------------------
be/src/exec/tablet_info.h | 120 +---------------
be/src/vec/sink/vtablet_sink.cpp | 7 +-
be/src/vec/sink/vtablet_sink.h | 2 +-
4 files changed, 5 insertions(+), 428 deletions(-)
diff --git a/be/src/exec/tablet_info.cpp b/be/src/exec/tablet_info.cpp
index 62926e5fa9..32102729ec 100644
--- a/be/src/exec/tablet_info.cpp
+++ b/be/src/exec/tablet_info.cpp
@@ -18,10 +18,7 @@
#include "exec/tablet_info.h"
#include "runtime/large_int_value.h"
-#include "runtime/mem_pool.h"
-#include "util/random.h"
#include "util/string_parser.hpp"
-#include "util/time.h"
namespace doris {
@@ -134,307 +131,6 @@ std::string OlapTableSchemaParam::debug_string() const {
return ss.str();
}
-std::string OlapTablePartition::debug_string(TupleDescriptor* tuple_desc)
const {
- std::stringstream ss;
- std::stringstream in_keys_ss;
- int idx = 0;
- in_keys_ss << "[";
- for (auto in_key : in_keys) {
- if (idx++ > 0) {
- in_keys_ss << ",";
- }
- in_keys_ss << Tuple::to_string(in_key, *tuple_desc);
- }
- in_keys_ss << "]";
- ss << "(id=" << id << ",start_key=" << Tuple::to_string(start_key,
*tuple_desc)
- << ",end_key=" << Tuple::to_string(end_key, *tuple_desc) << ",in_key="
<< in_keys_ss.str()
- << ",num_buckets=" << num_buckets << ",indexes=[";
- idx = 0;
- for (auto& index : indexes) {
- if (idx++ > 0) {
- ss << ",";
- }
- ss << "(id=" << index.index_id << ",tablets=[";
- int jdx = 0;
- for (auto id : index.tablets) {
- if (jdx++ > 0) {
- ss << ",";
- }
- ss << id;
- }
- ss << "])";
- }
- ss << "])";
- return ss.str();
-}
-
-OlapTablePartitionParam::OlapTablePartitionParam(std::shared_ptr<OlapTableSchemaParam>
schema,
- const
TOlapTablePartitionParam& t_param)
- : _schema(schema), _t_param(t_param), _mem_pool(new MemPool()) {}
-
-OlapTablePartitionParam::~OlapTablePartitionParam() {}
-
-Status OlapTablePartitionParam::init() {
- std::map<std::string, SlotDescriptor*> slots_map;
- for (auto slot_desc : _schema->tuple_desc()->slots()) {
- slots_map.emplace(slot_desc->col_name(), slot_desc);
- }
- if (_t_param.__isset.partition_column) {
- auto it = slots_map.find(_t_param.partition_column);
- if (it == std::end(slots_map)) {
- return Status::InternalError("partition column not found,
column={}",
- _t_param.partition_column);
- }
- _partition_slot_descs.push_back(it->second);
- } else if (_t_param.__isset.partition_columns) {
- for (auto& part_col : _t_param.partition_columns) {
- auto it = slots_map.find(part_col);
- if (it == std::end(slots_map)) {
- return Status::InternalError("partition column not found,
column={}", part_col);
- }
- _partition_slot_descs.push_back(it->second);
- }
- }
-
- _partitions_map.reset(new std::map<Tuple*, OlapTablePartition*,
OlapTablePartKeyComparator>(
- OlapTablePartKeyComparator(_partition_slot_descs)));
- if (_t_param.__isset.distributed_columns) {
- for (auto& col : _t_param.distributed_columns) {
- auto it = slots_map.find(col);
- if (it == std::end(slots_map)) {
- return Status::InternalError("distributed column not found,
columns={}", col);
- }
- _distributed_slot_descs.emplace_back(it->second);
- }
- }
- if (_distributed_slot_descs.empty()) {
- _compute_tablet_index = [](Tuple* key, int64_t num_buckets) ->
uint32_t {
- return butil::fast_rand() % num_buckets;
- };
- } else {
- _compute_tablet_index = [this](Tuple* key, int64_t num_buckets) ->
uint32_t {
- uint32_t hash_val = 0;
- for (auto slot_desc : _distributed_slot_descs) {
- void* slot = nullptr;
- if (!key->is_null(slot_desc->null_indicator_offset())) {
- slot = key->get_slot(slot_desc->tuple_offset());
- }
- if (slot != nullptr) {
- hash_val = RawValue::zlib_crc32(slot, slot_desc->type(),
hash_val);
- } else {
- hash_val = HashUtil::zlib_crc_hash_null(hash_val);
- }
- }
- return hash_val % num_buckets;
- };
- }
- // initial partitions
- for (int i = 0; i < _t_param.partitions.size(); ++i) {
- const TOlapTablePartition& t_part = _t_param.partitions[i];
- OlapTablePartition* part = _obj_pool.add(new OlapTablePartition());
- part->id = t_part.id;
-
- if (t_part.__isset.start_key) {
- // deprecated, use start_keys instead
- std::vector<TExprNode> exprs = {t_part.start_key};
- RETURN_IF_ERROR(_create_partition_keys(exprs, &part->start_key));
- } else if (t_part.__isset.start_keys) {
- RETURN_IF_ERROR(_create_partition_keys(t_part.start_keys,
&part->start_key));
- }
- if (t_part.__isset.end_key) {
- // deprecated, use end_keys instead
- std::vector<TExprNode> exprs = {t_part.end_key};
- RETURN_IF_ERROR(_create_partition_keys(exprs, &part->end_key));
- } else if (t_part.__isset.end_keys) {
- RETURN_IF_ERROR(_create_partition_keys(t_part.end_keys,
&part->end_key));
- }
- if (t_part.__isset.in_keys) {
- part->in_keys.resize(t_part.in_keys.size());
- for (int j = 0; j < t_part.in_keys.size(); j++) {
- RETURN_IF_ERROR(_create_partition_keys(t_part.in_keys[j],
&part->in_keys[j]));
- }
- }
-
- part->num_buckets = t_part.num_buckets;
- auto num_indexes = _schema->indexes().size();
- if (t_part.indexes.size() != num_indexes) {
- return Status::InternalError(
- "number of partition's index is not equal with schema's,"
- "num_part_indexes={}, num_schema_indexes={}",
- t_part.indexes.size(), num_indexes);
- }
- part->indexes = t_part.indexes;
- std::sort(part->indexes.begin(), part->indexes.end(),
- [](const OlapTableIndexTablets& lhs, const
OlapTableIndexTablets& rhs) {
- return lhs.index_id < rhs.index_id;
- });
- // check index
- for (int j = 0; j < num_indexes; ++j) {
- if (part->indexes[j].index_id != _schema->indexes()[j]->index_id) {
- std::stringstream ss;
- ss << "partition's index is not equal with schema's"
- << ", part_index=" << part->indexes[j].index_id
- << ", schema_index=" << _schema->indexes()[j]->index_id;
- return Status::InternalError(
- "partition's index is not equal with schema's"
- ", part_index={}, schema_index={}",
- part->indexes[j].index_id,
_schema->indexes()[j]->index_id);
- }
- }
- _partitions.emplace_back(part);
- if (t_part.__isset.in_keys) {
- for (auto in_key : part->in_keys) {
- _partitions_map->emplace(in_key, part);
- }
- } else {
- _partitions_map->emplace(part->end_key, part);
- }
- }
- return Status::OK();
-}
-
-bool OlapTablePartitionParam::find_partition(Tuple* tuple,
- const OlapTablePartition**
partition) const {
- const TOlapTablePartition& t_part = _t_param.partitions[0];
- auto it = t_part.__isset.in_keys ? _partitions_map->find(tuple)
- : _partitions_map->upper_bound(tuple);
- if (it == _partitions_map->end()) {
- return false;
- }
- if (_part_contains(it->second, tuple)) {
- *partition = it->second;
- return true;
- }
- return false;
-}
-
-uint32_t OlapTablePartitionParam::find_tablet(Tuple* tuple,
- const OlapTablePartition&
partition) const {
- return _compute_tablet_index(tuple, partition.num_buckets);
-}
-
-Status OlapTablePartitionParam::_create_partition_keys(const
std::vector<TExprNode>& t_exprs,
- Tuple** part_key) {
- Tuple* tuple =
(Tuple*)_mem_pool->allocate(_schema->tuple_desc()->byte_size());
- for (int i = 0; i < t_exprs.size(); i++) {
- const TExprNode& t_expr = t_exprs[i];
- RETURN_IF_ERROR(_create_partition_key(t_expr, tuple,
_partition_slot_descs[i]));
- }
- *part_key = tuple;
- return Status::OK();
-}
-
-Status OlapTablePartitionParam::_create_partition_key(const TExprNode& t_expr,
Tuple* tuple,
- SlotDescriptor*
slot_desc) {
- void* slot = tuple->get_slot(slot_desc->tuple_offset());
- tuple->set_not_null(slot_desc->null_indicator_offset());
- switch (t_expr.node_type) {
- case TExprNodeType::DATE_LITERAL: {
- if ((t_expr.type.types[0].scalar_type.type == TPrimitiveType::DATE) ||
- (t_expr.type.types[0].scalar_type.type ==
TPrimitiveType::DATETIME)) {
- if (!reinterpret_cast<DateTimeValue*>(slot)->from_date_str(
- t_expr.date_literal.value.c_str(),
t_expr.date_literal.value.size())) {
- std::stringstream ss;
- ss << "invalid date literal in partition column, date=" <<
t_expr.date_literal;
- return Status::InternalError(ss.str());
- }
- } else if (t_expr.type.types[0].scalar_type.type ==
TPrimitiveType::DATEV2) {
- if (!reinterpret_cast<
-
doris::vectorized::DateV2Value<doris::vectorized::DateV2ValueType>*>(slot)
- ->from_date_str(t_expr.date_literal.value.c_str(),
- t_expr.date_literal.value.size())) {
- std::stringstream ss;
- ss << "invalid date literal in partition column, date=" <<
t_expr.date_literal;
- return Status::InternalError(ss.str());
- }
- } else {
- if (!reinterpret_cast<
-
doris::vectorized::DateV2Value<doris::vectorized::DateTimeV2ValueType>*>(
- slot)
- ->from_date_str(t_expr.date_literal.value.c_str(),
- t_expr.date_literal.value.size())) {
- std::stringstream ss;
- ss << "invalid date literal in partition column, date=" <<
t_expr.date_literal;
- return Status::InternalError(ss.str());
- }
- }
- break;
- }
- case TExprNodeType::INT_LITERAL: {
- switch (t_expr.type.types[0].scalar_type.type) {
- case TPrimitiveType::TINYINT:
- *reinterpret_cast<int8_t*>(slot) = t_expr.int_literal.value;
- break;
- case TPrimitiveType::SMALLINT:
- *reinterpret_cast<int16_t*>(slot) = t_expr.int_literal.value;
- break;
- case TPrimitiveType::INT:
- *reinterpret_cast<int32_t*>(slot) = t_expr.int_literal.value;
- break;
- case TPrimitiveType::BIGINT:
- *reinterpret_cast<int64_t*>(slot) = t_expr.int_literal.value;
- break;
- default:
- DCHECK(false) << "unsupported int literal type, type=" <<
t_expr.type.types[0].type;
- break;
- }
- break;
- }
- case TExprNodeType::LARGE_INT_LITERAL: {
- StringParser::ParseResult parse_result = StringParser::PARSE_SUCCESS;
- __int128 val =
StringParser::string_to_int<__int128>(t_expr.large_int_literal.value.c_str(),
-
t_expr.large_int_literal.value.size(),
- &parse_result);
- if (parse_result != StringParser::PARSE_SUCCESS) {
- val = MAX_INT128;
- }
- memcpy(slot, &val, sizeof(val));
- break;
- }
- case TExprNodeType::STRING_LITERAL: {
- int len = t_expr.string_literal.value.size();
- const char* str_val = t_expr.string_literal.value.c_str();
-
- // CHAR is a fixed-length string and needs to use the length in the
slot definition,
- // VARCHAR is a variable-length string and needs to use the length of
the string itself
- // padding 0 to CHAR field
- if (TYPE_CHAR == slot_desc->type().type && len <
slot_desc->type().len) {
- auto new_ptr = (char*)_mem_pool->allocate(slot_desc->type().len);
- memset(new_ptr, 0, slot_desc->type().len);
- memcpy(new_ptr, str_val, len);
-
- str_val = new_ptr;
- len = slot_desc->type().len;
- }
- *reinterpret_cast<StringValue*>(slot) =
StringValue(const_cast<char*>(str_val), len);
- break;
- }
- case TExprNodeType::BOOL_LITERAL: {
- *reinterpret_cast<bool*>(slot) = t_expr.bool_literal.value;
- break;
- }
- default: {
- return Status::InternalError("unsupported partition column node type,
type={}",
- t_expr.node_type);
- }
- }
- return Status::OK();
-}
-
-std::string OlapTablePartitionParam::debug_string() const {
- std::stringstream ss;
- ss << "partitions=[";
- int idx = 0;
- for (auto part : _partitions) {
- if (idx++ > 0) {
- ss << ",";
- }
- ss << part->debug_string(_schema->tuple_desc());
- }
- ss << "]";
- return ss.str();
-}
-
VOlapTablePartitionParam::VOlapTablePartitionParam(std::shared_ptr<OlapTableSchemaParam>&
schema,
const
TOlapTablePartitionParam& t_param)
: _schema(schema),
diff --git a/be/src/exec/tablet_info.h b/be/src/exec/tablet_info.h
index 0b6fd00dbb..3e955b9b6b 100644
--- a/be/src/exec/tablet_info.h
+++ b/be/src/exec/tablet_info.h
@@ -30,7 +30,6 @@
#include "olap/tablet_schema.h"
#include "runtime/descriptors.h"
#include "runtime/raw_value.h"
-#include "runtime/tuple.h"
#include "vec/core/block.h"
namespace doris {
@@ -48,8 +47,8 @@ struct OlapTableIndexSchema {
class OlapTableSchemaParam {
public:
- OlapTableSchemaParam() {}
- ~OlapTableSchemaParam() noexcept {}
+ OlapTableSchemaParam() = default;
+ ~OlapTableSchemaParam() noexcept = default;
Status init(const TOlapTableSchemaParam& tschema);
Status init(const POlapTableSchemaParam& pschema);
@@ -91,121 +90,6 @@ using OlapTableIndexTablets = TOlapTableIndexTablets;
// 2: required list<i64> tablets
// }
-struct OlapTablePartition {
- int64_t id = 0;
- Tuple* start_key = nullptr;
- Tuple* end_key = nullptr;
- std::vector<Tuple*> in_keys;
- int64_t num_buckets = 0;
- std::vector<OlapTableIndexTablets> indexes;
-
- std::string debug_string(TupleDescriptor* tuple_desc) const;
-};
-
-class OlapTablePartKeyComparator {
-public:
- OlapTablePartKeyComparator(const std::vector<SlotDescriptor*>& slot_descs)
- : _slot_descs(slot_descs) {}
- // return true if lhs < rhs
- // 'nullptr' is max value, but 'null' is min value
- bool operator()(const Tuple* lhs, const Tuple* rhs) const {
- if (lhs == nullptr) {
- return false;
- } else if (rhs == nullptr) {
- return true;
- }
-
- for (auto slot_desc : _slot_descs) {
- bool lhs_null = lhs->is_null(slot_desc->null_indicator_offset());
- bool rhs_null = rhs->is_null(slot_desc->null_indicator_offset());
- if (lhs_null && rhs_null) {
- continue;
- }
- if (lhs_null || rhs_null) {
- return !rhs_null;
- }
-
- auto lhs_value = lhs->get_slot(slot_desc->tuple_offset());
- auto rhs_value = rhs->get_slot(slot_desc->tuple_offset());
-
- int res = RawValue::compare(lhs_value, rhs_value,
slot_desc->type());
- if (res != 0) {
- return res < 0;
- }
- }
- // equal, return false
- return false;
- }
-
-private:
- const std::vector<SlotDescriptor*>& _slot_descs;
-};
-
-// store an olap table's tablet information
-class OlapTablePartitionParam {
-public:
- OlapTablePartitionParam(std::shared_ptr<OlapTableSchemaParam> schema,
- const TOlapTablePartitionParam& param);
- ~OlapTablePartitionParam();
-
- Status init();
-
- int64_t db_id() const { return _t_param.db_id; }
- int64_t table_id() const { return _t_param.table_id; }
- int64_t version() const { return _t_param.version; }
-
- // return true if we found this tuple in partition
- bool find_partition(Tuple* tuple, const OlapTablePartition** partition)
const;
-
- uint32_t find_tablet(Tuple* tuple, const OlapTablePartition& partition)
const;
-
- const std::vector<OlapTablePartition*>& get_partitions() const { return
_partitions; }
- std::string debug_string() const;
-
-private:
- Status _create_partition_keys(const std::vector<TExprNode>& t_exprs,
Tuple** part_key);
-
- Status _create_partition_key(const TExprNode& t_expr, Tuple* tuple,
SlotDescriptor* slot_desc);
-
- std::function<uint32_t(Tuple*, int64_t)> _compute_tablet_index;
-
- // check if this partition contain this key
- bool _part_contains(OlapTablePartition* part, Tuple* key) const {
- if ((part->start_key == nullptr) && (part->in_keys.size() == 0)) {
- // start_key is nullptr means the lower bound is boundless
- return true;
- }
- OlapTablePartKeyComparator comparator(_partition_slot_descs);
- const TOlapTablePartition& t_part = _t_param.partitions[0];
- // when list partition, return true if equals.
- if (t_part.__isset.in_keys) {
- bool ret = false;
- for (auto in_key : part->in_keys) {
- ret = !comparator(key, in_key) && !comparator(in_key, key);
- if (ret) {
- break;
- }
- }
- return ret;
- }
- return !comparator(key, part->start_key);
- }
-
-private:
- // this partition only valid in this schema
- std::shared_ptr<OlapTableSchemaParam> _schema;
- TOlapTablePartitionParam _t_param;
-
- std::vector<SlotDescriptor*> _partition_slot_descs;
- std::vector<SlotDescriptor*> _distributed_slot_descs;
-
- ObjectPool _obj_pool;
- std::unique_ptr<MemPool> _mem_pool;
- std::vector<OlapTablePartition*> _partitions;
- std::unique_ptr<std::map<Tuple*, OlapTablePartition*,
OlapTablePartKeyComparator>>
- _partitions_map;
-};
-
using BlockRow = std::pair<vectorized::Block*, int32_t>;
struct VOlapTablePartition {
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index f042ce50b4..03a0181e55 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -772,8 +772,6 @@ Status VOlapTableSink::init(const TDataSink& t_sink) {
_tuple_desc_id = table_sink.tuple_id;
_schema.reset(new OlapTableSchemaParam());
RETURN_IF_ERROR(_schema->init(table_sink.schema));
- _partition = _pool->add(new OlapTablePartitionParam(_schema,
table_sink.partition));
- RETURN_IF_ERROR(_partition->init());
_location = _pool->add(new OlapTableLocationParam(table_sink.location));
_nodes_info = _pool->add(new DorisNodesInfo(table_sink.nodes_info));
if (table_sink.__isset.write_single_replica &&
table_sink.write_single_replica) {
@@ -802,8 +800,7 @@ Status VOlapTableSink::init(const TDataSink& t_sink) {
findTabletMode = FindTabletMode::FIND_TABLET_EVERY_BATCH;
}
}
- _vpartition = _pool->add(
- new doris::VOlapTablePartitionParam(_schema,
t_sink.olap_table_sink.partition));
+ _vpartition = _pool->add(new doris::VOlapTablePartitionParam(_schema,
table_sink.partition));
return _vpartition->init();
}
@@ -882,7 +879,7 @@ Status VOlapTableSink::prepare(RuntimeState* state) {
_load_mem_limit = state->get_load_mem_limit();
// open all channels
- const auto& partitions = _partition->get_partitions();
+ const auto& partitions = _vpartition->get_partitions();
for (int i = 0; i < _schema->indexes().size(); ++i) {
// collect all tablets belong to this rollup
std::vector<TTabletWithPartition> tablets;
diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h
index 1c0a1be8d1..c299fdca55 100644
--- a/be/src/vec/sink/vtablet_sink.h
+++ b/be/src/vec/sink/vtablet_sink.h
@@ -30,6 +30,7 @@
#include "common/status.h"
#include "exec/data_sink.h"
#include "exec/tablet_info.h"
+#include "exprs/expr_context.h"
#include "gen_cpp/Types_types.h"
#include "gen_cpp/internal_service.pb.h"
#include "runtime/row_batch.h"
@@ -535,7 +536,6 @@ private:
enum FindTabletMode { FIND_TABLET_EVERY_ROW, FIND_TABLET_EVERY_BATCH,
FIND_TABLET_EVERY_SINK };
FindTabletMode findTabletMode = FindTabletMode::FIND_TABLET_EVERY_ROW;
- OlapTablePartitionParam* _partition = nullptr;
std::vector<ExprContext*> _output_expr_ctxs;
VOlapTablePartitionParam* _vpartition = nullptr;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]