Gabriel39 commented on code in PR #12586:
URL: https://github.com/apache/doris/pull/12586#discussion_r970787052
##########
be/src/vec/sink/vdata_stream_sender.cpp:
##########
@@ -508,45 +512,49 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block) {
// vectorized calculate hash
int rows = block->rows();
- // for each row, we have a siphash val
- std::vector<SipHash> siphashs(rows);
- // result[j] means column index, i means rows index
- for (int j = 0; j < result_size; ++j) {
- block->get_by_position(result[j]).column->update_hashes_with_value(siphashs);
- }
-
- // channel2rows' subscript means channel id
- std::vector<vectorized::UInt64> hash_vals(rows);
- for (int i = 0; i < rows; i++) {
- hash_vals[i] = siphashs[i].get64();
- }
+ auto element_size = _channels.size();
+ std::vector<uint64_t> hash_vals(rows);
+ auto* __restrict hashes = hash_vals.data();
+
+    // TODO: after we support new shuffle hash method, should simple the code
+ if (_part_type == TPartitionType::HASH_PARTITIONED) {
+ if (!_new_shuffle_hash_method) {
+ // for each row, we have a siphash val
+ std::vector<SipHash> siphashs(rows);
+ // result[j] means column index, i means rows index
+ for (int j = 0; j < result_size; ++j) {
+                block->get_by_position(result[j]).column->update_hashes_with_value(siphashs);
+ }
+ for (int i = 0; i < rows; i++) {
+ hashes[i] = siphashs[i].get64() % element_size;
+ }
+ } else {
+            // result[j] means column index, i means rows index, here to calculate the xxhash value
+ for (int j = 0; j < result_size; ++j) {
+                block->get_by_position(result[j]).column->update_hashes_with_value(hashes);
+ }
- Block::erase_useless_column(block, column_to_keep);
- RETURN_IF_ERROR(channel_add_rows(_channels, _channels.size(), hash_vals, rows, block));
- } else if (_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
- // will only copy schema
- // we don't want send temp columns
- auto column_to_keep = block->columns();
- // 1. calculate hash
- // 2. dispatch rows to channel
- int result_size = _partition_expr_ctxs.size();
- int result[result_size];
- RETURN_IF_ERROR(get_partition_column_result(block, result));
+ for (int i = 0; i < rows; i++) {
+ hashes[i] = hashes[i] % element_size;
+ }
+ }
- // vectorized calculate hash val
- int rows = block->rows();
- // for each row, we have a hash_val
- std::vector<uint32_t> hash_vals(rows);
+ Block::erase_useless_column(block, column_to_keep);
+        RETURN_IF_ERROR(channel_add_rows(_channels, element_size, hashes, rows, block));
+ } else {
+ for (int j = 0; j < result_size; ++j) {
+            block->get_by_position(result[j]).column->update_crcs_with_value(
+                    hash_vals, _partition_expr_ctxs[j]->root()->type().type);
+ }
+ element_size = _channel_shared_ptrs.size();
+ for (int i = 0; i < rows; i++) {
+ hashes[i] = hashes[i] % element_size;
Review Comment:
```suggestion
hashes[i] = hash_vals[i] % element_size;
```
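
For context on the step this suggestion touches: the sender assigns each row to an output channel by reducing the row's hash value modulo the channel count, then groups rows per channel before handing them to `channel_add_rows`. Below is a minimal standalone sketch of that hash-to-channel mapping, assuming plain string keys and `std::hash` in place of the SipHash/xxHash/CRC32 variants used here; names such as `partition_rows` are illustrative and not Doris APIs.

```cpp
// Standalone illustration only -- not Doris code. partition_rows, the string
// keys, and std::hash stand in for the Block's partition columns and the
// SipHash/xxHash/CRC32 hashing used by VDataStreamSender.
#include <cstdint>
#include <functional>
#include <iostream>
#include <string>
#include <vector>

// Map each row to a channel via hash % channel_count and group row indices
// per destination channel, mirroring the "hashes[i] % element_size" step.
std::vector<std::vector<size_t>> partition_rows(const std::vector<std::string>& row_keys,
                                                size_t channel_count) {
    std::vector<std::vector<size_t>> rows_per_channel(channel_count);
    std::hash<std::string> hasher; // placeholder for SipHash / xxHash / CRC32
    for (size_t i = 0; i < row_keys.size(); ++i) {
        uint64_t hash_val = hasher(row_keys[i]);
        rows_per_channel[hash_val % channel_count].push_back(i);
    }
    return rows_per_channel;
}

int main() {
    std::vector<std::string> keys = {"a", "b", "c", "d", "e", "f"};
    auto buckets = partition_rows(keys, 3);
    for (size_t ch = 0; ch < buckets.size(); ++ch) {
        std::cout << "channel " << ch << ":";
        for (size_t row : buckets[ch]) {
            std::cout << ' ' << row;
        }
        std::cout << '\n';
    }
    return 0;
}
```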
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.