This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f5b826b66d2 [fix](mark join) mark join column should be nullable 
(#24910)
f5b826b66d2 is described below

commit f5b826b66d2eeceb8e54874ec80c705c3e4505cb
Author: Jerry Hu <[email protected]>
AuthorDate: Mon Oct 9 21:10:36 2023 -0500

    [fix](mark join) mark join column should be nullable (#24910)
---
 be/src/pipeline/exec/hashjoin_build_sink.cpp       | 13 +++---
 be/src/pipeline/exec/join_build_sink_operator.h    |  6 +--
 be/src/pipeline/pipeline_x/dependency.h            |  2 +-
 be/src/vec/columns/column_filter_helper.cpp        | 46 ++++++++++++++++++++++
 be/src/vec/columns/column_filter_helper.h          | 39 ++++++++++++++++++
 .../vec/exec/join/process_hash_table_probe_impl.h  | 30 +++++++++-----
 be/src/vec/exec/join/vhash_join_node.cpp           | 30 +++++++-------
 be/src/vec/exec/join/vhash_join_node.h             |  8 +++-
 be/src/vec/exec/join/vjoin_node_base.cpp           |  8 ++--
 be/src/vec/exec/join/vjoin_node_base.h             |  8 ++--
 be/src/vec/exec/join/vnested_loop_join_node.cpp    | 25 +++++-------
 .../trees/expressions/MarkJoinSlotReference.java   |  6 +--
 .../org/apache/doris/nereids/util/JoinUtils.java   |  3 +-
 .../nereids_syntax_p0/sub_query_correlated.out     | 15 +++++++
 .../nereids_tpcds_shape_sf100_p0/shape/query10.out | 32 +++++++--------
 .../nereids_tpcds_shape_sf100_p0/shape/query35.out | 32 +++++++--------
 .../nereids_syntax_p0/sub_query_correlated.groovy  | 38 +++++++++++++++++-
 17 files changed, 244 insertions(+), 97 deletions(-)

diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp 
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index da262645bb6..8dd84dfd27b 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -139,8 +139,7 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState* 
state) {
 void HashJoinBuildSinkLocalState::init_short_circuit_for_probe() {
     auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
     _shared_state->short_circuit_for_probe =
-            (_short_circuit_for_null_in_probe_side &&
-             p._join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) ||
+            (_has_null_in_build_side && p._join_op == 
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) ||
             (_shared_state->build_blocks->empty() && p._join_op == 
TJoinOp::INNER_JOIN &&
              !p._is_mark_join) ||
             (_shared_state->build_blocks->empty() && p._join_op == 
TJoinOp::LEFT_SEMI_JOIN &&
@@ -204,7 +203,7 @@ Status 
HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state,
                                         has_null_value || 
short_circuit_for_null_in_build_side
                                                 ? &null_map_val->get_data()
                                                 : nullptr,
-                                        
&_short_circuit_for_null_in_probe_side);
+                                        &_has_null_in_build_side);
                     }},
             *_shared_state->hash_table_variants,
             vectorized::make_bool_variant(_build_side_ignore_null),
@@ -453,8 +452,8 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
     // make one block for each 4 gigabytes
     constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL;
 
