This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit bb20da4b3c3695fece584fdea286e47c7496d469
Author: HappenLee <[email protected]>
AuthorDate: Thu Mar 31 23:00:07 2022 +0800

    [improvement](join) Support join project in query engine (#8722)
---
 be/src/vec/columns/column_string.cpp     |   7 +-
 be/src/vec/exec/join/vhash_join_node.cpp | 151 ++++++++++++++++++++-----------
 be/src/vec/exec/join/vhash_join_node.h   |   4 +
 gensrc/thrift/PlanNodes.thrift           |   6 ++
 4 files changed, 112 insertions(+), 56 deletions(-)

diff --git a/be/src/vec/columns/column_string.cpp b/be/src/vec/columns/column_string.cpp
index 832bbbe..98ebf67 100644
--- a/be/src/vec/columns/column_string.cpp
+++ b/be/src/vec/columns/column_string.cpp
@@ -331,7 +331,12 @@ void ColumnString::reserve(size_t n) {
 }
 
 void ColumnString::resize(size_t n) {
-    offsets.resize(n);
+    auto origin_size = size();
+    if (origin_size > n) {
+        offsets.resize(n);
+    } else if (origin_size < n) {
+        insert_many_defaults(n - origin_size);
+    }
 }
 
 void ColumnString::get_extremes(Field& min, Field& max) const {
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index 628ea91..9335028 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -172,7 +172,9 @@ struct ProcessHashTableProbe {
               _probe_side_output_timer(join_node->_probe_side_output_timer) {}
 
     // output build side result column
-    void build_side_output_column(MutableColumns& mcol, int column_offset, int 
column_length, int size) {
+    template <bool have_other_join_conjunct = false>
+    void build_side_output_column(MutableColumns& mcol, int column_offset, int 
column_length,
+            const std::vector<bool>& output_slot_flags, int size) {
         constexpr auto is_semi_anti_join = JoinOpType::value == 
TJoinOp::RIGHT_ANTI_JOIN ||
                                            JoinOpType::value == 
TJoinOp::RIGHT_SEMI_JOIN ||
                                            JoinOpType::value == 
TJoinOp::LEFT_ANTI_JOIN ||
@@ -181,28 +183,37 @@ struct ProcessHashTableProbe {
         constexpr auto probe_all = JoinOpType::value == 
TJoinOp::LEFT_OUTER_JOIN ||
                                    JoinOpType::value == 
TJoinOp::FULL_OUTER_JOIN;
 
-        if constexpr (!is_semi_anti_join) {
+        if constexpr (!is_semi_anti_join || have_other_join_conjunct) {
             if (_build_blocks.size() == 1) {
                 for (int i = 0; i < column_length; i++) {
                     auto& column = *_build_blocks[0].get_by_position(i).column;
-                    mcol[i + column_offset]->insert_indices_from(column,
-                            _build_block_rows.data(), _build_block_rows.data() 
+ size);
+                    if (output_slot_flags[i]) {
+                        mcol[i + column_offset]->insert_indices_from(column, 
_build_block_rows.data(),
+                                                                     
_build_block_rows.data() + size);
+                    } else {
+                        mcol[i + column_offset]->resize(size);
+                    }
                 }
             } else {
                 for (int i = 0; i < column_length; i++) {
-                    for (int j = 0; j < size; j++) {
-                        if constexpr (probe_all) {
-                            if (_build_block_offsets[j] == -1) {
-                                DCHECK(mcol[i + column_offset]->is_nullable());
-                                assert_cast<ColumnNullable *>(mcol[i + 
column_offset].get())->insert_join_null_data();
+                    if (output_slot_flags[i]) {
+                        for (int j = 0; j < size; j++) {
+                            if constexpr (probe_all) {
+                                if (_build_block_offsets[j] == -1) {
+                                    DCHECK(mcol[i + 
column_offset]->is_nullable());
+                                    assert_cast<ColumnNullable *>(
+                                            mcol[i + 
column_offset].get())->insert_join_null_data();
+                                } else {
+                                    auto &column = 
*_build_blocks[_build_block_offsets[j]].get_by_position(i).column;
+                                    mcol[i + 
column_offset]->insert_from(column, _build_block_rows[j]);
+                                }
                             } else {
-                                auto& column = 
*_build_blocks[_build_block_offsets[j]].get_by_position(i).column;
+                                auto &column = 
*_build_blocks[_build_block_offsets[j]].get_by_position(i).column;
                                 mcol[i + column_offset]->insert_from(column, 
_build_block_rows[j]);
                             }
-                        } else {
-                            auto& column = 
*_build_blocks[_build_block_offsets[j]].get_by_position(i).column;
-                            mcol[i + column_offset]->insert_from(column, 
_build_block_rows[j]);
                         }
+                    } else {
+                        mcol[i + column_offset]->resize(size);
                     }
                 }
             }
@@ -210,10 +221,14 @@ struct ProcessHashTableProbe {
     }
 
     // output probe side result column
-    void probe_side_output_column(MutableColumns& mcol, int column_length, int 
size) {
-        for (int i = 0; i < column_length; ++i) {
-            auto& column = _probe_block.get_by_position(i).column;
-            column->replicate(&_items_counts[0], size, *mcol[i]);
+    void probe_side_output_column(MutableColumns& mcol, const 
std::vector<bool>& output_slot_flags, int size) {
+        for (int i = 0; i < output_slot_flags.size(); ++i) {
+            if (output_slot_flags[i]) {
+                auto& column = _probe_block.get_by_position(i).column;
+                column->replicate(&_items_counts[0], size, *mcol[i]);
+            } else {
+                mcol[i]->resize(size);
+            }
         }
     }
     // Only process the join with no other join conjunt, because of no other 
join conjunt
@@ -325,12 +340,13 @@ struct ProcessHashTableProbe {
 
         {
             SCOPED_TIMER(_build_side_output_timer);
-            build_side_output_column(mcol, right_col_idx, right_col_len, 
current_offset);
+            build_side_output_column(mcol, right_col_idx, right_col_len,
+                    _join_node->_right_output_slot_flags, current_offset);
         }
 
-        {
+        if constexpr (JoinOpType::value != TJoinOp::RIGHT_SEMI_JOIN && 
JoinOpType::value != TJoinOp::RIGHT_ANTI_JOIN) {
             SCOPED_TIMER(_probe_side_output_timer);
-            probe_side_output_column(mcol, right_col_idx, current_offset);
+            probe_side_output_column(mcol, 
_join_node->_left_output_slot_flags, current_offset);
         }
 
         output_block->swap(mutable_block.to_block());
@@ -351,10 +367,7 @@ struct ProcessHashTableProbe {
         int right_col_idx = _join_node->_left_table_data_types.size();
         int right_col_len = _join_node->_right_table_data_types.size();
 
-        IColumn::Offsets offset_data;
         auto& mcol = mutable_block.mutable_columns();
-        offset_data.assign(_probe_rows, (uint32_t)0);
-
         // use in right join to change visited state after
         // exec the vother join conjunt
         std::vector<bool*> visited_map;
@@ -363,16 +376,23 @@ struct ProcessHashTableProbe {
         std::vector<bool> same_to_prev;
         same_to_prev.reserve(1.2 * _batch_size);
 
+        _items_counts.resize(_probe_rows);
+        _build_block_offsets.resize(_batch_size);
+        _build_block_rows.resize(_batch_size);
+        memset(_items_counts.data(), 0, sizeof(uint32_t) * _probe_rows);
+
         int current_offset = 0;
 
         for (; _probe_index < _probe_rows;) {
             // ignore null rows
             if constexpr (ignore_null) {
                 if ((*null_map)[_probe_index]) {
-                    offset_data[_probe_index++] = current_offset;
+                    _items_counts[_probe_index++] = (uint32_t)0;
                     continue;
                 }
             }
+
+            auto last_offset = current_offset;
             auto find_result =
                     (*null_map)[_probe_index]
                             ? 
decltype(key_getter.find_key(hash_table_ctx.hash_table, _probe_index,
@@ -382,15 +402,25 @@ struct ProcessHashTableProbe {
             if (find_result.is_found()) {
                 auto& mapped = find_result.get_mapped();
                 auto origin_offset = current_offset;
-
-                for (auto it = mapped.begin(); it.ok(); ++it) {
+                // TODO: Iterators are currently considered to be a heavy 
operation and have a certain impact on performance.
+                // We should rethink whether to use this iterator mode in the 
future. Now just opt the one row case
+                if (mapped.get_row_count() == 1) {
+                    _build_block_offsets[current_offset] = mapped.block_offset;
+                    _build_block_rows[current_offset] = mapped.row_num;
                     ++current_offset;
-                    const Block& cur_blk = _build_blocks[it->block_offset];
-                    for (size_t j = 0; j < right_col_len; ++j) {
-                        auto& column = *cur_blk.get_by_position(j).column;
-                        mcol[j + right_col_idx]->insert_from(column, 
it->row_num);
+                    visited_map.emplace_back(&mapped.visited);
+                } else {
+                    for (auto it = mapped.begin(); it.ok(); ++it) {
+                        if (current_offset < _batch_size) {
+                            _build_block_offsets[current_offset] = 
it->block_offset;
+                            _build_block_rows[current_offset] = it->row_num;
+                        } else {
+                            
_build_block_offsets.emplace_back(it->block_offset);
+                            _build_block_rows.emplace_back(it->row_num);
+                        }
+                        ++current_offset;
+                        visited_map.emplace_back(&it->visited);
                     }
-                    visited_map.emplace_back(&it->visited);
                 }
                 same_to_prev.emplace_back(false);
                 for (int i = 0; i < current_offset - origin_offset - 1; ++i) {
@@ -399,43 +429,36 @@ struct ProcessHashTableProbe {
             } else if constexpr (JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN 
||
                                  JoinOpType::value == TJoinOp::FULL_OUTER_JOIN 
||
                                  JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN) 
{
-                ++current_offset;
                 same_to_prev.emplace_back(false);
                 visited_map.emplace_back(nullptr);
                 // only full outer / left outer need insert the data of right 
table
-                if constexpr (JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN ||
-                              JoinOpType::value == TJoinOp::FULL_OUTER_JOIN) {
-                    for (size_t j = 0; j < right_col_len; ++j) {
-                        DCHECK(mcol[j + right_col_idx]->is_nullable());
-                        assert_cast<ColumnNullable *>(mcol[j + 
right_col_idx].get())->insert_join_null_data();
-                    }
-                } else {
-                    for (size_t j = 0; j < right_col_len; ++j) {
-                        mcol[j + right_col_idx]->insert_default();
-                    }
-                }
+                // left anti use -1 use a default value
+                _build_block_offsets[current_offset] = -1;
+                _build_block_rows[current_offset] = -1;
+                ++current_offset;
             } else {
                 // other join, no nothing
             }
 
-            offset_data[_probe_index++] = current_offset;
-
+            _items_counts[_probe_index++] = (uint32_t)(current_offset - 
last_offset);
             if (current_offset >= _batch_size) {
                 break;
             }
         }
 
-        for (int i = _probe_index; i < _probe_rows; ++i) {
-            offset_data[i] = current_offset;
+        {
+            SCOPED_TIMER(_build_side_output_timer);
+            build_side_output_column<true>(mcol, right_col_idx, right_col_len,
+                    _join_node->_right_output_slot_flags, current_offset);
         }
-
-        output_block->swap(mutable_block.to_block());
-        for (int i = 0; i < right_col_idx; ++i) {
-            auto& column = _probe_block.get_by_position(i).column;
-            output_block->get_by_position(i).column = 
column->replicate(offset_data);
+        {
+            SCOPED_TIMER(_probe_side_output_timer);
+            probe_side_output_column(mcol, 
_join_node->_left_output_slot_flags, current_offset);
         }
+        output_block->swap(mutable_block.to_block());
 
-        if (_join_node->_vother_join_conjunct_ptr) {
+        // dispose the other join conjunt exec
+        {
             int result_column_id = -1;
             int orig_columns = output_block->columns();
             (*_join_node->_vother_join_conjunct_ptr)->execute(output_block, 
&result_column_id);
@@ -613,7 +636,6 @@ private:
     ProfileCounter* _probe_side_output_timer;
 };
 
-// now we only support inner join
 HashJoinNode::HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const 
DescriptorTbl& descs)
         : ExecNode(pool, tnode, descs),
           _join_op(tnode.hash_join_node.join_op),
@@ -627,7 +649,9 @@ HashJoinNode::HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const Descr
           _build_unique(_join_op == TJoinOp::LEFT_ANTI_JOIN || _join_op == 
TJoinOp::LEFT_SEMI_JOIN),
           _is_right_semi_anti(_join_op == TJoinOp::RIGHT_ANTI_JOIN ||
                               _join_op == TJoinOp::RIGHT_SEMI_JOIN),
-          _is_outer_join(_match_all_build || _match_all_probe) {
+          _is_outer_join(_match_all_build || _match_all_probe),
+          
_hash_output_slot_ids(tnode.hash_join_node.__isset.hash_output_slot_ids ? 
tnode.hash_join_node.hash_output_slot_ids :
+                                std::vector<SlotId>{}) {
     _runtime_filter_descs = tnode.runtime_filters;
     init_join_op();
 
@@ -705,6 +729,23 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
                                                                    
filter_desc, state->query_options()));
     }
 
+    // init left/right output slots flags, only column of slot_id in 
_hash_output_slot_ids need
+    // insert to output block of hash join.
+    // _left_output_slots_flags : column of left table need to output set flag 
= true
+    // _rgiht_output_slots_flags : column of right table need to output set 
flag = true
+    // if _hash_output_slot_ids is empty, means all column of left/right table 
need to output.
+    auto init_output_slots_flags = [this](auto& tuple_descs, auto& 
output_slot_flags) {
+        for (const auto& tuple_desc : tuple_descs) {
+            for (const auto& slot_desc : tuple_desc->slots()) {
+                output_slot_flags.emplace_back(_hash_output_slot_ids.empty() ||
+                                               
std::find(_hash_output_slot_ids.begin(), _hash_output_slot_ids.end(),
+                                                         slot_desc->id()) != 
_hash_output_slot_ids.end());
+            }
+        }
+    };
+    init_output_slots_flags(child(0)->row_desc().tuple_descriptors(), 
_left_output_slot_flags);
+    init_output_slots_flags(child(1)->row_desc().tuple_descriptors(), 
_right_output_slot_flags);
+
     return Status::OK();
 }
 
diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h
index ca93aaa..e2762e6 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -230,6 +230,10 @@ private:
     std::vector<uint32_t> _items_counts;
     std::vector<int8_t> _build_block_offsets;
     std::vector<int> _build_block_rows;
+
+    std::vector<SlotId> _hash_output_slot_ids;
+    std::vector<bool> _left_output_slot_flags;
+    std::vector<bool> _right_output_slot_flags;
 private:
     Status _hash_table_build(RuntimeState* state);
     Status _process_build_block(RuntimeState* state, Block& block, uint8_t 
offset);
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 0fb74a5..ef599e4 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -394,6 +394,9 @@ struct THashJoinNode {
   // anything from the ON or USING clauses (but *not* the WHERE clause) that's 
not an
   // equi-join predicate, only use in vec exec engine
   5: optional Exprs.TExpr vother_join_conjunct 
+
+  // hash output column
+  6: optional list<Types.TSlotId> hash_output_slot_ids
 }
 
 struct TMergeJoinNode {
@@ -789,6 +792,9 @@ struct TPlanNode {
   40: optional Exprs.TExpr vconjunct
 
   41: optional TTableFunctionNode table_function_node
+
+  // output column
+  42: optional list<Types.TSlotId> output_slot_ids
 }
 
 // A flattened representation of a tree of PlanNodes, obtained by depth-first

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to