This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch dev_join
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 922134f3ce0e1d756532bb44e096f05c3c231aa5
Author: BiteTheDDDDt <[email protected]>
AuthorDate: Thu Oct 26 13:40:52 2023 +0800

    update
    
    update
    
    update
    
    fix
    
    fix
    
    update
    
    fix
    
    update
---
 be/src/exprs/runtime_filter_slots.h                |  5 +-
 be/src/pipeline/pipeline_x/dependency.h            |  5 +-
 .../local_exchange_sink_operator.cpp               |  2 +-
 be/src/vec/columns/column_vector.cpp               | 13 +---
 be/src/vec/common/hash_table/hash_map.h            | 14 +++-
 be/src/vec/common/hash_table/hash_map_context.h    | 90 ++++++++++++++--------
 be/src/vec/core/block.cpp                          |  2 +-
 .../vec/exec/join/process_hash_table_probe_impl.h  |  4 +-
 be/src/vec/exec/join/vhash_join_node.cpp           |  8 ++
 be/src/vec/exec/join/vhash_join_node.h             |  8 +-
 10 files changed, 91 insertions(+), 60 deletions(-)

diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h
index 63c5665d271..0f841e5a60f 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -174,6 +174,7 @@ public:
                 auto column = it->get_by_position(result_column_id).column;
 
                 std::vector<int> indexs;
+                // indexs start from 1 because the first row is mocked for the join hash map
                 if (const auto* nullable =
                             vectorized::check_and_get_column<vectorized::ColumnNullable>(*column)) {
                     column = nullable->get_nested_column_ptr();
@@ -181,14 +182,14 @@ public:
                                                       nullable->get_null_map_column_ptr().get())
                                                       ->get_data()
                                                       .data();
-                    for (int i = 0; i < column->size(); i++) {
+                    for (int i = 1; i < column->size(); i++) {
                         if (null_map[i]) {
                             continue;
                         }
                         indexs.push_back(i);
                     }
                 } else {
-                    for (int i = 0; i < column->size(); i++) {
+                    for (int i = 1; i < column->size(); i++) {
                         indexs.push_back(i);
                     }
                 }
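
The hunk above depends on an invariant this commit introduces elsewhere: row 0 of the build block is a mocked sentinel row, so runtime-filter value collection starts at index 1. A minimal self-contained sketch of that gather pattern (function name hypothetical, not part of the patch):

    #include <cstddef>
    #include <cstdint>
    #include <vector>

    // Sketch only: collect usable build-row indexes, skipping the mocked
    // sentinel at index 0 and any null keys. `null_map` may be nullptr.
    std::vector<int> collect_filter_rows(size_t num_rows, const uint8_t* null_map) {
        std::vector<int> indexs; // same spelling as the variable in the hunk
        for (int i = 1; i < static_cast<int>(num_rows); ++i) { // row 0 is mocked
            if (null_map != nullptr && null_map[i]) {
                continue; // null keys contribute no runtime-filter value
            }
            indexs.push_back(i);
        }
        return indexs;
    }
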
diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h
index 290df96cbad..e7f6e3689fe 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -839,8 +839,9 @@ private:
     bool is_set_probe {false};
 };
 
-using PartitionedBlock = std::pair<std::shared_ptr<vectorized::Block>,
-                                   std::tuple<std::shared_ptr<std::vector<int>>, size_t, size_t>>;
+using PartitionedBlock =
+        std::pair<std::shared_ptr<vectorized::Block>,
+                  std::tuple<std::shared_ptr<std::vector<uint32_t>>, size_t, size_t>>;
 struct LocalExchangeSharedState {
 public:
     ENABLE_FACTORY_CREATOR(LocalExchangeSharedState);
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp
index b69150ba512..7b45a600ac5 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp
@@ -37,7 +37,7 @@ Status LocalExchangeSinkLocalState::split_rows(RuntimeState* state,
     auto& data_queue = _shared_state->data_queue;
     const auto num_partitions = data_queue.size();
     const auto rows = block->rows();
-    auto row_idx = std::make_shared<std::vector<int>>(rows);
+    auto row_idx = std::make_shared<std::vector<uint32_t>>(rows);
     {
         _partition_rows_histogram.assign(num_partitions + 1, 0);
         for (size_t i = 0; i < rows; ++i) {
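
The widened uint32_t row index feeds a counting-sort style partition: the histogram counts rows per partition, prefix sums turn the counts into start offsets, and each row is then scattered into row_idx in partition order. A self-contained sketch of that pattern, assuming it matches the elided remainder of split_rows (names hypothetical):

    #include <cstddef>
    #include <cstdint>
    #include <vector>

    // channel_ids[i] is the target partition of row i; the result lists row
    // numbers grouped by partition, and histogram[p] ends up one past the
    // last row of partition p.
    std::vector<uint32_t> partition_rows(const std::vector<uint32_t>& channel_ids,
                                         size_t num_partitions) {
        const size_t rows = channel_ids.size();
        std::vector<uint32_t> row_idx(rows);
        std::vector<size_t> histogram(num_partitions + 1, 0);
        for (size_t i = 0; i < rows; ++i) {
            histogram[channel_ids[i] + 1]++; // count rows per partition
        }
        for (size_t p = 1; p <= num_partitions; ++p) {
            histogram[p] += histogram[p - 1]; // prefix sum: start offsets
        }
        for (size_t i = 0; i < rows; ++i) {
            row_idx[histogram[channel_ids[i]]++] = static_cast<uint32_t>(i);
        }
        return row_idx;
    }
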
diff --git a/be/src/vec/columns/column_vector.cpp b/be/src/vec/columns/column_vector.cpp
index c4ca97df7a2..69c09dcde80 100644
--- a/be/src/vec/columns/column_vector.cpp
+++ b/be/src/vec/columns/column_vector.cpp
@@ -375,17 +375,8 @@ void ColumnVector<T>::insert_indices_from(const IColumn& src,
 
     const T* src_data = reinterpret_cast<const T*>(src.get_raw_data().data);
 
-    if constexpr (std::is_same_v<T, UInt8>) {
-        // nullmap : indices_begin[i] == 0 means is null at the here, set true here
-        for (int i = 0; i < new_size; ++i) {
-            data[origin_size + i] =
-                    (indices_begin[i] == 0) + (indices_begin[i] != 0) * src_data[indices_begin[i]];
-        }
-    } else {
-        // real data : indices_begin[i] == 0 what at is meaningless
-        for (int i = 0; i < new_size; ++i) {
-            data[origin_size + i] = src_data[indices_begin[i]];
-        }
+    for (int i = 0; i < new_size; ++i) {
+        data[origin_size + i] = src_data[indices_begin[i]];
     }
 }
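
This simplification works because of the sentinel convention: a "no match" gather now reads index 0, and the vhash_join_node.cpp hunk below pre-marks row 0 of the build block as NULL, so for a null-map column src_data[0] is already 1. A worked example under that assumption:

    // indices_begin = {0, 2}: row A matched nothing, row B matched build row 2.
    // With src_data[0] == 1 (mocked row pre-marked null), the plain loop
    // yields {1, src_data[2]} - the old UInt8 branch is no longer needed.
    for (int i = 0; i < new_size; ++i) {
        data[origin_size + i] = src_data[indices_begin[i]];
    }
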
 
diff --git a/be/src/vec/common/hash_table/hash_map.h b/be/src/vec/common/hash_table/hash_map.h
index 1c132fc99ed..dfb01556fc6 100644
--- a/be/src/vec/common/hash_table/hash_map.h
+++ b/be/src/vec/common/hash_table/hash_map.h
@@ -220,7 +220,7 @@ public:
     void prepare_build(size_t num_elem, int batch_size) {
         max_batch_size = batch_size;
         bucket_size = calc_bucket_size(num_elem + 1);
-        first.resize(bucket_size, 0);
+        first.resize(bucket_size + 1, 0);
         next.resize(num_elem);
 
         if constexpr (JoinOpType == doris::TJoinOp::FULL_OUTER_JOIN ||
@@ -238,6 +238,7 @@ public:
             next[i] = first[bucket_nums[i]];
             first[bucket_nums[i]] = i;
         }
+        first[bucket_size] = 0;
     }
 
     template <int JoinOpType>
@@ -275,7 +276,7 @@ public:
         while (count < batch_size && iter_idx < elem_num) {
             const auto matched = visited[iter_idx];
             build_idxs[count] = iter_idx;
-            if constexpr (JoinOpType == doris::TJoinOp::RIGHT_ANTI_JOIN) {
+            if constexpr (JoinOpType != doris::TJoinOp::RIGHT_ANTI_JOIN) {
                 count += !matched;
             } else {
                 count += matched;
@@ -336,6 +337,10 @@ private:
         uint32_t build_idx = 0;
         const auto batch_size = max_batch_size;
 
+        if (!build_keys) {
+            probe_idx = probe_rows;
+        }
+
         auto do_the_probe = [&]() {
             while (build_idx && LIKELY(matched_cnt < batch_size)) {
                 if (keys[probe_idx] == build_keys[build_idx]) {
@@ -353,7 +358,7 @@ private:
             if constexpr (JoinOpType == doris::TJoinOp::LEFT_OUTER_JOIN ||
                           JoinOpType == doris::TJoinOp::FULL_OUTER_JOIN) {
                 // `(!matched_cnt || probe_idxs[matched_cnt - 1] != probe_idx)` means not match one build side
-                if (!build_idx && (!matched_cnt || probe_idxs[matched_cnt - 1] != probe_idx)) {
+                if (!matched_cnt || probe_idxs[matched_cnt - 1] != probe_idx) {
                     probe_idxs[matched_cnt] = probe_idx;
                     build_idxs[matched_cnt] = 0;
                     matched_cnt++;
@@ -381,7 +386,8 @@ private:
         return std::pair {probe_idx, matched_cnt};
     }
 
-    const Key* __restrict build_keys;
+    const Key* __restrict build_keys = nullptr;
+
     std::vector<uint8_t> visited;
 
     int max_batch_size = 0;
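
The prepare_build/build changes above all serve one sentinel convention: element 0 terminates every chain, and first carries one extra, permanently empty bucket at index bucket_size. A reduced sketch of the assumed structure (simplified from the hunks, not the production class):

    #include <cstddef>
    #include <cstdint>
    #include <vector>

    struct ChainedMapSketch {
        std::vector<uint32_t> first; // chain head per bucket, 0 = empty
        std::vector<uint32_t> next;  // chain link per element, 0 = end of chain
        uint32_t bucket_size = 0;    // power of two in the real code

        void prepare_build(size_t num_elem, uint32_t buckets) {
            bucket_size = buckets;
            first.assign(bucket_size + 1, 0); // +1: spare bucket for null keys
            next.resize(num_elem);
        }

        // bucket_nums[i] = hash(key[i]) & (bucket_size - 1); element 0 is the
        // mocked row, so insertion presumably starts at 1.
        void build(const uint32_t* bucket_nums, size_t num_elem) {
            for (uint32_t i = 1; i < num_elem; i++) {
                next[i] = first[bucket_nums[i]];
                first[bucket_nums[i]] = i;
            }
            first[bucket_size] = 0; // keep the spare null bucket empty
        }
    };
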
diff --git a/be/src/vec/common/hash_table/hash_map_context.h b/be/src/vec/common/hash_table/hash_map_context.h
index 549301ae477..87014787895 100644
--- a/be/src/vec/common/hash_table/hash_map_context.h
+++ b/be/src/vec/common/hash_table/hash_map_context.h
@@ -72,11 +72,14 @@ struct MethodBase {
                                       const uint8_t* null_map = nullptr, bool is_build = false) = 0;
 
     void init_hash_values(size_t num_rows, const uint8_t* null_map) {
+        hash_values.resize(num_rows);
         if (null_map == nullptr) {
-            init_hash_values(num_rows);
+            for (size_t k = 0; k < num_rows; ++k) {
+                hash_values[k] = hash_table->hash(keys[k]);
+            }
             return;
         }
-        hash_values.resize(num_rows);
+
         for (size_t k = 0; k < num_rows; ++k) {
             if (null_map[k]) {
                 continue;
@@ -85,20 +88,6 @@ struct MethodBase {
         }
     }
 
-    void init_hash_values(size_t num_rows) {
-        hash_values.resize(num_rows);
-        for (size_t k = 0; k < num_rows; ++k) {
-            hash_values[k] = hash_table->hash(keys[k]);
-        }
-    }
-
-    void calculate_bucket(size_t num_rows) {
-        size_t mask = hash_table->get_bucket_mask();
-        for (size_t i = 0; i < num_rows; i++) {
-            hash_values[i] &= mask;
-        }
-    }
-
     template <bool read>
     void prefetch(int current) {
         if (LIKELY(current + HASH_MAP_PREFETCH_DIST < hash_values.size())) {
@@ -153,6 +142,10 @@ struct MethodSerialized : public MethodBase<TData> {
     using State = ColumnsHashing::HashMethodSerialized<typename Base::Value, typename Base::Mapped>;
     using Base::try_presis_key;
 
+    // needs to be kept alive until the hash probe ends.
+    std::vector<StringRef> build_stored_keys;
+    Arena build_arena;
+    // refreshed on each probe
     std::vector<StringRef> stored_keys;
 
     StringRef serialize_keys_to_pool_contiguous(size_t i, size_t keys_size,
@@ -167,10 +160,10 @@ struct MethodSerialized : public MethodBase<TData> {
         return {begin, sum_size};
     }
 
-    void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t num_rows,
-                              const uint8_t* null_map = nullptr, bool is_build = false) override {
-        Base::arena.clear();
-        stored_keys.resize(num_rows);
+    void init_serialized_keys_impl(const ColumnRawPtrs& key_columns, size_t num_rows,
+                                   std::vector<StringRef>& keys, Arena& arena) {
+        arena.clear();
+        keys.resize(num_rows);
 
         size_t max_one_row_byte_size = 0;
         for (const auto& column : key_columns) {
@@ -182,25 +175,31 @@ struct MethodSerialized : public MethodBase<TData> {
             // reach mem limit, don't serialize in batch
             size_t keys_size = key_columns.size();
             for (size_t i = 0; i < num_rows; ++i) {
-                stored_keys[i] =
-                        serialize_keys_to_pool_contiguous(i, keys_size, key_columns, Base::arena);
+                keys[i] = serialize_keys_to_pool_contiguous(i, keys_size, key_columns, arena);
             }
         } else {
-            auto* serialized_key_buffer =
-                    reinterpret_cast<uint8_t*>(Base::arena.alloc(total_bytes));
+            auto* serialized_key_buffer = reinterpret_cast<uint8_t*>(arena.alloc(total_bytes));
 
             for (size_t i = 0; i < num_rows; ++i) {
-                stored_keys[i].data =
+                keys[i].data =
                         reinterpret_cast<char*>(serialized_key_buffer + i * max_one_row_byte_size);
-                stored_keys[i].size = 0;
+                keys[i].size = 0;
             }
 
             for (const auto& column : key_columns) {
-                column->serialize_vec(stored_keys, num_rows, max_one_row_byte_size);
+                column->serialize_vec(keys, num_rows, max_one_row_byte_size);
             }
         }
-        Base::keys = stored_keys.data();
-        Base::init_hash_values(num_rows, null_map);
+        Base::keys = keys.data();
+    }
+
+    void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t num_rows,
+                              const uint8_t* null_map = nullptr, bool is_build = false) override {
+        init_serialized_keys_impl(key_columns, num_rows, is_build ? build_stored_keys : stored_keys,
+                                  is_build ? build_arena : Base::arena);
+        if (!is_build) {
+            Base::init_hash_values(num_rows, null_map);
+        }
     }
 
     void insert_keys_into_columns(std::vector<StringRef>& keys, MutableColumns& key_columns,
@@ -241,7 +240,9 @@ struct MethodStringNoCache : public MethodBase<TData> {
         }
 
         Base::keys = stored_keys.data();
-        Base::init_hash_values(num_rows, null_map);
+        if (!is_build) {
+            Base::init_hash_values(num_rows, null_map);
+        }
     }
 
     void insert_keys_into_columns(std::vector<StringRef>& keys, MutableColumns& key_columns,
@@ -270,7 +271,9 @@ struct MethodOneNumber : public MethodBase<TData> {
                              ->get_raw_data()
                              .data;
         std::string name = key_columns[0]->get_name();
-        Base::init_hash_values(num_rows, null_map);
+        if (!is_build) {
+            Base::init_hash_values(num_rows, null_map);
+        }
     }
 
     void insert_keys_into_columns(std::vector<typename Base::Key>& keys,
@@ -392,7 +395,9 @@ struct MethodKeysFixed : public MethodBase<TData> {
             stored_keys = pack_fixeds<Key>(num_rows, actual_columns, null_maps);
             Base::keys = stored_keys.data();
         }
-        Base::init_hash_values(num_rows, null_map);
+        if (!is_build) {
+            Base::init_hash_values(num_rows, null_map);
+        }
     }
 
     void insert_keys_into_columns(std::vector<typename Base::Key>& keys,
@@ -510,4 +515,25 @@ template <class Key, bool has_null, typename Value>
 using FixedKeyHashTableContext =
         MethodKeysFixed<JoinFixedHashMap<Key, Value, HashCRC32<Key>>, has_null>;
 
+template <typename HashTableContext>
+void init_bucket_num(HashTableContext& ctx, size_t num_rows, const uint8_t* null_map) {
+    ctx.hash_values.resize(num_rows);
+    if (null_map == nullptr) {
+        size_t mask = ctx.hash_table->get_bucket_mask();
+        for (size_t k = 0; k < num_rows; ++k) {
+            ctx.hash_values[k] = ctx.hash_table->hash(ctx.keys[k]) & mask;
+        }
+        return;
+    }
+
+    size_t mask = ctx.hash_table->get_bucket_mask();
+    for (size_t k = 0; k < num_rows; ++k) {
+        if (null_map[k]) {
+            ctx.hash_values[k] = mask + 1;
+        } else {
+            ctx.hash_values[k] = ctx.hash_table->hash(ctx.keys[k]) & mask;
+        }
+    }
+}
+
 } // namespace doris::vectorized
\ No newline at end of file
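
init_bucket_num fuses the old init_hash_values + calculate_bucket passes and gives null rows a branch-free probe path: assuming bucket_size is a power of two, mask + 1 == bucket_size, so a null row is routed to the spare bucket added in prepare_build, whose chain head is always 0. A worked illustration under that assumption:

    // bucket_size = 8, mask = 7 (hypothetical values)
    // non-null row: hash & 7       -> bucket in [0, 7]
    // null row:     mask + 1 == 8  -> first[8] == 0 -> empty chain, so the
    //               probe loop produces no match without a per-row null branch.
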
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 931501d024b..5dbaad805f6 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -1066,7 +1066,7 @@ std::unique_ptr<Block> Block::create_same_struct_block(size_t size, bool is_rese
         if (is_reserve) {
             column->reserve(size);
         } else {
-            column->resize(size);
+            column->insert_many_defaults(size);
         }
         temp_block->insert({std::move(column), d.type, d.name});
     }
diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h b/be/src/vec/exec/join/process_hash_table_probe_impl.h
index a267a96d55f..c0e8c351f46 100644
--- a/be/src/vec/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h
@@ -68,7 +68,7 @@ void ProcessHashTableProbe<JoinOpType, Parent>::build_side_output_column(
     constexpr auto probe_all =
             JoinOpType == TJoinOp::LEFT_OUTER_JOIN || JoinOpType == TJoinOp::FULL_OUTER_JOIN;
 
-    if (!is_semi_anti_join || have_other_join_conjunct) {
+    if ((!is_semi_anti_join || have_other_join_conjunct) && size) {
         for (int i = 0; i < _right_col_len; i++) {
             const auto& column = *_build_block->safe_get_by_position(i).column;
             if (output_slot_flags[i]) {
@@ -134,7 +134,7 @@ typename HashTableType::State ProcessHashTableProbe<JoinOpType, Parent>::_init_p
         _parent->_ready_probe = true;
         hash_table_ctx.reset();
         hash_table_ctx.init_serialized_keys(_parent->_probe_columns, probe_rows, null_map);
-        hash_table_ctx.calculate_bucket(probe_rows);
+        init_bucket_num(hash_table_ctx, probe_rows, null_map);
     }
     return typename HashTableType::State(_parent->_probe_columns);
 }
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index 9e8c99c8d56..747bcfdc02c 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -955,6 +955,14 @@ Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block) {
     RETURN_IF_ERROR(_do_evaluate(block, _build_expr_ctxs, *_build_expr_call_timer, res_col_ids));
     if (_join_op == TJoinOp::LEFT_OUTER_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN) {
         _convert_block_to_null(block);
+        // the first row is mocked, so mark it null in every column
+        for (int i = 0; i < block.columns(); i++) {
+            assert_cast<ColumnNullable*>(
+                    (*std::move(block.safe_get_by_position(i).column)).mutate().get())
+                    ->get_null_map_column()
+                    .get_data()
+                    .data()[0] = 1;
+        }
     }
     // TODO: Now we are not sure whether a column is nullable only by ExecNode's `row_desc`
     //  so we have to initialize this flag by the first build block.
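
A condensed paraphrase of the null-marking hunk above (same APIs, simplified column access): after _convert_block_to_null() every build column is nullable, and row 0, the mocked row, is flagged NULL so that outer joins emitting build_idx 0 naturally read NULL:

    for (size_t i = 0; i < block.columns(); ++i) {
        auto* nullable = assert_cast<vectorized::ColumnNullable*>(
                block.get_by_position(i).column->assume_mutable().get());
        nullable->get_null_map_data()[0] = 1; // row 0 is the mocked sentinel
    }
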
diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h
index 6fb1703120e..3bcdbfae3ba 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -87,10 +87,8 @@ struct ProcessRuntimeFilterBuild {
                 state, hash_table_ctx.hash_table->size(), parent->_build_rf_cardinality));
 
         if (!parent->_runtime_filter_slots->empty() && !parent->_inserted_blocks.empty()) {
-            {
-                SCOPED_TIMER(parent->_push_compute_timer);
-                parent->_runtime_filter_slots->insert(parent->_inserted_blocks);
-            }
+            SCOPED_TIMER(parent->_push_compute_timer);
+            parent->_runtime_filter_slots->insert(parent->_inserted_blocks);
         }
         {
             SCOPED_TIMER(parent->_push_down_timer);
@@ -137,7 +135,7 @@ struct ProcessHashTableBuild {
 
         hash_table_ctx.init_serialized_keys(_build_raw_ptrs, _rows,
                                             null_map ? null_map->data() : nullptr, true);
-        hash_table_ctx.calculate_bucket(_rows);
+        init_bucket_num(hash_table_ctx, _rows, null_map ? null_map->data() : nullptr);
         hash_table_ctx.hash_table->build(hash_table_ctx.keys, hash_table_ctx.hash_values.data(),
                                          _rows);
         return Status::OK();
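
Taken together with the hash_map_context.h hunks, the build path now reads (condensed from the hunk above): with is_build = true, init_serialized_keys() serializes into the build-lifetime arena and skips hashing; init_bucket_num() then computes hash & mask in a single pass; and build() links the chains, presumably starting at element 1 since index 0 is the sentinel.

    hash_table_ctx.init_serialized_keys(_build_raw_ptrs, _rows,
                                        null_map ? null_map->data() : nullptr, true);
    init_bucket_num(hash_table_ctx, _rows, null_map ? null_map->data() : nullptr);
    hash_table_ctx.hash_table->build(hash_table_ctx.keys,
                                     hash_table_ctx.hash_values.data(), _rows);
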


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
