This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e3d2425d472 [Improvement](join) remove insert_indices_from_join and
special judge for -1 (#27779)
e3d2425d472 is described below
commit e3d2425d47264e885d55629c2b25b15c29f74f18
Author: Pxl <[email protected]>
AuthorDate: Mon Dec 4 11:03:22 2023 +0800
[Improvement](join) remove insert_indices_from_join and special judge for
-1 (#27779)
Remove insert_indices_from_join and drop the special-case handling of the
-1 (null row) index in insert_indices_from, which now takes uint32_t indices.
---
be/src/olap/delta_writer.cpp | 2 +-
be/src/olap/delta_writer.h | 2 +-
be/src/olap/delta_writer_v2.cpp | 2 +-
be/src/olap/delta_writer_v2.h | 2 +-
be/src/olap/memtable.cpp | 6 +-
be/src/olap/memtable.h | 2 +-
be/src/olap/memtable_writer.cpp | 2 +-
be/src/olap/memtable_writer.h | 2 +-
be/src/olap/tablet.cpp | 2 +-
be/src/pipeline/exec/exchange_sink_operator.cpp | 4 +-
.../exec/nested_loop_join_probe_operator.cpp | 2 +-
.../pipeline_x/local_exchange/local_exchanger.cpp | 2 +-
.../pipeline_x/local_exchange/local_exchanger.h | 2 +-
be/src/runtime/tablets_channel.cpp | 7 ++-
be/src/vec/columns/column.h | 13 +----
be/src/vec/columns/column_array.cpp | 23 ++------
be/src/vec/columns/column_array.h | 7 +--
be/src/vec/columns/column_complex.h | 25 +-------
be/src/vec/columns/column_const.h | 9 +--
be/src/vec/columns/column_decimal.h | 16 +-----
be/src/vec/columns/column_dictionary.h | 9 +--
be/src/vec/columns/column_dummy.h | 4 +-
be/src/vec/columns/column_fixed_length_object.h | 35 ++---------
be/src/vec/columns/column_map.cpp | 23 ++------
be/src/vec/columns/column_map.h | 6 +-
be/src/vec/columns/column_nothing.h | 5 --
be/src/vec/columns/column_nullable.cpp | 14 +----
be/src/vec/columns/column_nullable.h | 6 +-
be/src/vec/columns/column_object.cpp | 17 +-----
be/src/vec/columns/column_object.h | 7 +--
be/src/vec/columns/column_string.cpp | 67 +++++-----------------
be/src/vec/columns/column_string.h | 7 +--
be/src/vec/columns/column_struct.cpp | 15 +----
be/src/vec/columns/column_struct.h | 7 +--
be/src/vec/columns/column_vector.cpp | 27 +--------
be/src/vec/columns/column_vector.h | 6 +-
be/src/vec/columns/predicate_column.h | 9 +--
be/src/vec/core/block.cpp | 3 +-
be/src/vec/core/block.h | 2 +-
.../vec/exec/join/process_hash_table_probe_impl.h | 8 +--
be/src/vec/exec/join/vnested_loop_join_node.cpp | 2 +-
be/src/vec/exec/scan/pip_scanner_context.h | 8 +--
be/src/vec/sink/vdata_stream_sender.cpp | 8 +--
be/src/vec/sink/vdata_stream_sender.h | 10 ++--
be/src/vec/sink/writer/vtablet_writer_v2.h | 2 +-
45 files changed, 106 insertions(+), 333 deletions(-)
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 9b80e4ec3e4..310ffacad9a 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -108,7 +108,7 @@ Status DeltaWriter::append(const vectorized::Block* block) {
return write(block, {}, true);
}
-Status DeltaWriter::write(const vectorized::Block* block, const
std::vector<int>& row_idxs,
+Status DeltaWriter::write(const vectorized::Block* block, const
std::vector<uint32_t>& row_idxs,
bool is_append) {
if (UNLIKELY(row_idxs.empty() && !is_append)) {
return Status::OK();
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index d4519c36a78..d7e351a168e 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -67,7 +67,7 @@ public:
Status init();
- Status write(const vectorized::Block* block, const std::vector<int>&
row_idxs,
+ Status write(const vectorized::Block* block, const std::vector<uint32_t>&
row_idxs,
bool is_append = false);
Status append(const vectorized::Block* block);
diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp
index 6f6dd939a4b..fb5db340be7 100644
--- a/be/src/olap/delta_writer_v2.cpp
+++ b/be/src/olap/delta_writer_v2.cpp
@@ -140,7 +140,7 @@ Status DeltaWriterV2::append(const vectorized::Block*
block) {
return write(block, {}, true);
}
-Status DeltaWriterV2::write(const vectorized::Block* block, const
std::vector<int>& row_idxs,
+Status DeltaWriterV2::write(const vectorized::Block* block, const
std::vector<uint32_t>& row_idxs,
bool is_append) {
if (UNLIKELY(row_idxs.empty() && !is_append)) {
return Status::OK();
diff --git a/be/src/olap/delta_writer_v2.h b/be/src/olap/delta_writer_v2.h
index 8a102c5706d..f0581bf56a6 100644
--- a/be/src/olap/delta_writer_v2.h
+++ b/be/src/olap/delta_writer_v2.h
@@ -70,7 +70,7 @@ public:
Status init();
- Status write(const vectorized::Block* block, const std::vector<int>&
row_idxs,
+ Status write(const vectorized::Block* block, const std::vector<uint32_t>&
row_idxs,
bool is_append = false);
Status append(const vectorized::Block* block);
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 093c241573e..fa7afe4ccda 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -166,7 +166,7 @@ int RowInBlockComparator::operator()(const RowInBlock*
left, const RowInBlock* r
*_pblock, -1);
}
-void MemTable::insert(const vectorized::Block* input_block, const
std::vector<int>& row_idxs,
+void MemTable::insert(const vectorized::Block* input_block, const
std::vector<uint32_t>& row_idxs,
bool is_append) {
SCOPED_CONSUME_MEM_TRACKER(_insert_mem_tracker_use_hook.get());
vectorized::Block target_block = *input_block;
@@ -239,7 +239,7 @@ void
MemTable::_aggregate_two_row_in_block(vectorized::MutableBlock& mutable_blo
}
void MemTable::_put_into_output(vectorized::Block& in_block) {
SCOPED_RAW_TIMER(&_stat.put_into_output_ns);
- std::vector<int> row_pos_vec;
+ std::vector<uint32_t> row_pos_vec;
DCHECK(in_block.rows() <= std::numeric_limits<int>::max());
row_pos_vec.reserve(in_block.rows());
for (int i = 0; i < _row_in_blocks.size(); i++) {
@@ -330,7 +330,7 @@ void MemTable::_sort_by_cluster_keys() {
in_block = mutable_block.to_block();
SCOPED_RAW_TIMER(&_stat.put_into_output_ns);
- std::vector<int> row_pos_vec;
+ std::vector<uint32_t> row_pos_vec;
DCHECK(in_block.rows() <= std::numeric_limits<int>::max());
row_pos_vec.reserve(in_block.rows());
for (int i = 0; i < row_in_blocks.size(); i++) {
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 04ad022c824..9a171b8ad82 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -180,7 +180,7 @@ public:
_flush_mem_tracker->consumption();
}
// insert tuple from (row_pos) to (row_pos+num_rows)
- void insert(const vectorized::Block* block, const std::vector<int>&
row_idxs,
+ void insert(const vectorized::Block* block, const std::vector<uint32_t>&
row_idxs,
bool is_append = false);
void shrink_memtable_by_agg();
diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp
index a782901f441..0098ff6f8bf 100644
--- a/be/src/olap/memtable_writer.cpp
+++ b/be/src/olap/memtable_writer.cpp
@@ -88,7 +88,7 @@ Status MemTableWriter::append(const vectorized::Block* block)
{
return write(block, {}, true);
}
-Status MemTableWriter::write(const vectorized::Block* block, const
std::vector<int>& row_idxs,
+Status MemTableWriter::write(const vectorized::Block* block, const
std::vector<uint32_t>& row_idxs,
bool is_append) {
if (UNLIKELY(row_idxs.empty() && !is_append)) {
return Status::OK();
diff --git a/be/src/olap/memtable_writer.h b/be/src/olap/memtable_writer.h
index 1893e2cd6a6..a6132aa8873 100644
--- a/be/src/olap/memtable_writer.h
+++ b/be/src/olap/memtable_writer.h
@@ -71,7 +71,7 @@ public:
std::shared_ptr<PartialUpdateInfo> partial_update_info,
bool unique_key_mow = false);
- Status write(const vectorized::Block* block, const std::vector<int>&
row_idxs,
+ Status write(const vectorized::Block* block, const std::vector<uint32_t>&
row_idxs,
bool is_append = false);
Status append(const vectorized::Block* block);
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index bbd57cf1b51..21d390650ab 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -2912,7 +2912,7 @@ void Tablet::sort_block(vectorized::Block& in_block,
vectorized::Block& output_b
<< " r_pos: " << r->_row_pos;
return value < 0;
});
- std::vector<int> row_pos_vec;
+ std::vector<uint32_t> row_pos_vec;
row_pos_vec.reserve(in_block.rows());
for (int i = 0; i < row_in_blocks.size(); i++) {
row_pos_vec.emplace_back(row_in_blocks[i]->_row_pos);
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 8065ae3082a..8143e71a4ad 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -473,9 +473,9 @@ Status
ExchangeSinkOperatorX::channel_add_rows(RuntimeState* state, Channels& ch
int num_channels,
const HashValueType* __restrict
channel_ids,
int rows, vectorized::Block*
block, bool eos) {
- std::vector<int> channel2rows[num_channels];
+ std::vector<uint32_t> channel2rows[num_channels];
- for (int i = 0; i < rows; i++) {
+ for (uint32_t i = 0; i < rows; i++) {
channel2rows[channel_ids[i]].emplace_back(i);
}
diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
index 371e0106149..a3d24aa0614 100644
--- a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
@@ -255,7 +255,7 @@ void
NestedLoopJoinProbeLocalState::_finalize_current_phase(vectorized::MutableB
.data();
const auto num_rows = cur_block.rows();
- std::vector<int> selector(num_rows);
+ std::vector<uint32_t> selector(num_rows);
size_t selector_idx = 0;
for (size_t j = 0; j < num_rows; j++) {
if constexpr (IsSemi) {
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
index 13a3f232222..3487f3bcbf7 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
@@ -82,7 +82,7 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state,
const uint32_t* __rest
LocalExchangeSinkLocalState& local_state)
{
auto& data_queue = _data_queue;
const auto rows = block->rows();
- auto row_idx = std::make_shared<std::vector<int>>(rows);
+ auto row_idx = std::make_shared<std::vector<uint32_t>>(rows);
{
local_state._partition_rows_histogram.assign(_num_instances + 1, 0);
for (size_t i = 0; i < rows; ++i) {
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
index b7acff688f6..6a9bebd7b44 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
@@ -49,7 +49,7 @@ class LocalExchangeSinkLocalState;
class ShuffleExchanger final : public Exchanger {
using PartitionedBlock =
std::pair<std::shared_ptr<vectorized::Block>,
- std::tuple<std::shared_ptr<std::vector<int>>, size_t,
size_t>>;
+ std::tuple<std::shared_ptr<std::vector<uint32_t>>,
size_t, size_t>>;
public:
ENABLE_FACTORY_CREATOR(ShuffleExchanger);
diff --git a/be/src/runtime/tablets_channel.cpp
b/be/src/runtime/tablets_channel.cpp
index 1dc7cf5afa3..5508bc3005a 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -496,8 +496,9 @@ Status TabletsChannel::add_batch(const
PTabletWriterAddBlockRequest& request,
return Status::OK();
}
- std::unordered_map<int64_t /* tablet_id */, std::vector<int> /* row index
*/> tablet_to_rowidxs;
- for (int i = 0; i < request.tablet_ids_size(); ++i) {
+ std::unordered_map<int64_t /* tablet_id */, std::vector<uint32_t> /* row
index */>
+ tablet_to_rowidxs;
+ for (uint32_t i = 0; i < request.tablet_ids_size(); ++i) {
if (request.is_single_tablet_block()) {
break;
}
@@ -509,7 +510,7 @@ Status TabletsChannel::add_batch(const
PTabletWriterAddBlockRequest& request,
}
auto it = tablet_to_rowidxs.find(tablet_id);
if (it == tablet_to_rowidxs.end()) {
- tablet_to_rowidxs.emplace(tablet_id, std::initializer_list<int>
{i});
+ tablet_to_rowidxs.emplace(tablet_id,
std::initializer_list<uint32_t> {i});
} else {
it->second.emplace_back(i);
}
diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h
index 80c5803bff4..b2cf72b016f 100644
--- a/be/src/vec/columns/column.h
+++ b/be/src/vec/columns/column.h
@@ -240,17 +240,8 @@ public:
/// Appends a batch elements from other column with the same type
/// indices_begin + indices_end represent the row indices of column src
- /// Warning:
- /// if *indices == -1 means the row is null
- virtual void insert_indices_from(const IColumn& src, const int*
indices_begin,
- const int* indices_end) = 0;
-
- /// Appends a batch elements from other column with the same type
- /// indices_begin + indices_end represent the row indices of column src
- /// Warning:
- /// if *indices == 0 means the row is null, only use in outer join,
do not use in any other place
- virtual void insert_indices_from_join(const IColumn& src, const uint32_t*
indices_begin,
- const uint32_t* indices_end) = 0;
+ virtual void insert_indices_from(const IColumn& src, const uint32_t*
indices_begin,
+ const uint32_t* indices_end) = 0;
/// Appends data located in specified memory chunk if it is possible
(throws an exception if it cannot be implemented).
/// Is used to optimize some computations (in aggregation, for example).
diff --git a/be/src/vec/columns/column_array.cpp
b/be/src/vec/columns/column_array.cpp
index 98fb480dd1a..c936c28b885 100644
--- a/be/src/vec/columns/column_array.cpp
+++ b/be/src/vec/columns/column_array.cpp
@@ -797,25 +797,10 @@ size_t ColumnArray::filter_nullable(const Filter& filter)
{
return result_size;
}
-void ColumnArray::insert_indices_from(const IColumn& src, const int*
indices_begin,
- const int* indices_end) {
- for (auto x = indices_begin; x != indices_end; ++x) {
- if (*x == -1) {
- ColumnArray::insert_default();
- } else {
- ColumnArray::insert_from(src, *x);
- }
- }
-}
-
-void ColumnArray::insert_indices_from_join(const IColumn& src, const uint32_t*
indices_begin,
- const uint32_t* indices_end) {
- for (auto x = indices_begin; x != indices_end; ++x) {
- if (*x == 0) {
- ColumnArray::insert_default();
- } else {
- ColumnArray::insert_from(src, *x);
- }
+void ColumnArray::insert_indices_from(const IColumn& src, const uint32_t*
indices_begin,
+ const uint32_t* indices_end) {
+ for (const auto* x = indices_begin; x != indices_end; ++x) {
+ ColumnArray::insert_from(src, *x);
}
}
diff --git a/be/src/vec/columns/column_array.h
b/be/src/vec/columns/column_array.h
index 95fd4633347..01ce3bcfe28 100644
--- a/be/src/vec/columns/column_array.h
+++ b/be/src/vec/columns/column_array.h
@@ -219,11 +219,8 @@ public:
callback(data);
}
- void insert_indices_from(const IColumn& src, const int* indices_begin,
- const int* indices_end) override;
-
- void insert_indices_from_join(const IColumn& src, const uint32_t*
indices_begin,
- const uint32_t* indices_end) override;
+ void insert_indices_from(const IColumn& src, const uint32_t* indices_begin,
+ const uint32_t* indices_end) override;
void replace_column_data(const IColumn& rhs, size_t row, size_t self_row =
0) override {
DCHECK(size() > self_row);
diff --git a/be/src/vec/columns/column_complex.h
b/be/src/vec/columns/column_complex.h
index fb89740d852..fc8268ec791 100644
--- a/be/src/vec/columns/column_complex.h
+++ b/be/src/vec/columns/column_complex.h
@@ -184,33 +184,14 @@ public:
data.insert(data.end(), st, ed);
}
- void insert_indices_from(const IColumn& src, const int* indices_begin,
- const int* indices_end) override {
- const Self& src_vec = assert_cast<const Self&>(src);
- auto new_size = indices_end - indices_begin;
-
- for (int i = 0; i < new_size; ++i) {
- auto offset = *(indices_begin + i);
- if (offset == -1) {
- data.emplace_back(T {});
- } else {
- data.emplace_back(src_vec.get_element(offset));
- }
- }
- }
-
- void insert_indices_from_join(const IColumn& src, const uint32_t*
indices_begin,
- const uint32_t* indices_end) override {
+ void insert_indices_from(const IColumn& src, const uint32_t* indices_begin,
+ const uint32_t* indices_end) override {
const Self& src_vec = assert_cast<const Self&>(src);
auto new_size = indices_end - indices_begin;
for (uint32_t i = 0; i < new_size; ++i) {
auto offset = *(indices_begin + i);
- if (offset == 0) {
- data.emplace_back(T {});
- } else {
- data.emplace_back(src_vec.get_element(offset));
- }
+ data.emplace_back(src_vec.get_element(offset));
}
}
diff --git a/be/src/vec/columns/column_const.h
b/be/src/vec/columns/column_const.h
index 280d2de8344..8d03087cc3d 100644
--- a/be/src/vec/columns/column_const.h
+++ b/be/src/vec/columns/column_const.h
@@ -111,13 +111,8 @@ public:
s += length;
}
- void insert_indices_from(const IColumn& src, const int* indices_begin,
- const int* indices_end) override {
- s += (indices_end - indices_begin);
- }
-
- void insert_indices_from_join(const IColumn& src, const uint32_t*
indices_begin,
- const uint32_t* indices_end) override {
+ void insert_indices_from(const IColumn& src, const uint32_t* indices_begin,
+ const uint32_t* indices_end) override {
s += (indices_end - indices_begin);
}
diff --git a/be/src/vec/columns/column_decimal.h
b/be/src/vec/columns/column_decimal.h
index b61753146fb..dfdfbb0d6b9 100644
--- a/be/src/vec/columns/column_decimal.h
+++ b/be/src/vec/columns/column_decimal.h
@@ -119,20 +119,8 @@ public:
data.push_back(assert_cast<const Self&>(src).get_data()[n]);
}
- void insert_indices_from(const IColumn& src, const int* indices_begin,
- const int* indices_end) override {
- auto origin_size = size();
- auto new_size = indices_end - indices_begin;
- data.resize(origin_size + new_size);
- const T* src_data = reinterpret_cast<const
T*>(src.get_raw_data().data);
-
- for (int i = 0; i < new_size; ++i) {
- data[origin_size + i] = src_data[indices_begin[i]];
- }
- }
-
- void insert_indices_from_join(const IColumn& src, const uint32_t*
indices_begin,
- const uint32_t* indices_end) override {
+ void insert_indices_from(const IColumn& src, const uint32_t* indices_begin,
+ const uint32_t* indices_end) override {
auto origin_size = size();
auto new_size = indices_end - indices_begin;
data.resize(origin_size + new_size);
diff --git a/be/src/vec/columns/column_dictionary.h
b/be/src/vec/columns/column_dictionary.h
index d2374811e16..95238bf4f26 100644
--- a/be/src/vec/columns/column_dictionary.h
+++ b/be/src/vec/columns/column_dictionary.h
@@ -77,16 +77,11 @@ public:
LOG(FATAL) << "insert_range_from not supported in ColumnDictionary";
}
- void insert_indices_from(const IColumn& src, const int* indices_begin,
- const int* indices_end) override {
+ void insert_indices_from(const IColumn& src, const uint32_t* indices_begin,
+ const uint32_t* indices_end) override {
LOG(FATAL) << "insert_indices_from not supported in ColumnDictionary";
}
- void insert_indices_from_join(const IColumn& src, const uint32_t*
indices_begin,
- const uint32_t* indices_end) override {
- LOG(FATAL) << "insert_indices_from_join not supported in
ColumnDictionary";
- }
-
void pop_back(size_t n) override { LOG(FATAL) << "pop_back not supported
in ColumnDictionary"; }
void update_hash_with_value(size_t n, SipHash& hash) const override {
diff --git a/be/src/vec/columns/column_dummy.h
b/be/src/vec/columns/column_dummy.h
index 790c135889e..a2d76ade562 100644
--- a/be/src/vec/columns/column_dummy.h
+++ b/be/src/vec/columns/column_dummy.h
@@ -80,8 +80,8 @@ public:
s += length;
}
- void insert_indices_from(const IColumn& src, const int* indices_begin,
- const int* indices_end) override {
+ void insert_indices_from(const IColumn& src, const uint32_t* indices_begin,
+ const uint32_t* indices_end) override {
s += (indices_end - indices_begin);
}
diff --git a/be/src/vec/columns/column_fixed_length_object.h
b/be/src/vec/columns/column_fixed_length_object.h
index e6b1db8bf5b..a817a4ae055 100644
--- a/be/src/vec/columns/column_fixed_length_object.h
+++ b/be/src/vec/columns/column_fixed_length_object.h
@@ -81,30 +81,8 @@ public:
return res;
}
- void insert_indices_from(const IColumn& src, const int* indices_begin,
- const int* indices_end) override {
- const Self& src_vec = assert_cast<const Self&>(src);
- auto origin_size = size();
- auto new_size = indices_end - indices_begin;
- if (_item_size == 0) {
- _item_size = src_vec._item_size;
- }
- DCHECK(_item_size == src_vec._item_size) << "dst and src should have
the same _item_size";
- resize(origin_size + new_size);
-
- for (int i = 0; i < new_size; ++i) {
- int offset = indices_begin[i];
- if (offset > -1) {
- memcpy(&_data[(origin_size + i) * _item_size],
&src_vec._data[offset * _item_size],
- _item_size);
- } else {
- memset(&_data[(origin_size + i) * _item_size], 0, _item_size);
- }
- }
- }
-
- void insert_indices_from_join(const IColumn& src, const uint32_t*
indices_begin,
- const uint32_t* indices_end) override {
+ void insert_indices_from(const IColumn& src, const uint32_t* indices_begin,
+ const uint32_t* indices_end) override {
const Self& src_vec = assert_cast<const Self&>(src);
auto origin_size = size();
auto new_size = indices_end - indices_begin;
@@ -115,13 +93,8 @@ public:
resize(origin_size + new_size);
for (uint32_t i = 0; i < new_size; ++i) {
- auto offset = indices_begin[i];
- if (offset) {
- memcpy(&_data[(origin_size + i) * _item_size],
&src_vec._data[offset * _item_size],
- _item_size);
- } else {
- memset(&_data[(origin_size + i) * _item_size], 0, _item_size);
- }
+ memcpy(&_data[(origin_size + i) * _item_size],
+ &src_vec._data[indices_begin[i] * _item_size], _item_size);
}
}
diff --git a/be/src/vec/columns/column_map.cpp
b/be/src/vec/columns/column_map.cpp
index 82e8c0a9118..d4b64f8c163 100644
--- a/be/src/vec/columns/column_map.cpp
+++ b/be/src/vec/columns/column_map.cpp
@@ -185,25 +185,10 @@ void ColumnMap::insert_from(const IColumn& src_, size_t
n) {
get_offsets().push_back(get_offsets().back() + size);
}
-void ColumnMap::insert_indices_from(const IColumn& src, const int*
indices_begin,
- const int* indices_end) {
- for (auto x = indices_begin; x != indices_end; ++x) {
- if (*x == -1) {
- ColumnMap::insert_default();
- } else {
- ColumnMap::insert_from(src, *x);
- }
- }
-}
-
-void ColumnMap::insert_indices_from_join(const IColumn& src, const uint32_t*
indices_begin,
- const uint32_t* indices_end) {
- for (auto x = indices_begin; x != indices_end; ++x) {
- if (*x == 0) {
- ColumnMap::insert_default();
- } else {
- ColumnMap::insert_from(src, *x);
- }
+void ColumnMap::insert_indices_from(const IColumn& src, const uint32_t*
indices_begin,
+ const uint32_t* indices_end) {
+ for (const auto* x = indices_begin; x != indices_end; ++x) {
+ ColumnMap::insert_from(src, *x);
}
}
diff --git a/be/src/vec/columns/column_map.h b/be/src/vec/columns/column_map.h
index 1cb3dd0c731..752de2e10c7 100644
--- a/be/src/vec/columns/column_map.h
+++ b/be/src/vec/columns/column_map.h
@@ -127,11 +127,9 @@ public:
Permutation& res) const override {
LOG(FATAL) << "get_permutation not implemented";
}
- void insert_indices_from(const IColumn& src, const int* indices_begin,
- const int* indices_end) override;
- void insert_indices_from_join(const IColumn& src, const uint32_t*
indices_begin,
- const uint32_t* indices_end) override;
+ void insert_indices_from(const IColumn& src, const uint32_t* indices_begin,
+ const uint32_t* indices_end) override;
void append_data_by_selector(MutableColumnPtr& res,
const IColumn::Selector& selector) const
override {
diff --git a/be/src/vec/columns/column_nothing.h
b/be/src/vec/columns/column_nothing.h
index 8874bb6e7ad..8a10eec8b6f 100644
--- a/be/src/vec/columns/column_nothing.h
+++ b/be/src/vec/columns/column_nothing.h
@@ -39,11 +39,6 @@ public:
bool structure_equals(const IColumn& rhs) const override {
return typeid(rhs) == typeid(ColumnNothing);
}
-
- void insert_indices_from_join(const IColumn& src, const uint32_t*
indices_begin,
- const uint32_t* indices_end) override {
- LOG(FATAL) << "insert_indices_from_join not supported in
ColumnNothing";
- }
};
} // namespace doris::vectorized
diff --git a/be/src/vec/columns/column_nullable.cpp
b/be/src/vec/columns/column_nullable.cpp
index 3553e9823d9..ecf330bead3 100644
--- a/be/src/vec/columns/column_nullable.cpp
+++ b/be/src/vec/columns/column_nullable.cpp
@@ -294,8 +294,8 @@ void ColumnNullable::insert_range_from(const IColumn& src,
size_t start, size_t
_has_null |= simd::contain_byte(src_null_map_data.data() + start, length,
1);
}
-void ColumnNullable::insert_indices_from(const IColumn& src, const int*
indices_begin,
- const int* indices_end) {
+void ColumnNullable::insert_indices_from(const IColumn& src, const uint32_t*
indices_begin,
+ const uint32_t* indices_end) {
const auto& src_concrete = assert_cast<const ColumnNullable&>(src);
get_nested_column().insert_indices_from(src_concrete.get_nested_column(),
indices_begin,
indices_end);
@@ -304,16 +304,6 @@ void ColumnNullable::insert_indices_from(const IColumn&
src, const int* indices_
_need_update_has_null = true;
}
-void ColumnNullable::insert_indices_from_join(const IColumn& src, const
uint32_t* indices_begin,
- const uint32_t* indices_end) {
- const auto& src_concrete = assert_cast<const ColumnNullable&>(src);
-
get_nested_column().insert_indices_from_join(src_concrete.get_nested_column(),
indices_begin,
- indices_end);
-
_get_null_map_column().insert_indices_from_join(src_concrete.get_null_map_column(),
- indices_begin,
indices_end);
- _need_update_has_null = true;
-}
-
void ColumnNullable::insert(const Field& x) {
if (x.is_null()) {
get_nested_column().insert_default();
diff --git a/be/src/vec/columns/column_nullable.h
b/be/src/vec/columns/column_nullable.h
index 365400a6699..10b0951ab8b 100644
--- a/be/src/vec/columns/column_nullable.h
+++ b/be/src/vec/columns/column_nullable.h
@@ -121,10 +121,8 @@ public:
void deserialize_vec(std::vector<StringRef>& keys, size_t num_rows)
override;
void insert_range_from(const IColumn& src, size_t start, size_t length)
override;
- void insert_indices_from(const IColumn& src, const int* indices_begin,
- const int* indices_end) override;
- void insert_indices_from_join(const IColumn& src, const uint32_t*
indices_begin,
- const uint32_t* indices_end) override;
+ void insert_indices_from(const IColumn& src, const uint32_t* indices_begin,
+ const uint32_t* indices_end) override;
void insert(const Field& x) override;
void insert_from(const IColumn& src, size_t n) override;
diff --git a/be/src/vec/columns/column_object.cpp
b/be/src/vec/columns/column_object.cpp
index 1dd4f7c74cd..78ff31a55db 100644
--- a/be/src/vec/columns/column_object.cpp
+++ b/be/src/vec/columns/column_object.cpp
@@ -1422,20 +1422,9 @@ void
ColumnObject::append_data_by_selector(MutableColumnPtr& res,
return append_data_by_selector_impl<ColumnObject>(res, selector);
}
-void ColumnObject::insert_indices_from(const IColumn& src, const int*
indices_begin,
- const int* indices_end) {
- for (auto x = indices_begin; x != indices_end; ++x) {
- if (*x == -1) {
- ColumnObject::insert_default();
- } else {
- ColumnObject::insert_from(src, *x);
- }
- }
-}
-
-void ColumnObject::insert_indices_from_join(const IColumn& src, const
uint32_t* indices_begin,
- const uint32_t* indices_end) {
- for (auto x = indices_begin; x != indices_end; ++x) {
+void ColumnObject::insert_indices_from(const IColumn& src, const uint32_t*
indices_begin,
+ const uint32_t* indices_end) {
+ for (const auto* x = indices_begin; x != indices_end; ++x) {
ColumnObject::insert_from(src, *x);
}
}
diff --git a/be/src/vec/columns/column_object.h
b/be/src/vec/columns/column_object.h
index 7e8be7e6d81..8efec7ad122 100644
--- a/be/src/vec/columns/column_object.h
+++ b/be/src/vec/columns/column_object.h
@@ -369,11 +369,8 @@ public:
void append_data_by_selector(MutableColumnPtr& res,
const IColumn::Selector& selector) const
override;
- void insert_indices_from(const IColumn& src, const int* indices_begin,
- const int* indices_end) override;
-
- void insert_indices_from_join(const IColumn& src, const uint32_t*
indices_begin,
- const uint32_t* indices_end) override;
+ void insert_indices_from(const IColumn& src, const uint32_t* indices_begin,
+ const uint32_t* indices_end) override;
// May throw execption
void try_insert(const Field& field);
diff --git a/be/src/vec/columns/column_string.cpp
b/be/src/vec/columns/column_string.cpp
index 2d009e2a08b..424a8717e14 100644
--- a/be/src/vec/columns/column_string.cpp
+++ b/be/src/vec/columns/column_string.cpp
@@ -124,10 +124,10 @@ void ColumnString::insert_range_from(const IColumn& src,
size_t start, size_t le
}
}
-void ColumnString::insert_indices_from(const IColumn& src, const int*
indices_begin,
- const int* indices_end) {
- const ColumnString& src_str = assert_cast<const ColumnString&>(src);
- auto src_offset_data = src_str.offsets.data();
+void ColumnString::insert_indices_from(const IColumn& src, const uint32_t*
indices_begin,
+ const uint32_t* indices_end) {
+ const auto& src_str = assert_cast<const ColumnString&>(src);
+ const auto* src_offset_data = src_str.offsets.data();
auto old_char_size = chars.size();
size_t total_chars_size = old_char_size;
@@ -136,65 +136,24 @@ void ColumnString::insert_indices_from(const IColumn&
src, const int* indices_be
offsets.resize(offsets.size() + indices_end - indices_begin);
auto* dst_offsets_data = offsets.data();
- for (auto x = indices_begin; x != indices_end; ++x) {
- if (*x != -1) {
- total_chars_size += src_offset_data[*x] - src_offset_data[*x - 1];
- }
- dst_offsets_data[dst_offsets_pos++] = total_chars_size;
- }
- check_chars_length(total_chars_size, offsets.size());
-
- chars.resize(total_chars_size);
-
- auto* src_data_ptr = src_str.chars.data();
- auto* dst_data_ptr = chars.data();
-
- size_t dst_chars_pos = old_char_size;
- for (auto x = indices_begin; x != indices_end; ++x) {
- if (*x != -1) {
- const size_t size_to_append = src_offset_data[*x] -
src_offset_data[*x - 1];
- const size_t offset = src_offset_data[*x - 1];
- memcpy_small_allow_read_write_overflow15(dst_data_ptr +
dst_chars_pos,
- src_data_ptr + offset,
size_to_append);
- dst_chars_pos += size_to_append;
- }
- }
-}
-
-void ColumnString::insert_indices_from_join(const IColumn& src, const
uint32_t* indices_begin,
- const uint32_t* indices_end) {
- const ColumnString& src_str = assert_cast<const ColumnString&>(src);
- auto src_offset_data = src_str.offsets.data();
-
- auto old_char_size = chars.size();
- size_t total_chars_size = old_char_size;
-
- auto dst_offsets_pos = offsets.size();
- offsets.resize(offsets.size() + indices_end - indices_begin);
- auto* dst_offsets_data = offsets.data();
-
- for (auto x = indices_begin; x != indices_end; ++x) {
- if (*x != 0) {
- total_chars_size += src_offset_data[*x] - src_offset_data[*x - 1];
- }
+ for (const auto* x = indices_begin; x != indices_end; ++x) {
+ total_chars_size += src_offset_data[*x] - src_offset_data[int(*x) - 1];
dst_offsets_data[dst_offsets_pos++] = total_chars_size;
}
check_chars_length(total_chars_size, offsets.size());
chars.resize(total_chars_size);
- auto* src_data_ptr = src_str.chars.data();
+ const auto* src_data_ptr = src_str.chars.data();
auto* dst_data_ptr = chars.data();
size_t dst_chars_pos = old_char_size;
- for (auto x = indices_begin; x != indices_end; ++x) {
- if (*x != 0) {
- const size_t size_to_append = src_offset_data[*x] -
src_offset_data[*x - 1];
- const size_t offset = src_offset_data[*x - 1];
- memcpy_small_allow_read_write_overflow15(dst_data_ptr +
dst_chars_pos,
- src_data_ptr + offset,
size_to_append);
- dst_chars_pos += size_to_append;
- }
+ for (const auto* x = indices_begin; x != indices_end; ++x) {
+ const size_t size_to_append = src_offset_data[*x] -
src_offset_data[int(*x) - 1];
+ const size_t offset = src_offset_data[int(*x) - 1];
+ memcpy_small_allow_read_write_overflow15(dst_data_ptr + dst_chars_pos,
+ src_data_ptr + offset,
size_to_append);
+ dst_chars_pos += size_to_append;
}
}
diff --git a/be/src/vec/columns/column_string.h
b/be/src/vec/columns/column_string.h
index 191c6a95cf9..e6b27f20054 100644
--- a/be/src/vec/columns/column_string.h
+++ b/be/src/vec/columns/column_string.h
@@ -484,11 +484,8 @@ public:
void insert_range_from(const IColumn& src, size_t start, size_t length)
override;
- void insert_indices_from(const IColumn& src, const int* indices_begin,
- const int* indices_end) override;
-
- void insert_indices_from_join(const IColumn& src, const uint32_t*
indices_begin,
- const uint32_t* indices_end) override;
+ void insert_indices_from(const IColumn& src, const uint32_t* indices_begin,
+ const uint32_t* indices_end) override;
ColumnPtr filter(const Filter& filt, ssize_t result_size_hint) const
override;
size_t filter(const Filter& filter) override;
diff --git a/be/src/vec/columns/column_struct.cpp
b/be/src/vec/columns/column_struct.cpp
index 3502fdf581a..5a89b5d754c 100644
--- a/be/src/vec/columns/column_struct.cpp
+++ b/be/src/vec/columns/column_struct.cpp
@@ -225,23 +225,14 @@ void ColumnStruct::update_crcs_with_value(uint32_t*
__restrict hash, PrimitiveTy
}
}
-void ColumnStruct::insert_indices_from(const IColumn& src, const int*
indices_begin,
- const int* indices_end) {
- const ColumnStruct& src_concrete = assert_cast<const ColumnStruct&>(src);
+void ColumnStruct::insert_indices_from(const IColumn& src, const uint32_t*
indices_begin,
+ const uint32_t* indices_end) {
+ const auto& src_concrete = assert_cast<const ColumnStruct&>(src);
for (size_t i = 0; i < columns.size(); ++i) {
columns[i]->insert_indices_from(src_concrete.get_column(i),
indices_begin, indices_end);
}
}
-void ColumnStruct::insert_indices_from_join(const IColumn& src, const
uint32_t* indices_begin,
- const uint32_t* indices_end) {
- const ColumnStruct& src_concrete = assert_cast<const ColumnStruct&>(src);
- for (size_t i = 0; i < columns.size(); ++i) {
- columns[i]->insert_indices_from_join(src_concrete.get_column(i),
indices_begin,
- indices_end);
- }
-}
-
void ColumnStruct::insert_range_from(const IColumn& src, size_t start, size_t
length) {
const size_t tuple_size = columns.size();
for (size_t i = 0; i < tuple_size; ++i) {
diff --git a/be/src/vec/columns/column_struct.h
b/be/src/vec/columns/column_struct.h
index 499fb8444f9..e2da7fd6440 100644
--- a/be/src/vec/columns/column_struct.h
+++ b/be/src/vec/columns/column_struct.h
@@ -121,11 +121,8 @@ public:
uint32_t offset = 0,
const uint8_t* __restrict null_data = nullptr)
const override;
- void insert_indices_from(const IColumn& src, const int* indices_begin,
- const int* indices_end) override;
-
- void insert_indices_from_join(const IColumn& src, const uint32_t*
indices_begin,
- const uint32_t* indices_end) override;
+ void insert_indices_from(const IColumn& src, const uint32_t* indices_begin,
+ const uint32_t* indices_end) override;
void get_permutation(bool reverse, size_t limit, int nan_direction_hint,
Permutation& res) const override {
diff --git a/be/src/vec/columns/column_vector.cpp
b/be/src/vec/columns/column_vector.cpp
index a825e07d5f2..65b1b6308ee 100644
--- a/be/src/vec/columns/column_vector.cpp
+++ b/be/src/vec/columns/column_vector.cpp
@@ -366,31 +366,8 @@ void ColumnVector<T>::insert_range_from(const IColumn&
src, size_t start, size_t
}
template <typename T>
-void ColumnVector<T>::insert_indices_from(const IColumn& src, const int*
indices_begin,
- const int* indices_end) {
- auto origin_size = size();
- auto new_size = indices_end - indices_begin;
- data.resize(origin_size + new_size);
-
- const T* src_data = reinterpret_cast<const T*>(src.get_raw_data().data);
-
- if constexpr (std::is_same_v<T, UInt8>) {
- // nullmap : indices_begin[i] == -1 means is null at the here, set
true here
- for (int i = 0; i < new_size; ++i) {
- data[origin_size + i] = (indices_begin[i] == -1) +
- (indices_begin[i] != -1) *
src_data[indices_begin[i]];
- }
- } else {
- // real data : indices_begin[i] == -1 what at is meaningless
- for (int i = 0; i < new_size; ++i) {
- data[origin_size + i] = src_data[indices_begin[i]];
- }
- }
-}
-
-template <typename T>
-void ColumnVector<T>::insert_indices_from_join(const IColumn& src, const
uint32_t* indices_begin,
- const uint32_t* indices_end) {
+void ColumnVector<T>::insert_indices_from(const IColumn& src, const uint32_t*
indices_begin,
+ const uint32_t* indices_end) {
auto origin_size = size();
auto new_size = indices_end - indices_begin;
data.resize(origin_size + new_size);
diff --git a/be/src/vec/columns/column_vector.h
b/be/src/vec/columns/column_vector.h
index cb1edddb520..53199027585 100644
--- a/be/src/vec/columns/column_vector.h
+++ b/be/src/vec/columns/column_vector.h
@@ -386,11 +386,9 @@ public:
void insert_range_from(const IColumn& src, size_t start, size_t length)
override;
- void insert_indices_from(const IColumn& src, const int* indices_begin,
- const int* indices_end) override;
+ void insert_indices_from(const IColumn& src, const uint32_t* indices_begin,
+ const uint32_t* indices_end) override;
- void insert_indices_from_join(const IColumn& src, const uint32_t*
indices_begin,
- const uint32_t* indices_end) override;
void fill(const value_type& element, size_t num) {
auto old_size = data.size();
auto new_size = old_size + num;
diff --git a/be/src/vec/columns/predicate_column.h
b/be/src/vec/columns/predicate_column.h
index 79f445b08d5..c42f0a33225 100644
--- a/be/src/vec/columns/predicate_column.h
+++ b/be/src/vec/columns/predicate_column.h
@@ -126,16 +126,11 @@ public:
LOG(FATAL) << "insert_range_from not supported in PredicateColumnType";
}
- void insert_indices_from(const IColumn& src, const int* indices_begin,
- const int* indices_end) override {
+ void insert_indices_from(const IColumn& src, const uint32_t* indices_begin,
+ const uint32_t* indices_end) override {
LOG(FATAL) << "insert_indices_from not supported in
PredicateColumnType";
}
- void insert_indices_from_join(const IColumn& src, const uint32_t*
indices_begin,
- const uint32_t* indices_end) override {
- LOG(FATAL) << "insert_indices_from_join not supported in
PredicateColumnType";
- }
-
void pop_back(size_t n) override {
LOG(FATAL) << "pop_back not supported in PredicateColumnType";
}
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 723dc3ac639..195134d0293 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -950,7 +950,8 @@ void MutableBlock::add_row(const Block* block, int row) {
}
}
-void MutableBlock::add_rows(const Block* block, const int* row_begin, const
int* row_end) {
+void MutableBlock::add_rows(const Block* block, const uint32_t* row_begin,
+ const uint32_t* row_end) {
DCHECK_LE(columns(), block->columns());
const auto& block_data = block->get_columns_with_type_and_name();
for (size_t i = 0; i < _columns.size(); ++i) {
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index 6c7fa80cb81..ec2cf249b2d 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -563,7 +563,7 @@ public:
void swap(MutableBlock&& other) noexcept;
void add_row(const Block* block, int row);
- void add_rows(const Block* block, const int* row_begin, const int*
row_end);
+ void add_rows(const Block* block, const uint32_t* row_begin, const
uint32_t* row_end);
void add_rows(const Block* block, size_t row_begin, size_t length);
void add_rows(const Block* block, std::vector<int64_t> rows);
diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h
b/be/src/vec/exec/join/process_hash_table_probe_impl.h
index 38f8b3a558f..c8f2ae0e554 100644
--- a/be/src/vec/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h
@@ -76,8 +76,8 @@ void ProcessHashTableProbe<JoinOpType,
Parent>::build_side_output_column(
for (int i = 0; i < _right_col_len; i++) {
const auto& column = *_build_block->safe_get_by_position(i).column;
if (output_slot_flags[i]) {
- mcol[i + _right_col_idx]->insert_indices_from_join(column,
_build_indexs.data(),
-
_build_indexs.data() + size);
+ mcol[i + _right_col_idx]->insert_indices_from(column,
_build_indexs.data(),
+
_build_indexs.data() + size);
} else {
mcol[i + _right_col_idx]->insert_many_defaults(size);
}
@@ -365,8 +365,8 @@ Status ProcessHashTableProbe<JoinOpType,
Parent>::process_data_in_hashtable(
}
for (size_t j = 0; j < _right_col_len; ++j) {
const auto& column = *_build_block->safe_get_by_position(j).column;
- mcol[j + _right_col_idx]->insert_indices_from_join(column,
_build_indexs.data(),
-
_build_indexs.data() + block_size);
+ mcol[j + _right_col_idx]->insert_indices_from(column,
_build_indexs.data(),
+ _build_indexs.data()
+ block_size);
}
// just resize the left table column in case with other conjunct to
make block size is not zero
diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp
b/be/src/vec/exec/join/vnested_loop_join_node.cpp
index a5305a4b530..f3214639856 100644
--- a/be/src/vec/exec/join/vnested_loop_join_node.cpp
+++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp
@@ -409,7 +409,7 @@ void
VNestedLoopJoinNode::_finalize_current_phase(MutableBlock& mutable_block, s
.data();
const auto num_rows = cur_block.rows();
- std::vector<int> selector(num_rows);
+ std::vector<uint32_t> selector(num_rows);
size_t selector_idx = 0;
for (size_t j = 0; j < num_rows; j++) {
if constexpr (IsSemi) {
diff --git a/be/src/vec/exec/scan/pip_scanner_context.h
b/be/src/vec/exec/scan/pip_scanner_context.h
index 8e4ab5c22bf..681fc09739a 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -126,8 +126,8 @@ public:
hashes[i] = hashes[i] % element_size;
}
- std::vector<int> channel2rows[element_size];
- for (int i = 0; i < rows; i++) {
+ std::vector<uint32_t> channel2rows[element_size];
+ for (uint32_t i = 0; i < rows; i++) {
channel2rows[hashes[i]].emplace_back(i);
}
@@ -234,10 +234,10 @@ private:
std::vector<std::unique_ptr<std::mutex>> _colocate_block_mutexs;
void _add_rows_colocate_blocks(vectorized::Block* block, int loc,
- const std::vector<int>& rows) {
+ const std::vector<uint32_t>& rows) {
int row_wait_add = rows.size();
const int batch_size = _batch_size;
- const int* begin = &rows[0];
+ const uint32_t* begin = rows.data();
std::lock_guard<std::mutex> l(*_colocate_block_mutexs[loc]);
while (row_wait_add > 0) {
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp
b/be/src/vec/sink/vdata_stream_sender.cpp
index 5bfec60d1f9..4d55e0fcb06 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -222,7 +222,7 @@ Status Channel<Parent>::send_remote_block(PBlock* block,
bool eos, Status exec_s
}
template <typename Parent>
-Status Channel<Parent>::add_rows(Block* block, const std::vector<int>& rows,
bool eos) {
+Status Channel<Parent>::add_rows(Block* block, const std::vector<uint32_t>&
rows, bool eos) {
if (_fragment_instance_id.lo == -1) {
return Status::OK();
}
@@ -713,7 +713,7 @@ BlockSerializer<Parent>::BlockSerializer(Parent* parent,
bool is_local)
template <typename Parent>
Status BlockSerializer<Parent>::next_serialized_block(Block* block, PBlock*
dest, int num_receivers,
bool* serialized, bool
eos,
- const std::vector<int>*
rows) {
+ const
std::vector<uint32_t>* rows) {
if (_mutable_block == nullptr) {
SCOPED_CONSUME_MEM_TRACKER(_parent->mem_tracker());
_mutable_block = MutableBlock::create_unique(block->clone_empty());
@@ -722,9 +722,9 @@ Status
BlockSerializer<Parent>::next_serialized_block(Block* block, PBlock* dest
{
SCOPED_CONSUME_MEM_TRACKER(_parent->mem_tracker());
if (rows) {
- if (rows->size() > 0) {
+ if (!rows->empty()) {
SCOPED_TIMER(_parent->split_block_distribute_by_channel_timer());
- const int* begin = &(*rows)[0];
+ const auto* begin = rows->data();
_mutable_block->add_rows(block, begin, begin + rows->size());
}
} else if (!block->empty()) {
diff --git a/be/src/vec/sink/vdata_stream_sender.h
b/be/src/vec/sink/vdata_stream_sender.h
index d25385d6b21..75a3bfd86a6 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -78,7 +78,7 @@ class BlockSerializer {
public:
BlockSerializer(Parent* parent, bool is_local = true);
Status next_serialized_block(Block* src, PBlock* dest, int num_receivers,
bool* serialized,
- bool eos, const std::vector<int>* rows =
nullptr);
+ bool eos, const std::vector<uint32_t>* rows =
nullptr);
Status serialize_block(PBlock* dest, int num_receivers = 1);
Status serialize_block(const Block* src, PBlock* dest, int num_receivers =
1);
@@ -277,7 +277,7 @@ public:
return Status::InternalError("Send BroadcastPBlockHolder is not
allowed!");
}
- virtual Status add_rows(Block* block, const std::vector<int>& row, bool
eos);
+ virtual Status add_rows(Block* block, const std::vector<uint32_t>& row,
bool eos);
virtual Status send_current_block(bool eos, Status exec_status);
@@ -412,9 +412,9 @@ Status VDataStreamSender::channel_add_rows(RuntimeState*
state, Channels& channe
int num_channels,
const HashValueType* __restrict
channel_ids, int rows,
Block* block, bool eos) {
- std::vector<int> channel2rows[num_channels];
+ std::vector<uint32_t> channel2rows[num_channels];
- for (int i = 0; i < rows; i++) {
+ for (uint32_t i = 0; i < rows; i++) {
channel2rows[channel_ids[i]].emplace_back(i);
}
@@ -503,7 +503,7 @@ public:
return Status::OK();
}
- Status add_rows(Block* block, const std::vector<int>& rows, bool eos)
override {
+ Status add_rows(Block* block, const std::vector<uint32_t>& rows, bool eos)
override {
if (Channel<Parent>::_fragment_instance_id.lo == -1) {
return Status::OK();
}
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h
b/be/src/vec/sink/writer/vtablet_writer_v2.h
index 69918b37a6c..e2b069db3b6 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.h
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.h
@@ -93,7 +93,7 @@ using Streams = std::vector<std::shared_ptr<LoadStreamStub>>;
struct Rows {
int64_t partition_id;
int64_t index_id;
- std::vector<int32_t> row_idxes;
+ std::vector<uint32_t> row_idxes;
};
using RowsForTablet = std::unordered_map<int64_t, Rows>;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]