-    if (local_state._short_circuit_for_null_in_probe_side) {
-        // TODO: if _short_circuit_for_null_in_probe_side is true we should 
finish current pipeline task.
+    if (local_state._has_null_in_build_side) {
+        // TODO: if _has_null_in_build_side is true we should finish current 
pipeline task.
         DCHECK(state->enable_pipeline_exec());
         return Status::OK();
     }
@@ -539,7 +538,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
             _shared_hash_table_context->hash_table_variants =
                     local_state._shared_state->hash_table_variants;
             _shared_hash_table_context->short_circuit_for_null_in_probe_side =
-                    local_state._short_circuit_for_null_in_probe_side;
+                    local_state._has_null_in_build_side;
             if (local_state._runtime_filter_slots) {
                 local_state._runtime_filter_slots->copy_to_shared_context(
                         _shared_hash_table_context);
@@ -557,7 +556,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
         local_state.profile()->add_info_string(
                 "SharedHashTableFrom",
                 
print_id(_shared_hashtable_controller->get_builder_fragment_instance_id(id())));
-        local_state._short_circuit_for_null_in_probe_side =
+        local_state._has_null_in_build_side =
                 
_shared_hash_table_context->short_circuit_for_null_in_probe_side;
         local_state._shared_state->hash_table_variants =
                 std::static_pointer_cast<vectorized::HashTableVariants>(
diff --git a/be/src/pipeline/exec/join_build_sink_operator.h 
b/be/src/pipeline/exec/join_build_sink_operator.h
index a67c724f698..2f7a3ec03ea 100644
--- a/be/src/pipeline/exec/join_build_sink_operator.h
+++ b/be/src/pipeline/exec/join_build_sink_operator.h
@@ -40,7 +40,7 @@ protected:
     template <typename LocalStateType>
     friend class JoinBuildSinkOperatorX;
 
-    bool _short_circuit_for_null_in_probe_side = false;
+    bool _has_null_in_build_side = false;
 
     RuntimeProfile::Counter* _build_rows_counter;
     RuntimeProfile::Counter* _push_down_timer;
@@ -73,8 +73,8 @@ protected:
 
     // For null aware left anti join, we apply a short circuit strategy.
     // 1. Set _short_circuit_for_null_in_build_side to true if join operator 
is null aware left anti join.
-    // 2. In build phase, we stop materialize build side when we meet the 
first null value and set _short_circuit_for_null_in_probe_side to true.
-    // 3. In probe phase, if _short_circuit_for_null_in_probe_side is true, 
join node returns empty block directly. Otherwise, probing will continue as the 
same as generic left anti join.
+    // 2. In build phase, we stop materialize build side when we meet the 
first null value and set _has_null_in_build_side to true.
+    // 3. In probe phase, if _has_null_in_build_side is true, join node 
returns empty block directly. Otherwise, probing will continue as the same as 
generic left anti join.
     const bool _short_circuit_for_null_in_build_side;
 };
 
diff --git a/be/src/pipeline/pipeline_x/dependency.h 
b/be/src/pipeline/pipeline_x/dependency.h
index c69b49870d5..575b305b017 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -504,7 +504,7 @@ private:
 
 struct JoinSharedState {
     // For some join case, we can apply a short circuit strategy
-    // 1. _short_circuit_for_null_in_probe_side = true
+    // 1. _has_null_in_build_side = true
     // 2. build side rows is empty, Join op is: inner join/right outer 
join/left semi/right semi/right anti
     bool short_circuit_for_probe = false;
     vectorized::JoinOpVariants join_op_variants;
diff --git a/be/src/vec/columns/column_filter_helper.cpp 
b/be/src/vec/columns/column_filter_helper.cpp
new file mode 100644
index 00000000000..f65bd8d8649
--- /dev/null
+++ b/be/src/vec/columns/column_filter_helper.cpp
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/columns/column_filter_helper.h"
+
+namespace doris::vectorized {
+ColumnFilterHelper::ColumnFilterHelper(IColumn& column)
+        : _column(assert_cast<ColumnNullable&>(column)),
+          _value_column(assert_cast<ColumnUInt8&>(_column.get_nested_column())),
+          _null_map_column(_column.get_null_map_column()) {}
+
+void ColumnFilterHelper::resize_fill(size_t size, UInt8 value) {
+    _value_column.get_data().resize_fill(size, value);
+    _null_map_column.get_data().resize_fill(size, 0);
+}
+
+void ColumnFilterHelper::insert_value(UInt8 value) {
+    _value_column.get_data().push_back(value);
+    _null_map_column.get_data().push_back(0);
+}
+
+void ColumnFilterHelper::insert_null() {
+    _value_column.insert_default();
+    _null_map_column.get_data().push_back(1);
+}
+
+void ColumnFilterHelper::reserve(size_t size) {
+    _value_column.reserve(size);
+    _null_map_column.reserve(size);
+}
+
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/columns/column_filter_helper.h 
b/be/src/vec/columns/column_filter_helper.h
new file mode 100644
index 00000000000..2dc529ef3b4
--- /dev/null
+++ b/be/src/vec/columns/column_filter_helper.h
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "column_nullable.h"
+
+namespace doris::vectorized {
+class ColumnFilterHelper {
+public:
+    explicit ColumnFilterHelper(IColumn& column);
+
+    void resize_fill(size_t size, UInt8 value);
+    void insert_null();
+    void insert_value(UInt8 value);
+    void reserve(size_t size);
+
+    [[nodiscard]] size_t size() const { return _column.size(); }
+
+private:
+    ColumnNullable& _column;
+    ColumnUInt8& _value_column;
+    ColumnUInt8& _null_map_column;
+};
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h 
b/be/src/vec/exec/join/process_hash_table_probe_impl.h
index 0cee4fe749b..edaa705640b 100644
--- a/be/src/vec/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h
@@ -21,6 +21,7 @@
 #include "process_hash_table_probe.h"
 #include "runtime/thread_context.h" // IWYU pragma: keep
 #include "util/simd/bits.h"
+#include "vec/columns/column_filter_helper.h"
 #include "vec/exprs/vexpr_context.h"
 #include "vhash_join_node.h"
 
@@ -330,6 +331,7 @@ Status 
ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
     int current_offset = 0;
     bool all_match_one = true;
     size_t probe_size = 0;
+
     auto& probe_row_match_iter = _probe_row_match<Mapped, 
with_other_conjuncts>(
             current_offset, probe_index, probe_size, all_match_one);
 
@@ -353,6 +355,11 @@ Status 
ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
 
     _probe_hash<need_null_map_for_probe, HashTableType>(keys, hash_table_ctx, 
null_map);
 
+    std::unique_ptr<ColumnFilterHelper> mark_column;
+    if (is_mark_join) {
+        mark_column = std::make_unique<ColumnFilterHelper>(*mcol[mcol.size() - 
1]);
+    }
+
     {
         SCOPED_TIMER(_search_hashtable_timer);
         using FindResult = 
decltype(key_getter.find_key(hash_table_ctx.hash_table, 0, *_arena));
@@ -399,9 +406,14 @@ Status 
ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
                         (JoinOpType != TJoinOp::LEFT_SEMI_JOIN) ^ 
find_result.is_found();
                 if constexpr (is_mark_join) {
                     ++current_offset;
-                    assert_cast<ColumnVector<UInt8>&>(*mcol[mcol.size() - 1])
-                            .get_data()
-                            .template push_back(need_go_ahead);
+                    bool null_result =
+                            (need_null_map_for_probe && (*null_map)[probe_index]) ||
+                            (!need_go_ahead && _join_context->_has_null_value_in_build_side);
+                    if (null_result) {
+                        mark_column->insert_null();
+                    } else {
+                        mark_column->insert_value(need_go_ahead);
+                    }
                 } else {
                     current_offset += need_go_ahead;
                 }
@@ -650,21 +662,21 @@ Status 
ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(
             }
         }
 
+        /// FIXME: incorrect result of semi mark join with other 
conjuncts(null value missed).
         if (is_mark_join) {
-            auto& matched_map = assert_cast<ColumnVector<UInt8>&>(
-                                        
*(output_block->get_by_position(orig_columns - 1)
-                                                  .column->assume_mutable()))
-                                        .get_data();
+            auto mark_column =
+                    output_block->get_by_position(orig_columns - 
1).column->assume_mutable();
+            ColumnFilterHelper helper(*mark_column);
 
             // For mark join, we only filter rows which have duplicate join 
keys.
             // And then, we set matched_map to the join result to do the mark 
join's filtering.
             for (size_t i = 1; i < row_count; ++i) {
                 if (!_same_to_prev[i]) {
-                    matched_map.push_back(filter_map[i - 1]);
+                    helper.insert_value(filter_map[i - 1]);
                     filter_map[i - 1] = true;
                 }
             }
-            matched_map.push_back(filter_map[filter_map.size() - 1]);
+            helper.insert_value(filter_map[filter_map.size() - 1]);
             filter_map[filter_map.size() - 1] = true;
         }
 
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp 
b/be/src/vec/exec/join/vhash_join_node.cpp
index aa91846cc8b..5f769e4cafe 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -129,7 +129,8 @@ HashJoinProbeContext::HashJoinProbeContext(HashJoinNode* 
join_node)
           _probe_key_sz(join_node->_probe_key_sz),
           _left_output_slot_flags(&join_node->_left_output_slot_flags),
           _right_output_slot_flags(&join_node->_right_output_slot_flags),
-          
_is_any_probe_match_row_output(&join_node->_is_any_probe_match_row_output) {}
+          
_is_any_probe_match_row_output(&join_node->_is_any_probe_match_row_output),
+          _has_null_value_in_build_side(join_node->_has_null_in_build_side) {}
 
 HashJoinProbeContext::HashJoinProbeContext(pipeline::HashJoinProbeLocalState* 
local_state)
         : 
_have_other_join_conjunct(local_state->join_probe()->_have_other_join_conjunct),
@@ -435,10 +436,10 @@ Status HashJoinNode::pull(doris::RuntimeState* state, 
vectorized::Block* output_
         return Status::OK();
     }
 
-    if (_short_circuit_for_null_in_probe_side && _is_mark_join) {
-        /// If `_short_circuit_for_null_in_probe_side` is true, this indicates 
no rows
-        /// match the join condition, and this is 'mark join', so we need to 
create a column as mark
-        /// with all rows set to 0.
+    /// `_has_null_in_build_side` means have null value in build side.
+    /// `_short_circuit_for_null_in_build_side` means short circuit if has 
null in build side(e.g. null aware left anti join).
+    if (_has_null_in_build_side && _short_circuit_for_null_in_build_side && 
_is_mark_join) {
+        /// We need to create a column as mark with all rows set to NULL.
         auto block_rows = _probe_block.rows();
         if (block_rows == 0) {
             *eos = _probe_eos;
@@ -452,8 +453,10 @@ Status HashJoinNode::pull(doris::RuntimeState* state, 
vectorized::Block* output_
                 temp_block.insert(_probe_block.get_by_position(i));
             }
         }
-        auto mark_column = ColumnUInt8::create(block_rows, 0);
-        temp_block.insert({std::move(mark_column), 
std::make_shared<DataTypeUInt8>(), ""});
+        auto mark_column = 
ColumnNullable::create(ColumnUInt8::create(block_rows, 0),
+                                                  
ColumnUInt8::create(block_rows, 1));
+        temp_block.insert(
+                {std::move(mark_column), 
make_nullable(std::make_shared<DataTypeUInt8>()), ""});
 
         {
             SCOPED_TIMER(_join_filter_timer);
@@ -810,7 +813,7 @@ Status HashJoinNode::_materialize_build_side(RuntimeState* 
state) {
         Block block;
         // If eos or have already met a null value using short-circuit 
strategy, we do not need to pull
         // data from data.
-        while (!eos && !_short_circuit_for_null_in_probe_side &&
+        while (!eos && (!_short_circuit_for_null_in_build_side || 
!_has_null_in_build_side) &&
                (!_probe_open_finish || 
!_is_hash_join_early_start_probe_eos(state))) {
             block.clear_column_data();
             RETURN_IF_CANCELLED(state);
@@ -839,8 +842,8 @@ Status HashJoinNode::sink(doris::RuntimeState* state, 
vectorized::Block* in_bloc
     // make one block for each 4 gigabytes
     constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL;
 
-    if (_short_circuit_for_null_in_probe_side) {
-        // TODO: if _short_circuit_for_null_in_probe_side is true we should 
finish current pipeline task.
+    if (_has_null_in_build_side) {
+        // TODO: if _has_null_in_build_side is true we should finish current 
pipeline task.
         DCHECK(state->enable_pipeline_exec());
         return Status::OK();
     }
@@ -913,7 +916,7 @@ Status HashJoinNode::sink(doris::RuntimeState* state, 
vectorized::Block* in_bloc
             _shared_hash_table_context->blocks = _build_blocks;
             _shared_hash_table_context->hash_table_variants = 
_hash_table_variants;
             _shared_hash_table_context->short_circuit_for_null_in_probe_side =
-                    _short_circuit_for_null_in_probe_side;
+                    _has_null_in_build_side;
             if (_runtime_filter_slots) {
                 
_runtime_filter_slots->copy_to_shared_context(_shared_hash_table_context);
             }
@@ -930,8 +933,7 @@ Status HashJoinNode::sink(doris::RuntimeState* state, 
vectorized::Block* in_bloc
         _build_phase_profile->add_info_string(
                 "SharedHashTableFrom",
                 
print_id(_shared_hashtable_controller->get_builder_fragment_instance_id(id())));
-        _short_circuit_for_null_in_probe_side =
-                
_shared_hash_table_context->short_circuit_for_null_in_probe_side;
+        _has_null_in_build_side = 
_shared_hash_table_context->short_circuit_for_null_in_probe_side;
         _hash_table_variants = std::static_pointer_cast<HashTableVariants>(
                 _shared_hash_table_context->hash_table_variants);
         _build_blocks = _shared_hash_table_context->blocks;
@@ -1117,7 +1119,7 @@ Status HashJoinNode::_process_build_block(RuntimeState* 
state, Block& block, uin
                                         has_null_value || 
short_circuit_for_null_in_build_side
                                                 ? &null_map_val->get_data()
                                                 : nullptr,
-                                        
&_short_circuit_for_null_in_probe_side);
+                                        &_has_null_in_build_side);
                     }},
             *_hash_table_variants, make_bool_variant(_build_side_ignore_null),
             make_bool_variant(_short_circuit_for_null_in_build_side));
diff --git a/be/src/vec/exec/join/vhash_join_node.h 
b/be/src/vec/exec/join/vhash_join_node.h
index c75ab58357c..c4a369e802d 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -235,6 +235,9 @@ struct ProcessHashTableBuild {
                 }
                 if constexpr (ignore_null) {
                     if ((*null_map)[k]) {
+                        if (has_null_key) {
+                            *has_null_key = true;
+                        }
                         continue;
                     }
                 }
@@ -525,6 +528,7 @@ struct HashJoinProbeContext {
 
     // for cases when a probe row matches more than batch size build rows.
     bool* _is_any_probe_match_row_output;
+    bool _has_null_value_in_build_side {};
 };
 
 class HashJoinNode final : public VJoinNodeBase {
@@ -576,8 +580,8 @@ private:
 
     void _init_short_circuit_for_probe() override {
         _short_circuit_for_probe =
-                (_short_circuit_for_null_in_probe_side &&
-                 _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && 
!_is_mark_join) ||
+                (_has_null_in_build_side && _join_op == 
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN &&
+                 !_is_mark_join) ||
                 (_build_blocks->empty() && _join_op == TJoinOp::INNER_JOIN && 
!_is_mark_join) ||
                 (_build_blocks->empty() && _join_op == TJoinOp::LEFT_SEMI_JOIN 
&& !_is_mark_join) ||
                 (_build_blocks->empty() && _join_op == 
TJoinOp::RIGHT_OUTER_JOIN) ||
diff --git a/be/src/vec/exec/join/vjoin_node_base.cpp 
b/be/src/vec/exec/join/vjoin_node_base.cpp
index 2a3a06f4047..4cdb7aed78c 100644
--- a/be/src/vec/exec/join/vjoin_node_base.cpp
+++ b/be/src/vec/exec/join/vjoin_node_base.cpp
@@ -146,11 +146,9 @@ void VJoinNodeBase::_construct_mutable_join_block() {
             _join_block.insert({type_ptr->create_column(), type_ptr, 
slot_desc->col_name()});
         }
     }
-    if (_is_mark_join) {
-        _join_block.replace_by_position(
-                _join_block.columns() - 1,
-                
remove_nullable(_join_block.get_by_position(_join_block.columns() - 1).column));
-    }
+
+    DCHECK(!_is_mark_join ||
+           _join_block.get_by_position(_join_block.columns() - 
1).column->is_nullable());
 }
 
 Status VJoinNodeBase::_build_output_block(Block* origin_block, Block* 
output_block,
diff --git a/be/src/vec/exec/join/vjoin_node_base.h 
b/be/src/vec/exec/join/vjoin_node_base.h
index 0de7ae11064..9bb946bc0ef 100644
--- a/be/src/vec/exec/join/vjoin_node_base.h
+++ b/be/src/vec/exec/join/vjoin_node_base.h
@@ -117,13 +117,13 @@ protected:
 
     // For null aware left anti join, we apply a short circuit strategy.
     // 1. Set _short_circuit_for_null_in_build_side to true if join operator 
is null aware left anti join.
-    // 2. In build phase, we stop materialize build side when we meet the 
first null value and set _short_circuit_for_null_in_probe_side to true.
-    // 3. In probe phase, if _short_circuit_for_null_in_probe_side is true, 
join node returns empty block directly. Otherwise, probing will continue as the 
same as generic left anti join.
+    // 2. In build phase, we stop materialize build side when we meet the 
first null value and set _has_null_in_build_side to true.
+    // 3. In probe phase, if _has_null_in_build_side is true, join node 
returns empty block directly. Otherwise, probing will continue as the same as 
generic left anti join.
     const bool _short_circuit_for_null_in_build_side = false;
-    bool _short_circuit_for_null_in_probe_side = false;
+    bool _has_null_in_build_side = false;
 
     // For some join case, we can apply a short circuit strategy
-    // 1. _short_circuit_for_null_in_probe_side = true
+    // 1. _has_null_in_build_side = true
     // 2. build side rows is empty, Join op is: inner join/right outer 
join/left semi/right semi/right anti
     bool _short_circuit_for_probe = false;
 
diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp 
b/be/src/vec/exec/join/vnested_loop_join_node.cpp
index 49aa6970ba8..7996333a40f 100644
--- a/be/src/vec/exec/join/vnested_loop_join_node.cpp
+++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp
@@ -48,6 +48,7 @@
 #include "util/simd/bits.h"
 #include "util/telemetry/telemetry.h"
 #include "vec/columns/column_const.h"
+#include "vec/columns/column_filter_helper.h"
 #include "vec/columns/column_nullable.h"
 #include "vec/columns/column_vector.h"
 #include "vec/columns/columns_number.h"
@@ -300,10 +301,9 @@ void 
VNestedLoopJoinNode::_append_left_data_with_null(MutableBlock& mutable_bloc
     for (size_t i = 0; i < _num_build_side_columns; ++i) {
         dst_columns[_num_probe_side_columns + 
i]->insert_many_defaults(_left_side_process_count);
     }
-    IColumn::Filter& mark_data = 
assert_cast<doris::vectorized::ColumnVector<UInt8>&>(
-                                         *dst_columns[dst_columns.size() - 1])
-                                         .get_data();
-    mark_data.resize_fill(mark_data.size() + _left_side_process_count, 0);
+
+    auto& mark_column = *dst_columns[dst_columns.size() - 1];
+    ColumnFilterHelper(mark_column).resize_fill(mark_column.size() + 
_left_side_process_count, 0);
 }
 
 void VNestedLoopJoinNode::_process_left_child_block(MutableBlock& 
mutable_block,
@@ -363,12 +363,9 @@ void VNestedLoopJoinNode::_update_additional_flags(Block* 
block) {
         }
     }
     if (_is_mark_join) {
-        IColumn::Filter& mark_data =
-                assert_cast<doris::vectorized::ColumnVector<UInt8>&>(
-                        *block->get_by_position(block->columns() - 
1).column->assume_mutable())
-                        .get_data();
-        if (mark_data.size() < block->rows()) {
-            mark_data.resize_fill(block->rows(), 1);
+        auto mark_column = block->get_by_position(block->columns() - 
1).column->assume_mutable();
+        if (mark_column->size() < block->rows()) {
+            ColumnFilterHelper(*mark_column).resize_fill(block->rows(), 1);
         }
     }
 }
@@ -490,14 +487,12 @@ void 
VNestedLoopJoinNode::_finalize_current_phase(MutableBlock& mutable_block, s
                 _resize_fill_tuple_is_null_column(new_size, 0, 1);
             }
         } else {
-            IColumn::Filter& mark_data = 
assert_cast<doris::vectorized::ColumnVector<UInt8>&>(
-                                                 
*dst_columns[dst_columns.size() - 1])
-                                                 .get_data();
-            mark_data.reserve(mark_data.size() + _left_side_process_count);
+            ColumnFilterHelper mark_column(*dst_columns[dst_columns.size() - 
1]);
+            mark_column.reserve(mark_column.size() + _left_side_process_count);
             DCHECK_LE(_left_block_start_pos + _left_side_process_count, 
_left_block.rows());
             for (int j = _left_block_start_pos;
                  j < _left_block_start_pos + _left_side_process_count; ++j) {
-                mark_data.emplace_back(IsSemi == 
_cur_probe_row_visited_flags[j]);
+                mark_column.insert_value(IsSemi == 
_cur_probe_row_visited_flags[j]);
             }
             for (size_t i = 0; i < _num_probe_side_columns; ++i) {
                 const ColumnWithTypeAndName src_column = 
_left_block.get_by_position(i);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/MarkJoinSlotReference.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/MarkJoinSlotReference.java
index 099e64eb5d2..021fcea1a3a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/MarkJoinSlotReference.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/MarkJoinSlotReference.java
@@ -29,17 +29,17 @@ public class MarkJoinSlotReference extends SlotReference 
implements SlotNotFromC
     final boolean existsHasAgg;
 
     public MarkJoinSlotReference(String name) {
-        super(name, BooleanType.INSTANCE, false);
+        super(name, BooleanType.INSTANCE, true);
         this.existsHasAgg = false;
     }
 
     public MarkJoinSlotReference(String name, boolean existsHasAgg) {
-        super(name, BooleanType.INSTANCE, false);
+        super(name, BooleanType.INSTANCE, true);
         this.existsHasAgg = existsHasAgg;
     }
 
     public MarkJoinSlotReference(ExprId exprId, String name, boolean 
existsHasAgg) {
-        super(exprId, name, BooleanType.INSTANCE, false, ImmutableList.of());
+        super(exprId, name, BooleanType.INSTANCE, true, ImmutableList.of());
         this.existsHasAgg = existsHasAgg;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java
index dc969418ce9..b9e61e256bf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java
@@ -53,7 +53,8 @@ import java.util.stream.Collectors;
 public class JoinUtils {
     public static boolean couldShuffle(Join join) {
         // Cross-join and Null-Aware-Left-Anti-Join only can be broadcast join.
-        return !(join.getJoinType().isCrossJoin()) && 
!(join.getJoinType().isNullAwareLeftAntiJoin());
+        // Because mark join would consider null value from both build and 
probe side, so must use broadcast join too.
+        return !(join.getJoinType().isCrossJoin() || 
join.getJoinType().isNullAwareLeftAntiJoin() || join.isMarkJoin());
     }
 
     public static boolean couldBroadcast(Join join) {
diff --git a/regression-test/data/nereids_syntax_p0/sub_query_correlated.out 
b/regression-test/data/nereids_syntax_p0/sub_query_correlated.out
index 732a72a3907..647babc200d 100644
--- a/regression-test/data/nereids_syntax_p0/sub_query_correlated.out
+++ b/regression-test/data/nereids_syntax_p0/sub_query_correlated.out
@@ -450,3 +450,18 @@
 22     3
 24     4
 
+-- !mark_join_nullable --
+\N
+\N
+\N
+\N
+\N
+\N
+true
+true
+true
+true
+\N
+\N
+\N
+
diff --git 
a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query10.out 
b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query10.out
index fcb2ade8dac..f732357c250 100644
--- a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query10.out
+++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query10.out
@@ -19,11 +19,11 @@ PhysicalResultSink
 ----------------------------PhysicalProject
 ------------------------------filter((date_dim.d_moy <= 4) and (date_dim.d_moy 
>= 1) and (date_dim.d_year = 2001))
 --------------------------------PhysicalOlapScan[date_dim]
---------------------PhysicalProject
-----------------------filter(($c$1 OR $c$2))
-------------------------hashJoin[LEFT_SEMI_JOIN] 
hashCondition=((c.c_customer_sk = 
catalog_sales.cs_ship_customer_sk))otherCondition=()
---------------------------hashJoin[LEFT_SEMI_JOIN] 
hashCondition=((c.c_customer_sk = 
web_sales.ws_bill_customer_sk))otherCondition=()
-----------------------------PhysicalDistribute
+--------------------PhysicalDistribute
+----------------------PhysicalProject
+------------------------filter(($c$1 OR $c$2))
+--------------------------hashJoin[LEFT_SEMI_JOIN] 
hashCondition=((c.c_customer_sk = 
catalog_sales.cs_ship_customer_sk))otherCondition=()
+----------------------------hashJoin[LEFT_SEMI_JOIN] 
hashCondition=((c.c_customer_sk = 
web_sales.ws_bill_customer_sk))otherCondition=()
 ------------------------------PhysicalProject
 --------------------------------hashJoin[INNER_JOIN] 
hashCondition=((customer_demographics.cd_demo_sk = 
c.c_current_cdemo_sk))otherCondition=()
 ----------------------------------PhysicalOlapScan[customer_demographics]
@@ -36,22 +36,22 @@ PhysicalResultSink
 ------------------------------------------PhysicalProject
 --------------------------------------------filter(ca_county IN ('Cochran 
County', 'Kandiyohi County', 'Marquette County', 'Storey County', 'Warren 
County'))
 
----------------------------------------------PhysicalOlapScan[customer_address]
+------------------------------PhysicalDistribute
+--------------------------------PhysicalProject
+----------------------------------hashJoin[INNER_JOIN] 
hashCondition=((web_sales.ws_sold_date_sk = 
date_dim.d_date_sk))otherCondition=()
+------------------------------------PhysicalProject
+--------------------------------------PhysicalOlapScan[web_sales]
+------------------------------------PhysicalDistribute
+--------------------------------------PhysicalProject
+----------------------------------------filter((date_dim.d_moy <= 4) and 
(date_dim.d_moy >= 1) and (date_dim.d_year = 2001))
+------------------------------------------PhysicalOlapScan[date_dim]
 ----------------------------PhysicalDistribute
 ------------------------------PhysicalProject
---------------------------------hashJoin[INNER_JOIN] 
hashCondition=((web_sales.ws_sold_date_sk = 
date_dim.d_date_sk))otherCondition=()
+--------------------------------hashJoin[INNER_JOIN] 
hashCondition=((catalog_sales.cs_sold_date_sk = 
date_dim.d_date_sk))otherCondition=()
 ----------------------------------PhysicalProject
-------------------------------------PhysicalOlapScan[web_sales]
+------------------------------------PhysicalOlapScan[catalog_sales]
 ----------------------------------PhysicalDistribute
 ------------------------------------PhysicalProject
 --------------------------------------filter((date_dim.d_moy <= 4) and 
(date_dim.d_moy >= 1) and (date_dim.d_year = 2001))
 ----------------------------------------PhysicalOlapScan[date_dim]
---------------------------PhysicalDistribute
-----------------------------PhysicalProject
-------------------------------hashJoin[INNER_JOIN] 
hashCondition=((catalog_sales.cs_sold_date_sk = 
date_dim.d_date_sk))otherCondition=()
---------------------------------PhysicalProject
-----------------------------------PhysicalOlapScan[catalog_sales]
---------------------------------PhysicalDistribute
-----------------------------------PhysicalProject
-------------------------------------filter((date_dim.d_moy <= 4) and 
(date_dim.d_moy >= 1) and (date_dim.d_year = 2001))
---------------------------------------PhysicalOlapScan[date_dim]
 
diff --git 
a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query35.out 
b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query35.out
index 47b9f1061ab..4bf6857ee71 100644
--- a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query35.out
+++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query35.out
@@ -19,11 +19,11 @@ PhysicalResultSink
 ----------------------------PhysicalProject
 ------------------------------filter((date_dim.d_qoy < 4) and (date_dim.d_year 
= 2001))
 --------------------------------PhysicalOlapScan[date_dim]
---------------------PhysicalProject
-----------------------filter(($c$1 OR $c$2))
-------------------------hashJoin[LEFT_SEMI_JOIN] 
hashCondition=((c.c_customer_sk = 
catalog_sales.cs_ship_customer_sk))otherCondition=()
---------------------------hashJoin[LEFT_SEMI_JOIN] 
hashCondition=((c.c_customer_sk = 
web_sales.ws_bill_customer_sk))otherCondition=()
-----------------------------PhysicalDistribute
+--------------------PhysicalDistribute
+----------------------PhysicalProject
+------------------------filter(($c$1 OR $c$2))
+--------------------------hashJoin[LEFT_SEMI_JOIN] 
hashCondition=((c.c_customer_sk = 
catalog_sales.cs_ship_customer_sk))otherCondition=()
+----------------------------hashJoin[LEFT_SEMI_JOIN] 
hashCondition=((c.c_customer_sk = 
web_sales.ws_bill_customer_sk))otherCondition=()
 ------------------------------PhysicalProject
 --------------------------------hashJoin[INNER_JOIN] 
hashCondition=((customer_demographics.cd_demo_sk = 
c.c_current_cdemo_sk))otherCondition=()
 ----------------------------------PhysicalDistribute
@@ -38,22 +38,22 @@ PhysicalResultSink
 ----------------------------------PhysicalDistribute
 ------------------------------------PhysicalProject
 --------------------------------------PhysicalOlapScan[customer_demographics]
+------------------------------PhysicalDistribute
+--------------------------------PhysicalProject
+----------------------------------hashJoin[INNER_JOIN] 
hashCondition=((web_sales.ws_sold_date_sk = 
date_dim.d_date_sk))otherCondition=()
+------------------------------------PhysicalProject
+--------------------------------------PhysicalOlapScan[web_sales]
+------------------------------------PhysicalDistribute
+--------------------------------------PhysicalProject
+----------------------------------------filter((date_dim.d_qoy < 4) and 
(date_dim.d_year = 2001))
+------------------------------------------PhysicalOlapScan[date_dim]
 ----------------------------PhysicalDistribute
 ------------------------------PhysicalProject
---------------------------------hashJoin[INNER_JOIN] 
hashCondition=((web_sales.ws_sold_date_sk = 
date_dim.d_date_sk))otherCondition=()
+--------------------------------hashJoin[INNER_JOIN] 
hashCondition=((catalog_sales.cs_sold_date_sk = 
date_dim.d_date_sk))otherCondition=()
 ----------------------------------PhysicalProject
-------------------------------------PhysicalOlapScan[web_sales]
+------------------------------------PhysicalOlapScan[catalog_sales]
 ----------------------------------PhysicalDistribute
 ------------------------------------PhysicalProject
 --------------------------------------filter((date_dim.d_qoy < 4) and 
(date_dim.d_year = 2001))
 ----------------------------------------PhysicalOlapScan[date_dim]
---------------------------PhysicalDistribute
-----------------------------PhysicalProject
-------------------------------hashJoin[INNER_JOIN] 
hashCondition=((catalog_sales.cs_sold_date_sk = 
date_dim.d_date_sk))otherCondition=()
---------------------------------PhysicalProject
-----------------------------------PhysicalOlapScan[catalog_sales]
---------------------------------PhysicalDistribute
-----------------------------------PhysicalProject
-------------------------------------filter((date_dim.d_qoy < 4) and 
(date_dim.d_year = 2001))
---------------------------------------PhysicalOlapScan[date_dim]
 
diff --git 
a/regression-test/suites/nereids_syntax_p0/sub_query_correlated.groovy 
b/regression-test/suites/nereids_syntax_p0/sub_query_correlated.groovy
index 482eab7a6aa..6664ad0c6c7 100644
--- a/regression-test/suites/nereids_syntax_p0/sub_query_correlated.groovy
+++ b/regression-test/suites/nereids_syntax_p0/sub_query_correlated.groovy
@@ -50,6 +50,14 @@ suite ("sub_query_correlated") {
         DROP TABLE IF EXISTS `sub_query_correlated_subquery7`
     """
 
+    sql """
+        DROP TABLE IF EXISTS `sub_query_correlated_subquery8`
+    """
+
+    sql """
+        DROP TABLE IF EXISTS `sub_query_correlated_subquery9`
+    """
+
     sql """
         create table if not exists sub_query_correlated_subquery1
         (k1 bigint, k2 bigint)
@@ -105,6 +113,21 @@ suite ("sub_query_correlated") {
             properties('replication_num' = '1');
     """
 
+    sql """
+        create table if not exists sub_query_correlated_subquery8
+        (k1 bigint, k2 bigint)
+        duplicate key(k1)
+        distributed by hash(k2) buckets 1
+        properties('replication_num' = '1')
+    """
+
+    sql """
+        create table if not exists sub_query_correlated_subquery9
+            (k1 int, k2 varchar(128), k3 bigint, v1 bigint, v2 bigint)
+            distributed by hash(k2) buckets 1
+            properties('replication_num' = '1');
+    """
+
     sql """
         insert into sub_query_correlated_subquery1 values (1,2), (1,3), (2,4), 
(2,5), (3,3), (3,4), (20,2), (22,3), (24,4)
     """
@@ -126,7 +149,7 @@ suite ("sub_query_correlated") {
         insert into sub_query_correlated_subquery5 values (5,4), (5,2), (8,3), 
(5,4), (6,7), (8,9)
     """
 
-     sql """
+    sql """
         insert into sub_query_correlated_subquery6 values 
(1,null),(null,1),(1,2), (null,2),(1,3), (2,4), (2,5), (3,3), (3,4), (20,2), 
(22,3), (24,4),(null,null);
     """
 
@@ -135,6 +158,15 @@ suite ("sub_query_correlated") {
             (2,"uvw",3,4,2), (2,"uvw",3,4,2), (3,"abc",4,5,3), 
(3,"abc",4,5,3), (null,null,null,null,null);
     """
 
+    sql """
+        insert into sub_query_correlated_subquery8 values 
(1,null),(null,1),(1,2), (null,2),(1,3), (2,4), (2,5), (3,3), (3,4), (20,2), 
(22,3), (24,4),(null,null);
+    """
+
+    sql """
+        insert into sub_query_correlated_subquery9 values (1,"abc",2,3,4), 
(1,"abcd",3,3,4),
+            (2,"xyz",2,4,2),(2,"uvw",3,4,2), (2,"uvw",3,4,2), (3,"abc",4,5,3), 
(3,"abc",4,5,3), (null,null,null,null,null);
+    """
+
     sql "SET enable_fallback_to_original_planner=false"
 
     //------------------Correlated-----------------
@@ -496,6 +528,10 @@ suite ("sub_query_correlated") {
         order by k1, k2;
     """
 
+    qt_mark_join_nullable """
+        select sub_query_correlated_subquery8.k1 in (select 
sub_query_correlated_subquery9.k3 from sub_query_correlated_subquery9) from 
sub_query_correlated_subquery8 order by k1, k2;
+    """
+
     // order_qt_doris_6937_2 """
     //     select * from sub_query_correlated_subquery1 where 
sub_query_correlated_subquery1.k1 not in (select 
sub_query_correlated_subquery3.k3 from sub_query_correlated_subquery3 where 
sub_query_correlated_subquery3.v2 > sub_query_correlated_subquery1.k2) or k1 < 
10 order by k1, k2;
     // """


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to