This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d00d029ffb8 Separate fixed key hash map context creator (#25438)
d00d029ffb8 is described below

commit d00d029ffb84e1a62f00c6bb156c15af85e3dcf7
Author: Pxl <[email protected]>
AuthorDate: Mon Oct 16 11:20:30 2023 +0800

    Separate fixed key hash map context creator (#25438)
    
    Separate fixed key hash map context creator
---
 .clang-tidy                                        |   1 +
 be/src/pipeline/exec/aggregation_sink_operator.cpp |  63 +---
 be/src/pipeline/exec/aggregation_sink_operator.h   |   5 +-
 .../pipeline/exec/aggregation_source_operator.cpp  |   6 +-
 ...istinct_streaming_aggregation_sink_operator.cpp |   5 +-
 be/src/pipeline/exec/hashjoin_build_sink.cpp       |  75 +----
 be/src/pipeline/exec/hashjoin_build_sink.h         |   3 -
 be/src/pipeline/exec/hashjoin_probe_operator.h     |   1 -
 .../pipeline/exec/partition_sort_sink_operator.cpp |  59 +---
 .../pipeline/exec/partition_sort_sink_operator.h   |   1 -
 be/src/pipeline/exec/set_probe_sink_operator.cpp   |  13 +-
 be/src/pipeline/pipeline_x/dependency.h            |  59 +---
 be/src/vec/common/columns_hashing.h                |  21 +-
 be/src/vec/common/hash_table/fixed_hash_map.h      | 165 ---------
 be/src/vec/common/hash_table/fixed_hash_table.h    | 372 ---------------------
 be/src/vec/common/hash_table/hash_map_context.h    | 108 +++---
 .../common/hash_table/hash_map_context_creator.h   |  87 +++++
 be/src/vec/common/hash_table/hash_map_util.h       |  22 --
 be/src/vec/common/hash_table/hash_table.h          |   4 +-
 .../vec/common/hash_table/hash_table_set_build.h   |   9 +-
 .../vec/common/hash_table/hash_table_set_probe.h   |  10 +-
 .../vec/common/hash_table/partitioned_hash_map.h   |   3 +
 be/src/vec/common/hash_table/string_hash_table.h   |   4 -
 be/src/vec/common/uint128.h                        |   2 +-
 be/src/vec/exec/distinct_vaggregation_node.cpp     |   4 +-
 be/src/vec/exec/join/process_hash_table_probe.h    |   1 -
 .../vec/exec/join/process_hash_table_probe_impl.h  |  12 +-
 be/src/vec/exec/join/vhash_join_node.cpp           |  57 +---
 be/src/vec/exec/join/vhash_join_node.h             |  22 +-
 be/src/vec/exec/vaggregation_node.cpp              |  59 +---
 be/src/vec/exec/vaggregation_node.h                |  32 +-
 be/src/vec/exec/vpartition_sort_node.cpp           |  56 +---
 be/src/vec/exec/vpartition_sort_node.h             |  22 +-
 be/src/vec/exec/vset_operation_node.cpp            |  67 +---
 be/src/vec/exec/vset_operation_node.h              |   2 -
 .../array/function_array_enumerate_uniq.cpp        |   4 +-
 36 files changed, 259 insertions(+), 1177 deletions(-)

diff --git a/.clang-tidy b/.clang-tidy
index aad12455701..f9d3155e36b 100644
--- a/.clang-tidy
+++ b/.clang-tidy
@@ -13,6 +13,7 @@ Checks: |
   readability-*,
   -readability-identifier-length,
   -readability-implicit-bool-conversion,
+  -readability-function-cognitive-complexity,
   portability-simd-intrinsics,
   performance-type-promotion-in-math-fn,
   performance-faster-string-find,
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp 
b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index 419313dce79..013c7854f86 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -23,6 +23,7 @@
 #include "pipeline/exec/operator.h"
 #include "pipeline/exec/streaming_aggregation_sink_operator.h"
 #include "runtime/primitive_type.h"
+#include "vec/common/hash_table/hash.h"
 
 namespace doris::pipeline {
 
@@ -109,8 +110,6 @@ Status AggSinkLocalState<DependencyType, 
Derived>::init(RuntimeState* state,
     Base::_shared_state->agg_profile_arena = 
std::make_unique<vectorized::Arena>();
 
     if (Base::_shared_state->probe_expr_ctxs.empty()) {
-        _agg_data->init(vectorized::AggregatedDataVariants::Type::without_key);
-
         _agg_data->without_key = 
reinterpret_cast<vectorized::AggregateDataPtr>(
                 
Base::_shared_state->agg_profile_arena->alloc(p._total_size_of_aggregate_states));
 
@@ -500,9 +499,8 @@ void AggSinkLocalState<DependencyType, 
Derived>::_emplace_into_hash_table(
                 SCOPED_TIMER(_hash_table_compute_timer);
                 using HashMethodType = std::decay_t<decltype(agg_method)>;
                 using AggState = typename HashMethodType::State;
-                AggState state(key_columns, Base::_shared_state->probe_key_sz);
-                agg_method.init_serialized_keys(key_columns, 
Base::_shared_state->probe_key_sz,
-                                                num_rows);
+                AggState state(key_columns);
+                agg_method.init_serialized_keys(key_columns, num_rows);
 
                 auto creator = [this](const auto& ctor, auto& key, auto& 
origin) {
                     HashMethodType::try_presis_key(key, origin, 
*_agg_arena_pool);
@@ -545,9 +543,8 @@ void AggSinkLocalState<DependencyType, 
Derived>::_find_in_hash_table(
             [&](auto&& agg_method) -> void {
                 using HashMethodType = std::decay_t<decltype(agg_method)>;
                 using AggState = typename HashMethodType::State;
-                AggState state(key_columns, Base::_shared_state->probe_key_sz);
-                agg_method.init_serialized_keys(key_columns, 
Base::_shared_state->probe_key_sz,
-                                                num_rows);
+                AggState state(key_columns);
+                agg_method.init_serialized_keys(key_columns, num_rows);
 
                 /// For all rows.
                 for (size_t i = 0; i < num_rows; ++i) {
@@ -625,52 +622,10 @@ void AggSinkLocalState<DependencyType, 
Derived>::_init_hash_method(
                         !Base::_parent->template cast<typename 
Derived::Parent>()._is_first_phase),
                 is_nullable);
     } else {
-        bool use_fixed_key = true;
-        bool has_null = false;
-        size_t key_byte_size = 0;
-        size_t bitmap_size =
-                
vectorized::get_bitmap_size(Base::_shared_state->probe_expr_ctxs.size());
-
-        
Base::_shared_state->probe_key_sz.resize(Base::_shared_state->probe_expr_ctxs.size());
-        for (int i = 0; i < Base::_shared_state->probe_expr_ctxs.size(); ++i) {
-            const auto& expr = Base::_shared_state->probe_expr_ctxs[i]->root();
-            const auto& data_type = expr->data_type();
-
-            if (!data_type->have_maximum_size_of_value()) {
-                use_fixed_key = false;
-                break;
-            }
-
-            auto is_null = data_type->is_nullable();
-            has_null |= is_null;
-            Base::_shared_state->probe_key_sz[i] =
-                    data_type->get_maximum_size_of_value_in_memory() - 
(is_null ? 1 : 0);
-            key_byte_size += Base::_shared_state->probe_key_sz[i];
-        }
-
-        if (!has_null) {
-            bitmap_size = 0;
-        }
-
-        if (bitmap_size + key_byte_size > sizeof(vectorized::UInt256)) {
-            use_fixed_key = false;
-        }
-
-        if (use_fixed_key) {
-            if (bitmap_size + key_byte_size <= sizeof(vectorized::UInt64)) {
-                t = Type::int64_keys;
-            } else if (bitmap_size + key_byte_size <= 
sizeof(vectorized::UInt128)) {
-                t = Type::int128_keys;
-            } else if (bitmap_size + key_byte_size <= 
sizeof(vectorized::UInt136)) {
-                t = Type::int136_keys;
-            } else {
-                t = Type::int256_keys;
-            }
-            _agg_data->init(get_hash_key_type_with_phase(
-                                    t, !Base::_parent->template cast<typename 
Derived::Parent>()
-                                                ._is_first_phase),
-                            has_null);
-        } else {
+        if (!try_get_hash_map_context_fixed<PHNormalHashMap, HashCRC32,
+                                            vectorized::AggregateDataPtr,
+                                            
vectorized::AggregatedMethodVariants>(
+                    _agg_data->method_variant, probe_exprs)) {
             _agg_data->init(Type::serialized);
         }
     }
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h 
b/be/src/pipeline/exec/aggregation_sink_operator.h
index de73b128742..e1223f5c7eb 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -127,10 +127,7 @@ protected:
             }
         }
 
-        {
-            context.insert_keys_into_columns(keys, key_columns, num_rows,
-                                             
Base::_shared_state->probe_key_sz);
-        }
+        { context.insert_keys_into_columns(keys, key_columns, num_rows); }
 
         if (hash_table.has_null_key_data()) {
             // only one key of group by support wrap null key
diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp 
b/be/src/pipeline/exec/aggregation_source_operator.cpp
index fd192e04ab3..8d959b525e4 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -201,8 +201,7 @@ Status 
AggLocalState::_serialize_with_serialized_key_result_non_spill(RuntimeSta
 
                 {
                     SCOPED_TIMER(_insert_keys_to_column_timer);
-                    agg_method.insert_keys_into_columns(keys, key_columns, 
num_rows,
-                                                        
_shared_state->probe_key_sz);
+                    agg_method.insert_keys_into_columns(keys, key_columns, 
num_rows);
                 }
 
                 if (iter == _shared_state->aggregate_data_container->end()) {
@@ -358,8 +357,7 @@ Status 
AggLocalState::_get_result_with_serialized_key_non_spill(RuntimeState* st
 
                 {
                     SCOPED_TIMER(_insert_keys_to_column_timer);
-                    agg_method.insert_keys_into_columns(keys, key_columns, 
num_rows,
-                                                        
_shared_state->probe_key_sz);
+                    agg_method.insert_keys_into_columns(keys, key_columns, 
num_rows);
                 }
 
                 for (size_t i = 0; i < 
_shared_state->aggregate_evaluators.size(); ++i) {
diff --git 
a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp 
b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp
index 4651da4ecd0..f0a86fcbd6c 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp
@@ -156,9 +156,8 @@ void 
DistinctStreamingAggSinkLocalState::_emplace_into_hash_table_to_distinct(
                 SCOPED_TIMER(_hash_table_compute_timer);
                 using HashMethodType = std::decay_t<decltype(agg_method)>;
                 using AggState = typename HashMethodType::State;
-                AggState state(key_columns, _shared_state->probe_key_sz);
-                agg_method.init_serialized_keys(key_columns, 
Base::_shared_state->probe_key_sz,
-                                                num_rows);
+                AggState state(key_columns);
+                agg_method.init_serialized_keys(key_columns, num_rows);
                 size_t row = 0;
                 auto creator = [&](const auto& ctor, auto& key, auto& origin) {
                     HashMethodType::try_presis_key(key, origin, _arena);
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp 
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 939363776b0..153882075b6 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -51,7 +51,6 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo
     _shared_hash_table_dependency = 
SharedHashTableDependency::create_shared(_parent->id());
     auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
     _shared_state->join_op_variants = p._join_op_variants;
-    _shared_state->probe_key_sz = p._build_key_sz;
     if (p._is_broadcast_join && 
state->enable_share_hash_table_for_broadcast_join()) {
         _shared_state->build_blocks = p._shared_hash_table_context->blocks;
     } else {
@@ -144,10 +143,6 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState* 
state) {
     return Status::OK();
 }
 
-vectorized::Sizes& HashJoinBuildSinkLocalState::build_key_sz() {
-    return _parent->cast<HashJoinBuildSinkOperatorX>()._build_key_sz;
-}
-
 bool HashJoinBuildSinkLocalState::build_unique() const {
     return _parent->cast<HashJoinBuildSinkOperatorX>()._build_unique;
 }
@@ -326,62 +321,8 @@ void 
HashJoinBuildSinkLocalState::_hash_table_init(RuntimeState* state) {
                     }
                     return;
                 }
-
-                bool use_fixed_key = true;
-                bool has_null = false;
-                size_t key_byte_size = 0;
-                size_t bitmap_size = 
vectorized::get_bitmap_size(_build_expr_ctxs.size());
-
-                for (int i = 0; i < _build_expr_ctxs.size(); ++i) {
-                    const auto vexpr = _build_expr_ctxs[i]->root();
-                    const auto& data_type = vexpr->data_type();
-
-                    if (!data_type->have_maximum_size_of_value()) {
-                        use_fixed_key = false;
-                        break;
-                    }
-
-                    auto is_null = data_type->is_nullable();
-                    has_null |= is_null;
-                    key_byte_size += p._build_key_sz[i];
-                }
-
-                if (bitmap_size + key_byte_size > sizeof(vectorized::UInt256)) 
{
-                    use_fixed_key = false;
-                }
-
-                if (use_fixed_key) {
-                    // TODO: may we should support uint256 in the future
-                    if (has_null) {
-                        if (bitmap_size + key_byte_size <= 
sizeof(vectorized::UInt64)) {
-                            _shared_state->hash_table_variants
-                                    
->emplace<vectorized::I64FixedKeyHashTableContext<
-                                            true, RowRefListType>>();
-                        } else if (bitmap_size + key_byte_size <= 
sizeof(vectorized::UInt128)) {
-                            _shared_state->hash_table_variants
-                                    
->emplace<vectorized::I128FixedKeyHashTableContext<
-                                            true, RowRefListType>>();
-                        } else {
-                            _shared_state->hash_table_variants
-                                    
->emplace<vectorized::I256FixedKeyHashTableContext<
-                                            true, RowRefListType>>();
-                        }
-                    } else {
-                        if (key_byte_size <= sizeof(vectorized::UInt64)) {
-                            _shared_state->hash_table_variants
-                                    
->emplace<vectorized::I64FixedKeyHashTableContext<
-                                            false, RowRefListType>>();
-                        } else if (key_byte_size <= 
sizeof(vectorized::UInt128)) {
-                            _shared_state->hash_table_variants
-                                    
->emplace<vectorized::I128FixedKeyHashTableContext<
-                                            false, RowRefListType>>();
-                        } else {
-                            _shared_state->hash_table_variants
-                                    
->emplace<vectorized::I256FixedKeyHashTableContext<
-                                            false, RowRefListType>>();
-                        }
-                    }
-                } else {
+                if (!try_get_hash_map_context_fixed<PartitionedHashMap, 
HashCRC32, RowRefListType>(
+                            *_shared_state->hash_table_variants, 
_build_expr_ctxs)) {
                     _shared_state->hash_table_variants
                             
->emplace<vectorized::SerializedHashTableContext<RowRefListType>>();
                 }
@@ -448,18 +389,6 @@ Status HashJoinBuildSinkOperatorX::init(const TPlanNode& 
tnode, RuntimeState* st
                 null_aware ||
                 (_build_expr_ctxs.back()->root()->is_nullable() && 
build_stores_null));
     }
-
-    for (const auto& expr : _build_expr_ctxs) {
-        const auto& data_type = expr->root()->data_type();
-        if (!data_type->have_maximum_size_of_value()) {
-            break;
-        }
-
-        auto is_null = data_type->is_nullable();
-        
_build_key_sz.push_back(data_type->get_maximum_size_of_value_in_memory() -
-                                (is_null ? 1 : 0));
-    }
-
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h 
b/be/src/pipeline/exec/hashjoin_build_sink.h
index 459b66718c2..9b43f95cd3b 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -70,7 +70,6 @@ public:
     void init_short_circuit_for_probe();
     HashJoinBuildSinkOperatorX* join_build() { return 
(HashJoinBuildSinkOperatorX*)_parent; }
 
-    vectorized::Sizes& build_key_sz();
     bool build_unique() const;
     std::vector<TRuntimeFilterDesc>& runtime_filter_descs() const;
     std::shared_ptr<vectorized::Arena> arena() { return _shared_state->arena; }
@@ -168,8 +167,6 @@ private:
     // mark the join column whether support null eq
     std::vector<bool> _is_null_safe_eq_join;
 
-    vectorized::Sizes _build_key_sz;
-
     bool _is_broadcast_join = false;
     std::shared_ptr<vectorized::SharedHashTableController> 
_shared_hashtable_controller = nullptr;
 
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h 
b/be/src/pipeline/exec/hashjoin_probe_operator.h
index ed241141e5d..082f45199c1 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -92,7 +92,6 @@ public:
     std::shared_ptr<std::vector<vectorized::Block>> build_blocks() const {
         return _shared_state->build_blocks;
     }
-    vectorized::Sizes probe_key_sz() const { return 
_shared_state->probe_key_sz; }
 
 private:
     void _prepare_probe_block();
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp 
b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
index 4bd66be9dbf..515beba944e 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
@@ -18,6 +18,7 @@
 #include "partition_sort_sink_operator.h"
 
 #include "common/status.h"
+#include "vec/common/hash_table/hash.h"
 
 namespace doris {
 
@@ -180,10 +181,9 @@ void PartitionSortSinkOperatorX::_emplace_into_hash_table(
                 using HashMethodType = std::decay_t<decltype(agg_method)>;
                 using AggState = typename HashMethodType::State;
 
-                AggState state(key_columns, local_state._partition_key_sz);
+                AggState state(key_columns);
                 size_t num_rows = input_block->rows();
-                agg_method.init_serialized_keys(key_columns, 
local_state._partition_key_sz,
-                                                num_rows);
+                agg_method.init_serialized_keys(key_columns, num_rows);
 
                 auto creator = [&](const auto& ctor, auto& key, auto& origin) {
                     HashMethodType::try_presis_key(key, origin, 
*local_state._agg_arena_pool);
@@ -282,56 +282,9 @@ void PartitionSortSinkLocalState::_init_hash_method() {
             
_partitioned_data->init(vectorized::PartitionedHashMapVariants::Type::serialized);
         }
     } else {
-        bool use_fixed_key = true;
-        bool has_null = false;
-        size_t key_byte_size = 0;
-        size_t bitmap_size = vectorized::get_bitmap_size(_partition_exprs_num);
-
-        _partition_key_sz.resize(_partition_exprs_num);
-        for (int i = 0; i < _partition_exprs_num; ++i) {
-            const auto& data_type = 
_partition_expr_ctxs[i]->root()->data_type();
-
-            if (!data_type->have_maximum_size_of_value()) {
-                use_fixed_key = false;
-                break;
-            }
-
-            auto is_null = data_type->is_nullable();
-            has_null |= is_null;
-            _partition_key_sz[i] =
-                    data_type->get_maximum_size_of_value_in_memory() - 
(is_null ? 1 : 0);
-            key_byte_size += _partition_key_sz[i];
-        }
-
-        if (bitmap_size + key_byte_size > sizeof(vectorized::UInt256)) {
-            use_fixed_key = false;
-        }
-
-        if (use_fixed_key) {
-            if (has_null) {
-                if (bitmap_size + key_byte_size <= sizeof(vectorized::UInt64)) 
{
-                    _partitioned_data->init(
-                            
vectorized::PartitionedHashMapVariants::Type::int64_keys, has_null);
-                } else if (bitmap_size + key_byte_size <= 
sizeof(vectorized::UInt128)) {
-                    _partitioned_data->init(
-                            
vectorized::PartitionedHashMapVariants::Type::int128_keys, has_null);
-                } else {
-                    _partitioned_data->init(
-                            
vectorized::PartitionedHashMapVariants::Type::int256_keys, has_null);
-                }
-            } else {
-                if (key_byte_size <= sizeof(vectorized::UInt64)) {
-                    _partitioned_data->init(
-                            
vectorized::PartitionedHashMapVariants::Type::int64_keys, has_null);
-                } else if (key_byte_size <= sizeof(vectorized::UInt128)) {
-                    _partitioned_data->init(
-                            
vectorized::PartitionedHashMapVariants::Type::int128_keys, has_null);
-                } else {
-                    _partitioned_data->init(
-                            
vectorized::PartitionedHashMapVariants::Type::int256_keys, has_null);
-                }
-            }
-        } else {
+        if (!try_get_hash_map_context_fixed<PHNormalHashMap, HashCRC32,
+                                            vectorized::PartitionDataPtr>(
+                    _partitioned_data->method_variant, _partition_expr_ctxs)) {
             
_partitioned_data->init(vectorized::PartitionedHashMapVariants::Type::serialized);
         }
     }
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.h 
b/be/src/pipeline/exec/partition_sort_sink_operator.h
index 59517642bf4..1b76623c4fa 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.h
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.h
@@ -72,7 +72,6 @@ private:
     std::vector<const vectorized::IColumn*> _partition_columns;
     std::unique_ptr<vectorized::PartitionedHashMapVariants> _partitioned_data;
     std::unique_ptr<vectorized::Arena> _agg_arena_pool;
-    std::vector<size_t> _partition_key_sz;
     int _partition_exprs_num = 0;
 
     RuntimeProfile::Counter* _build_timer;
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.cpp 
b/be/src/pipeline/exec/set_probe_sink_operator.cpp
index 055709da3a2..81c30d45d1e 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_probe_sink_operator.cpp
@@ -246,11 +246,12 @@ void 
SetProbeSinkOperatorX<is_intersect>::_refresh_hash_table(
                 if constexpr (!std::is_same_v<HashTableCtxType, 
std::monostate>) {
                     if constexpr (std::is_same_v<typename 
HashTableCtxType::Mapped,
                                                  
vectorized::RowRefListWithFlags>) {
-                        HashTableCtxType tmp_hash_table;
+                        auto tmp_hash_table =
+                                std::make_shared<typename 
HashTableCtxType::HashMapType>();
                         bool is_need_shrink =
                                 
arg.hash_table->should_be_shrink(valid_element_in_hash_tbl);
                         if (is_intersect || is_need_shrink) {
-                            tmp_hash_table.hash_table->init_buf_size(
+                            tmp_hash_table->init_buf_size(
                                     valid_element_in_hash_tbl / 
arg.hash_table->get_factor() + 1);
                         }
 
@@ -266,15 +267,13 @@ void 
SetProbeSinkOperatorX<is_intersect>::_refresh_hash_table(
                                         if constexpr (is_intersect) { 
//intersected
                                             if (it->visited) {
                                                 it->visited = false;
-                                                
tmp_hash_table.hash_table->insert(
-                                                        iter->get_value());
+                                                
tmp_hash_table->insert(iter->get_value());
                                             }
                                             ++iter;
                                         } else { //except
                                             if constexpr 
(is_need_shrink_const) {
                                                 if (!it->visited) {
-                                                    
tmp_hash_table.hash_table->insert(
-                                                            iter->get_value());
+                                                    
tmp_hash_table->insert(iter->get_value());
                                                 }
                                             }
                                             ++iter;
@@ -285,7 +284,7 @@ void 
SetProbeSinkOperatorX<is_intersect>::_refresh_hash_table(
 
                         arg.reset();
                         if (is_intersect || is_need_shrink) {
-                            arg.hash_table = 
std::move(tmp_hash_table.hash_table);
+                            arg.hash_table = std::move(tmp_hash_table);
                         }
                     } else {
                         LOG(FATAL) << "FATAL: Invalid RowRefList";
diff --git a/be/src/pipeline/pipeline_x/dependency.h 
b/be/src/pipeline/pipeline_x/dependency.h
index c1ed4074772..5d643f58fbd 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -23,6 +23,7 @@
 
 #include "pipeline/exec/data_queue.h"
 #include "pipeline/exec/multi_cast_data_streamer.h"
+#include "vec/common/hash_table/hash_map_context_creator.h"
 #include "vec/common/sort/partition_sorter.h"
 #include "vec/common/sort/sorter.h"
 #include "vec/exec/join/process_hash_table_probe.h"
@@ -313,7 +314,6 @@ public:
     std::unique_ptr<vectorized::SpillPartitionHelper> spill_partition_helper;
     // group by k1,k2
     vectorized::VExprContextSPtrs probe_expr_ctxs;
-    std::vector<size_t> probe_key_sz;
     size_t input_num_rows = 0;
     std::vector<vectorized::AggregateDataPtr> values;
     std::unique_ptr<vectorized::Arena> agg_profile_arena;
@@ -565,7 +565,6 @@ struct HashJoinSharedState : public JoinSharedState {
     // maybe share hash table with other fragment instances
     std::shared_ptr<vectorized::HashTableVariants> hash_table_variants =
             std::make_shared<vectorized::HashTableVariants>();
-    vectorized::Sizes probe_key_sz;
     const std::vector<TupleDescriptor*> build_side_child_desc;
     size_t build_exprs_size = 0;
     std::shared_ptr<std::vector<vectorized::Block>> build_blocks = nullptr;
@@ -674,8 +673,6 @@ struct SetSharedState {
     /// init in setup_local_states
     std::unique_ptr<vectorized::HashTableVariants> hash_table_variants; // the 
real data HERE.
     std::vector<bool> build_not_ignore_null;
-    std::vector<size_t> probe_key_sz;
-    std::vector<size_t> build_key_sz;
 
     /// init in both upstream side.
     //The i-th result expr list refers to the i-th child.
@@ -735,57 +732,9 @@ public:
             return;
         }
 
-        bool use_fixed_key = true;
-        bool has_null = false;
-        size_t key_byte_size = 0;
-        size_t bitmap_size = 
vectorized::get_bitmap_size(child_exprs_lists[0].size());
-
-        build_key_sz.resize(child_exprs_lists[0].size());
-        probe_key_sz.resize(child_exprs_lists[0].size());
-        for (int i = 0; i < child_exprs_lists[0].size(); ++i) {
-            const auto vexpr = child_exprs_lists[0][i]->root();
-            const auto& data_type = vexpr->data_type();
-
-            if (!data_type->have_maximum_size_of_value()) {
-                use_fixed_key = false;
-                break;
-            }
-
-            auto is_null = data_type->is_nullable();
-            has_null |= is_null;
-            build_key_sz[i] = data_type->get_maximum_size_of_value_in_memory() 
- (is_null ? 1 : 0);
-            probe_key_sz[i] = build_key_sz[i];
-            key_byte_size += probe_key_sz[i];
-        }
-
-        if (bitmap_size + key_byte_size > sizeof(vectorized::UInt256)) {
-            use_fixed_key = false;
-        }
-        if (use_fixed_key) {
-            if (has_null) {
-                if (bitmap_size + key_byte_size <= sizeof(vectorized::UInt64)) 
{
-                    
hash_table_variants->emplace<vectorized::I64FixedKeyHashTableContext<
-                            true, vectorized::RowRefListWithFlags>>();
-                } else if (bitmap_size + key_byte_size <= 
sizeof(vectorized::UInt128)) {
-                    
hash_table_variants->emplace<vectorized::I128FixedKeyHashTableContext<
-                            true, vectorized::RowRefListWithFlags>>();
-                } else {
-                    
hash_table_variants->emplace<vectorized::I256FixedKeyHashTableContext<
-                            true, vectorized::RowRefListWithFlags>>();
-                }
-            } else {
-                if (key_byte_size <= sizeof(vectorized::UInt64)) {
-                    
hash_table_variants->emplace<vectorized::I64FixedKeyHashTableContext<
-                            false, vectorized::RowRefListWithFlags>>();
-                } else if (key_byte_size <= sizeof(vectorized::UInt128)) {
-                    
hash_table_variants->emplace<vectorized::I128FixedKeyHashTableContext<
-                            false, vectorized::RowRefListWithFlags>>();
-                } else {
-                    
hash_table_variants->emplace<vectorized::I256FixedKeyHashTableContext<
-                            false, vectorized::RowRefListWithFlags>>();
-                }
-            }
-        } else {
+        if (!try_get_hash_map_context_fixed<PartitionedHashMap, HashCRC32,
+                                            vectorized::RowRefListWithFlags>(
+                    *hash_table_variants, child_exprs_lists[0])) {
             hash_table_variants->emplace<
                     
vectorized::SerializedHashTableContext<vectorized::RowRefListWithFlags>>();
         }
diff --git a/be/src/vec/common/columns_hashing.h 
b/be/src/vec/common/columns_hashing.h
index 824ac2a37cc..83f01fdf4b2 100644
--- a/be/src/vec/common/columns_hashing.h
+++ b/be/src/vec/common/columns_hashing.h
@@ -47,8 +47,7 @@ struct HashMethodOneNumber
     using Self = HashMethodOneNumber<Value, Mapped, FieldType>;
     using Base = columns_hashing_impl::HashMethodBase<Self, Value, Mapped, 
false>;
 
-    /// If the keys of a fixed length then key_sizes contains their lengths, 
empty otherwise.
-    HashMethodOneNumber(const ColumnRawPtrs& key_columns, const Sizes& 
/*key_sizes*/) {}
+    HashMethodOneNumber(const ColumnRawPtrs& key_columns) {}
 
     using Base::find_key_with_hash;
 };
@@ -61,7 +60,7 @@ struct HashMethodString
     using Self = HashMethodString<Value, Mapped, place_string_to_arena>;
     using Base = columns_hashing_impl::HashMethodBase<Self, Value, Mapped, 
false>;
 
-    HashMethodString(const ColumnRawPtrs& key_columns, const Sizes& 
/*key_sizes*/) {}
+    HashMethodString(const ColumnRawPtrs& key_columns) {}
 
 protected:
     friend class columns_hashing_impl::HashMethodBase<Self, Value, Mapped, 
false>;
@@ -79,7 +78,7 @@ struct HashMethodSerialized
     using Self = HashMethodSerialized<Value, Mapped>;
     using Base = columns_hashing_impl::HashMethodBase<Self, Value, Mapped, 
false>;
 
-    HashMethodSerialized(const ColumnRawPtrs& key_columns_, const Sizes& 
/*key_sizes*/) {}
+    HashMethodSerialized(const ColumnRawPtrs& key_columns) {}
 
 protected:
     friend class columns_hashing_impl::HashMethodBase<Self, Value, Mapped, 
false>;
@@ -96,8 +95,7 @@ struct HashMethodKeysFixed
     using BaseHashed = columns_hashing_impl::HashMethodBase<Self, Value, 
Mapped, false>;
     using Base = columns_hashing_impl::BaseStateKeysFixed<Key, 
has_nullable_keys>;
 
-    HashMethodKeysFixed(const ColumnRawPtrs& key_columns, const Sizes& 
key_sizes_)
-            : Base(key_columns) {}
+    HashMethodKeysFixed(const ColumnRawPtrs& key_columns) : Base(key_columns) 
{}
 };
 
 template <typename SingleColumnMethod, typename Mapped>
@@ -109,16 +107,15 @@ struct HashMethodSingleLowNullableColumn : public 
SingleColumnMethod {
 
     const ColumnNullable* key_column;
 
-    static const ColumnRawPtrs get_nested_column(const IColumn* col) {
-        auto* nullable = check_and_get_column<ColumnNullable>(*col);
+    static ColumnRawPtrs get_nested_column(const IColumn* col) {
+        const auto* nullable = check_and_get_column<ColumnNullable>(*col);
         DCHECK(nullable != nullptr);
-        const auto nested_col = nullable->get_nested_column_ptr().get();
+        const auto* const nested_col = nullable->get_nested_column_ptr().get();
         return {nested_col};
     }
 
-    HashMethodSingleLowNullableColumn(const ColumnRawPtrs& 
key_columns_nullable,
-                                      const Sizes& key_sizes)
-            : Base(get_nested_column(key_columns_nullable[0]), key_sizes),
+    HashMethodSingleLowNullableColumn(const ColumnRawPtrs& 
key_columns_nullable)
+            : Base(get_nested_column(key_columns_nullable[0])),
               key_column(assert_cast<const 
ColumnNullable*>(key_columns_nullable[0])) {}
 
     template <typename Data, typename Func, typename CreatorForNull, typename 
KeyHolder>
diff --git a/be/src/vec/common/hash_table/fixed_hash_map.h 
b/be/src/vec/common/hash_table/fixed_hash_map.h
deleted file mode 100644
index a43a8381a51..00000000000
--- a/be/src/vec/common/hash_table/fixed_hash_map.h
+++ /dev/null
@@ -1,165 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-// This file is copied from
-// 
https://github.com/ClickHouse/ClickHouse/blob/master/src/Common/HashTable/FixedHashMap.h
-// and modified by Doris
-
-#pragma once
-
-#include "vec/common/hash_table/fixed_hash_table.h"
-#include "vec/common/hash_table/hash_map.h"
-
-template <typename Key, typename TMapped, typename TState = HashTableNoState>
-struct FixedHashMapCell {
-    using Mapped = TMapped;
-    using State = TState;
-
-    using value_type = PairNoInit<Key, Mapped>;
-    using mapped_type = TMapped;
-
-    bool full;
-    Mapped mapped;
-
-    FixedHashMapCell() {}
-    FixedHashMapCell(const Key&, const State&) : full(true) {}
-    FixedHashMapCell(const value_type& value_, const State&) : full(true), 
mapped(value_.second) {}
-
-    const VoidKey get_key() const { return {}; }
-    Mapped& get_mapped() { return mapped; }
-    const Mapped& get_mapped() const { return mapped; }
-
-    bool is_zero(const State&) const { return !full; }
-    void set_zero() { full = false; }
-
-    /// Similar to FixedHashSetCell except that we need to contain a pointer 
to the Mapped field.
-    ///  Note that we have to assemble a continuous layout for the value_type 
on each call of getValue().
-    struct CellExt {
-        CellExt() {}
-        CellExt(Key&& key_, const FixedHashMapCell* ptr_)
-                : key(key_), ptr(const_cast<FixedHashMapCell*>(ptr_)) {}
-        void update(Key&& key_, const FixedHashMapCell* ptr_) {
-            key = key_;
-            ptr = const_cast<FixedHashMapCell*>(ptr_);
-        }
-        Key key;
-        FixedHashMapCell* ptr;
-
-        const Key& get_key() const { return key; }
-        Mapped& get_mapped() { return ptr->mapped; }
-        const Mapped& get_mapped() const { return ptr->mapped; }
-        const value_type get_value() const { return {key, ptr->mapped}; }
-    };
-};
-
-/// In case when we can encode empty cells with zero mapped values.
-template <typename Key, typename TMapped, typename TState = HashTableNoState>
-struct FixedHashMapImplicitZeroCell {
-    using Mapped = TMapped;
-    using State = TState;
-
-    using value_type = PairNoInit<Key, Mapped>;
-    using mapped_type = TMapped;
-
-    Mapped mapped;
-
-    FixedHashMapImplicitZeroCell() {}
-    FixedHashMapImplicitZeroCell(const Key&, const State&) {}
-    FixedHashMapImplicitZeroCell(const Key&, const Mapped& mapped_) : 
mapped(mapped_) {}
-    FixedHashMapImplicitZeroCell(const value_type& value_, const State&) : 
mapped(value_.second) {}
-
-    const VoidKey get_first() const { return {}; }
-    Mapped& get_second() { return mapped; }
-    const Mapped& get_second() const { return mapped; }
-
-    bool is_zero(const State&) const { return !mapped; }
-    void set_zero() { mapped = {}; }
-
-    /// Similar to FixedHashSetCell except that we need to contain a pointer 
to the Mapped field.
-    ///  Note that we have to assemble a continuous layout for the value_type 
on each call of getValue().
-    struct CellExt {
-        CellExt() {}
-        CellExt(Key&& key_, const FixedHashMapImplicitZeroCell* ptr_)
-                : key(key_), 
ptr(const_cast<FixedHashMapImplicitZeroCell*>(ptr_)) {}
-        void update(Key&& key_, const FixedHashMapImplicitZeroCell* ptr_) {
-            key = key_;
-            ptr = const_cast<FixedHashMapImplicitZeroCell*>(ptr_);
-        }
-        Key key;
-        FixedHashMapImplicitZeroCell* ptr;
-
-        const Key& get_first() const { return key; }
-        Mapped& get_second() { return ptr->mapped; }
-        const Mapped& get_second() const { return ptr->mapped; }
-        const value_type get_value() const { return {key, ptr->mapped}; }
-    };
-};
-
-template <typename Key, typename Mapped, typename State>
-ALWAYS_INLINE inline auto lookup_result_get_mapped(
-        FixedHashMapImplicitZeroCell<Key, Mapped, State>* cell) {
-    return &cell->get_second();
-}
-
-template <typename Key, typename Mapped, typename Cell = FixedHashMapCell<Key, 
Mapped>,
-          typename Size = FixedHashTableStoredSize<Cell>, typename Allocator = 
HashTableAllocator>
-class FixedHashMap : public FixedHashTable<Key, Cell, Size, Allocator> {
-public:
-    using Base = FixedHashTable<Key, Cell, Size, Allocator>;
-    using Self = FixedHashMap;
-    using LookupResult = typename Base::LookupResult;
-
-    using Base::Base;
-
-    template <typename Func>
-    void for_each_value(Func&& func) {
-        for (auto& v : *this) func(v.get_key(), v.get_mapped());
-    }
-
-    template <typename Func>
-    void for_each_mapped(Func&& func) {
-        for (auto& v : *this) func(v.get_second());
-    }
-
-    Mapped& ALWAYS_INLINE operator[](const Key& x) {
-        LookupResult it;
-        bool inserted;
-        this->emplace(x, it, inserted);
-        if (inserted) new (&it->get_mapped()) Mapped();
-
-        return it->get_mapped();
-    }
-
-    // fixed hash map never overflow
-    bool add_elem_size_overflow(size_t add_size) const { return false; }
-    template <typename MappedType>
-    char* get_null_key_data() {
-        return nullptr;
-    }
-    bool has_null_key_data() const { return false; }
-};
-
-template <typename Key, typename Mapped, typename Allocator = 
HashTableAllocator>
-using FixedImplicitZeroHashMap =
-        FixedHashMap<Key, Mapped, FixedHashMapImplicitZeroCell<Key, Mapped>,
-                     
FixedHashTableStoredSize<FixedHashMapImplicitZeroCell<Key, Mapped>>,
-                     Allocator>;
-
-template <typename Key, typename Mapped, typename Allocator = 
HashTableAllocator>
-using FixedImplicitZeroHashMapWithCalculatedSize =
-        FixedHashMap<Key, Mapped, FixedHashMapImplicitZeroCell<Key, Mapped>,
-                     
FixedHashTableCalculatedSize<FixedHashMapImplicitZeroCell<Key, Mapped>>,
-                     Allocator>;
\ No newline at end of file
diff --git a/be/src/vec/common/hash_table/fixed_hash_table.h 
b/be/src/vec/common/hash_table/fixed_hash_table.h
deleted file mode 100644
index a0e901d6c3c..00000000000
--- a/be/src/vec/common/hash_table/fixed_hash_table.h
+++ /dev/null
@@ -1,372 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-// This file is copied from
-// 
https://github.com/ClickHouse/ClickHouse/blob/master/src/Common/HashTable/FixedHashTable.h
-// and modified by Doris
-
-#pragma once
-
-#include "vec/common/hash_table/hash_table.h"
-
-/// How to obtain the size of the table.
-
-template <typename Cell>
-struct FixedHashTableStoredSize {
-    size_t m_size = 0;
-
-    size_t get_size(const Cell*, const typename Cell::State&, size_t) const { 
return m_size; }
-    bool is_empty(const Cell*, const typename Cell::State&, size_t) const { 
return m_size == 0; }
-
-    void increase_size() { ++m_size; }
-    void clear_size() { m_size = 0; }
-    void set_size(size_t to) { m_size = to; }
-};
-
-template <typename Cell>
-struct FixedHashTableCalculatedSize {
-    size_t get_size(const Cell* buf, const typename Cell::State& state, size_t 
num_cells) const {
-        size_t res = 0;
-        for (const Cell* end = buf + num_cells; buf != end; ++buf)
-            if (!buf->is_zero(state)) ++res;
-        return res;
-    }
-
-    bool isEmpty(const Cell* buf, const typename Cell::State& state, size_t 
num_cells) const {
-        for (const Cell* end = buf + num_cells; buf != end; ++buf)
-            if (!buf->is_zero(state)) return false;
-        return true;
-    }
-
-    void increase_size() {}
-    void clear_size() {}
-    void set_size(size_t) {}
-};
-
-/** Used as a lookup table for small keys such as UInt8, UInt16. It's different
-  *  than a HashTable in that keys are not stored in the Cell buf, but inferred
-  *  inside each iterator. There are a bunch of to make it faster than using
-  *  HashTable: a) It doesn't have a conflict chain; b) There is no key
-  *  comparison; c) The number of cycles for checking cell empty is halved; d)
-  *  Memory layout is tighter, especially the Clearable variants.
-  *
-  * NOTE: For Set variants this should always be better. For Map variants
-  *  however, as we need to assemble the real cell inside each iterator, there
-  *  might be some cases we fall short.
-  *
-  * TODO: Deprecate the cell API so that end users don't rely on the structure
-  *  of cell. Instead iterator should be used for operations such as cell
-  *  transfer, key updates (f.g. StringRef) and serde. This will allow
-  *  TwoLevelHashSet(Map) to contain different type of sets(maps).
-  */
-template <typename Key, typename Cell, typename Size, typename Allocator>
-class FixedHashTable : private boost::noncopyable,
-                       protected Allocator,
-                       protected Cell::State,
-                       protected Size {
-    static constexpr size_t NUM_CELLS = 1ULL << (sizeof(Key) * 8);
-
-protected:
-    using Self = FixedHashTable;
-
-    Cell* buf; /// A piece of memory for all elements.
-
-    void alloc() { buf = reinterpret_cast<Cell*>(Allocator::alloc(NUM_CELLS * 
sizeof(Cell))); }
-
-    void free() {
-        if (buf) {
-            Allocator::free(buf, get_buffer_size_in_bytes());
-            buf = nullptr;
-        }
-    }
-
-    void destroy_elements() {
-        if (!std::is_trivially_destructible_v<Cell>)
-            for (iterator it = begin(), it_end = end(); it != it_end; ++it) 
it.ptr->~Cell();
-    }
-
-    template <typename Derived, bool is_const>
-    class iterator_base {
-        using Container = std::conditional_t<is_const, const Self, Self>;
-        using cell_type = std::conditional_t<is_const, const Cell, Cell>;
-
-        Container* container;
-        cell_type* ptr;
-
-        friend class FixedHashTable;
-
-    public:
-        iterator_base() {}
-        iterator_base(Container* container_, cell_type* ptr_) : 
container(container_), ptr(ptr_) {
-            cell.update(ptr - container->buf, ptr);
-        }
-
-        bool operator==(const iterator_base& rhs) const { return ptr == 
rhs.ptr; }
-        bool operator!=(const iterator_base& rhs) const { return ptr != 
rhs.ptr; }
-
-        Derived& operator++() {
-            ++ptr;
-
-            /// Skip empty cells in the main buffer.
-            auto buf_end = container->buf + container->NUM_CELLS;
-            while (ptr < buf_end && ptr->is_zero(*container)) ++ptr;
-
-            return static_cast<Derived&>(*this);
-        }
-
-        auto& operator*() {
-            if (cell.key != ptr - container->buf) cell.update(ptr - 
container->buf, ptr);
-            return cell;
-        }
-        auto* operator->() {
-            if (cell.key != ptr - container->buf) cell.update(ptr - 
container->buf, ptr);
-            return &cell;
-        }
-
-        auto get_ptr() const { return ptr; }
-        size_t get_hash() const { return ptr - container->buf; }
-        size_t get_collision_chain_length() const { return 0; }
-        typename cell_type::CellExt cell;
-    };
-
-public:
-    using key_type = Key;
-    using mapped_type = typename Cell::mapped_type;
-    using value_type = typename Cell::value_type;
-    using cell_type = Cell;
-
-    using LookupResult = Cell*;
-    using ConstLookupResult = const Cell*;
-
-    size_t hash(const Key& x) const { return x; }
-
-    FixedHashTable() { alloc(); }
-
-    FixedHashTable(FixedHashTable&& rhs) : buf(nullptr) { *this = 
std::move(rhs); }
-
-    ~FixedHashTable() {
-        destroy_elements();
-        free();
-    }
-
-    FixedHashTable& operator=(FixedHashTable&& rhs) {
-        destroy_elements();
-        free();
-
-        const auto new_size = rhs.size();
-        std::swap(buf, rhs.buf);
-        this->set_size(new_size);
-
-        Allocator::operator=(std::move(rhs));
-        Cell::State::operator=(std::move(rhs));
-
-        return *this;
-    }
-
-    class iterator : public iterator_base<iterator, false> {
-    public:
-        using iterator_base<iterator, false>::iterator_base;
-    };
-
-    class const_iterator : public iterator_base<const_iterator, true> {
-    public:
-        using iterator_base<const_iterator, true>::iterator_base;
-    };
-
-    const_iterator begin() const {
-        if (!buf) return end();
-
-        const Cell* ptr = buf;
-        auto buf_end = buf + NUM_CELLS;
-        while (ptr < buf_end && ptr->is_zero(*this)) ++ptr;
-
-        return const_iterator(this, ptr);
-    }
-
-    const_iterator cbegin() const { return begin(); }
-
-    iterator begin() {
-        if (!buf) return end();
-
-        Cell* ptr = buf;
-        auto buf_end = buf + NUM_CELLS;
-        while (ptr < buf_end && ptr->is_zero(*this)) ++ptr;
-
-        return iterator(this, ptr);
-    }
-
-    const_iterator end() const {
-        /// Avoid UBSan warning about adding zero to nullptr. It is valid in 
C++20 (and earlier) but not valid in C.
-        return const_iterator(this, buf ? buf + NUM_CELLS : buf);
-    }
-
-    const_iterator cend() const { return end(); }
-
-    iterator end() { return iterator(this, buf ? buf + NUM_CELLS : buf); }
-
-public:
-    /// The last parameter is unused but exists for compatibility with 
HashTable interface.
-    void ALWAYS_INLINE emplace(const Key& x, LookupResult& it, bool& inserted,
-                               size_t /* hash */ = 0) {
-        it = &buf[x];
-
-        if (!buf[x].is_zero(*this)) {
-            inserted = false;
-            return;
-        }
-
-        new (&buf[x]) Cell(x, *this);
-        inserted = true;
-        this->increase_size();
-    }
-
-    class Constructor {
-    public:
-        friend class FixedHashTable;
-        template <typename... Args>
-        void operator()(Args&&... args) const {
-            new (_cell) Cell(std::forward<Args>(args)...);
-        }
-
-    private:
-        Constructor(Cell* cell) : _cell(cell) {}
-        Cell* _cell;
-    };
-
-    template <typename Func>
-    void ALWAYS_INLINE lazy_emplace(const Key& x, LookupResult& it, Func&& f) {
-        it = &buf[x];
-
-        if (!buf[x].is_zero(*this)) {
-            return;
-        }
-
-        f(Constructor(&buf[x]), x, x);
-        this->increase_size();
-    }
-
-    template <typename Func>
-    void ALWAYS_INLINE lazy_emplace(const Key& x, LookupResult& it, size_t 
hash_value, Func&& f) {
-        lazy_emplace(x, it, std::forward<Func>(f));
-    }
-
-    template <bool READ>
-    void ALWAYS_INLINE prefetch(const Key& key, size_t hash_value) {
-        // Two optional arguments:
-        // 'rw': 1 means the memory access is write
-        // 'locality': 0-3. 0 means no temporal locality. 3 means high 
temporal locality.
-        __builtin_prefetch(&buf[hash_value], READ ? 0 : 1, 1);
-    }
-
-    std::pair<LookupResult, bool> ALWAYS_INLINE insert(const value_type& x) {
-        std::pair<LookupResult, bool> res;
-        emplace(Cell::get_key(x), res.first, res.second);
-        if (res.second) insert_set_mapped(res.first->get_mapped(), x);
-
-        return res;
-    }
-
-    LookupResult ALWAYS_INLINE find(const Key& x) {
-        return !buf[x].is_zero(*this) ? &buf[x] : nullptr;
-    }
-
-    ConstLookupResult ALWAYS_INLINE find(const Key& x) const {
-        return const_cast<std::decay_t<decltype(*this)>*>(this)->find(x);
-    }
-
-    LookupResult ALWAYS_INLINE find(const Key&, size_t hash_value) {
-        return !buf[hash_value].is_zero(*this) ? &buf[hash_value] : nullptr;
-    }
-
-    ConstLookupResult ALWAYS_INLINE find(const Key& key, size_t hash_value) 
const {
-        return const_cast<std::decay_t<decltype(*this)>*>(this)->find(key, 
hash_value);
-    }
-
-    bool ALWAYS_INLINE has(const Key& x) const { return 
!buf[x].is_zero(*this); }
-    bool ALWAYS_INLINE has(const Key&, size_t hash_value) const {
-        return !buf[hash_value].is_zero(*this);
-    }
-
-    void write(doris::vectorized::BufferWritable& wb) const {
-        Cell::State::write(wb);
-        doris::vectorized::write_var_uint(size(), wb);
-
-        if (!buf) return;
-
-        for (auto ptr = buf, buf_end = buf + NUM_CELLS; ptr < buf_end; ++ptr) {
-            if (!ptr->is_zero(*this)) {
-                doris::vectorized::write_var_uint(ptr - buf, wb);
-                ptr->write(wb);
-            }
-        }
-    }
-
-    void read(doris::vectorized::BufferReadable& rb) {
-        Cell::State::read(rb);
-        destroy_elements();
-        doris::vectorized::UInt64 m_size;
-        doris::vectorized::read_var_uint(m_size, rb);
-        this->set_size(m_size);
-        free();
-        alloc();
-
-        for (size_t i = 0; i < m_size; ++i) {
-            doris::vectorized::UInt64 place_value = 0;
-            doris::vectorized::read_var_uint(place_value, rb);
-            Cell x;
-            x.read(rb);
-            new (&buf[place_value]) Cell(x, *this);
-        }
-    }
-
-    size_t size() const { return this->get_size(buf, *this, NUM_CELLS); }
-    bool empty() const { return this->is_empty(buf, *this, NUM_CELLS); }
-
-    void clear() {
-        destroy_elements();
-        this->clear_size();
-
-        memset(static_cast<void*>(buf), 0, NUM_CELLS * sizeof(*buf));
-    }
-
-    /// After executing this function, the table can only be destroyed,
-    ///  and also you can use the methods `size`, `empty`, `begin`, `end`.
-    void clear_and_shrink() {
-        destroy_elements();
-        this->clear_size();
-        free();
-    }
-
-    size_t get_buffer_size_in_bytes() const { return NUM_CELLS * sizeof(Cell); 
}
-
-    size_t get_buffer_size_in_cells() const { return NUM_CELLS; }
-
-    /// Return offset for result in internal buffer.
-    /// Result can have value up to `getBufferSizeInCells() + 1`
-    /// because offset for zero value considered to be 0
-    /// and for other values it will be `offset in buffer + 1`
-    size_t offset_internal(ConstLookupResult ptr) const {
-        if (ptr->is_zero(*this)) return 0;
-        return ptr - buf + 1;
-    }
-
-    const Cell* data() const { return buf; }
-    Cell* data() { return buf; }
-
-#ifdef DBMS_HASH_MAP_COUNT_COLLISIONS
-    size_t get_collisions() const { return 0; }
-#endif
-};
diff --git a/be/src/vec/common/hash_table/hash_map_context.h 
b/be/src/vec/common/hash_table/hash_map_context.h
index 34250e6bd4f..0d2ad598140 100644
--- a/be/src/vec/common/hash_table/hash_map_context.h
+++ b/be/src/vec/common/hash_table/hash_map_context.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include <type_traits>
+#include <utility>
 
 #include "runtime/descriptors.h"
 #include "util/stack_util.h"
@@ -33,6 +34,8 @@
 
 namespace doris::vectorized {
 
+constexpr auto BITSIZE = 8;
+
 template <typename Base>
 struct DataWithNullKey;
 
@@ -42,6 +45,7 @@ struct MethodBase {
     using Mapped = typename HashMap::mapped_type;
     using Value = typename HashMap::value_type;
     using Iterator = typename HashMap::iterator;
+    using HashMapType = HashMap;
 
     std::shared_ptr<HashMap> hash_table;
     Iterator iterator;
@@ -64,8 +68,8 @@ struct MethodBase {
             iterator = hash_table->begin();
         }
     }
-    virtual void init_serialized_keys(const ColumnRawPtrs& key_columns, const 
Sizes& key_sizes,
-                                      size_t num_rows, const uint8_t* null_map 
= nullptr) = 0;
+    virtual void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t 
num_rows,
+                                      const uint8_t* null_map = nullptr) = 0;
 
     void init_hash_values(size_t num_rows, const uint8_t* null_map) {
         if (null_map == nullptr) {
@@ -127,7 +131,7 @@ struct MethodBase {
     }
 
     virtual void insert_keys_into_columns(std::vector<Key>& keys, 
MutableColumns& key_columns,
-                                          const size_t num_rows, const Sizes&) 
= 0;
+                                          size_t num_rows) = 0;
 };
 
 template <typename TData>
@@ -151,8 +155,8 @@ struct MethodSerialized : public MethodBase<TData> {
         return {begin, sum_size};
     }
 
-    void init_serialized_keys(const ColumnRawPtrs& key_columns, const Sizes& 
key_sizes,
-                              size_t num_rows, const uint8_t* null_map = 
nullptr) override {
+    void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t 
num_rows,
+                              const uint8_t* null_map = nullptr) override {
         Base::arena.clear();
         stored_keys.resize(num_rows);
 
@@ -170,7 +174,7 @@ struct MethodSerialized : public MethodBase<TData> {
                         serialize_keys_to_pool_contiguous(i, keys_size, 
key_columns, Base::arena);
             }
         } else {
-            uint8_t* serialized_key_buffer =
+            auto* serialized_key_buffer =
                     reinterpret_cast<uint8_t*>(Base::arena.alloc(total_bytes));
 
             for (size_t i = 0; i < num_rows; ++i) {
@@ -188,7 +192,7 @@ struct MethodSerialized : public MethodBase<TData> {
     }
 
     void insert_keys_into_columns(std::vector<StringRef>& keys, 
MutableColumns& key_columns,
-                                  const size_t num_rows, const Sizes&) 
override {
+                                  const size_t num_rows) override {
         for (auto& column : key_columns) {
             column->deserialize_vec(keys, num_rows);
         }
@@ -196,7 +200,7 @@ struct MethodSerialized : public MethodBase<TData> {
 };
 
 inline size_t get_bitmap_size(size_t key_number) {
-    return (key_number + 7) / 8;
+    return (key_number + BITSIZE - 1) / BITSIZE;
 }
 
 template <typename TData>
@@ -209,15 +213,15 @@ struct MethodStringNoCache : public MethodBase<TData> {
 
     std::vector<StringRef> stored_keys;
 
-    void init_serialized_keys(const ColumnRawPtrs& key_columns, const Sizes& 
key_sizes,
-                              size_t num_rows, const uint8_t* null_map = 
nullptr) override {
+    void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t 
num_rows,
+                              const uint8_t* null_map = nullptr) override {
         const IColumn& column = *key_columns[0];
-        const ColumnString& column_string = assert_cast<const ColumnString&>(
+        const auto& column_string = assert_cast<const ColumnString&>(
                 column.is_nullable()
                         ? assert_cast<const 
ColumnNullable&>(column).get_nested_column()
                         : column);
-        auto offsets = column_string.get_offsets().data();
-        auto chars = column_string.get_chars().data();
+        const auto* offsets = column_string.get_offsets().data();
+        const auto* chars = column_string.get_chars().data();
 
         stored_keys.resize(column_string.size());
         for (size_t row = 0; row < column_string.size(); row++) {
@@ -229,7 +233,7 @@ struct MethodStringNoCache : public MethodBase<TData> {
     }
 
     void insert_keys_into_columns(std::vector<StringRef>& keys, 
MutableColumns& key_columns,
-                                  const size_t num_rows, const Sizes&) 
override {
+                                  const size_t num_rows) override {
         key_columns[0]->reserve(num_rows);
         key_columns[0]->insert_many_strings(keys.data(), num_rows);
     }
@@ -245,8 +249,8 @@ struct MethodOneNumber : public MethodBase<TData> {
     using State = ColumnsHashing::HashMethodOneNumber<typename Base::Value, 
typename Base::Mapped,
                                                       FieldType>;
 
-    void init_serialized_keys(const ColumnRawPtrs& key_columns, const Sizes& 
key_sizes,
-                              size_t num_rows, const uint8_t* null_map = 
nullptr) override {
+    void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t 
num_rows,
+                              const uint8_t* null_map = nullptr) override {
         Base::keys = (FieldType*)(key_columns[0]->is_nullable()
                                           ? assert_cast<const 
ColumnNullable*>(key_columns[0])
                                                     ->get_nested_column_ptr()
@@ -258,8 +262,7 @@ struct MethodOneNumber : public MethodBase<TData> {
     }
 
     void insert_keys_into_columns(std::vector<typename Base::Key>& keys,
-                                  MutableColumns& key_columns, const size_t 
num_rows,
-                                  const Sizes&) override {
+                                  MutableColumns& key_columns, const size_t 
num_rows) override {
         key_columns[0]->reserve(num_rows);
         auto* column = static_cast<ColumnVectorHelper*>(key_columns[0].get());
         for (size_t i = 0; i != num_rows; ++i) {
@@ -282,10 +285,13 @@ struct MethodKeysFixed : public MethodBase<TData> {
                                                       has_nullable_keys>;
 
     std::vector<Key> stored_keys;
+    Sizes key_sizes;
+
+    MethodKeysFixed(Sizes key_sizes_) : key_sizes(std::move(key_sizes_)) {}
 
     template <typename T>
     std::vector<T> pack_fixeds(size_t row_numbers, const ColumnRawPtrs& 
key_columns,
-                               const Sizes& key_sizes, const ColumnRawPtrs& 
nullmap_columns) {
+                               const ColumnRawPtrs& nullmap_columns) {
         size_t bitmap_size = get_bitmap_size(nullmap_columns.size());
 
         std::vector<T> result(row_numbers);
@@ -295,8 +301,8 @@ struct MethodKeysFixed : public MethodBase<TData> {
                 if (!nullmap_columns[j]) {
                     continue;
                 }
-                size_t bucket = j / 8;
-                size_t offset = j % 8;
+                size_t bucket = j / BITSIZE;
+                size_t offset = j % BITSIZE;
                 const auto& data =
                         assert_cast<const 
ColumnUInt8&>(*nullmap_columns[j]).get_data().data();
                 for (size_t i = 0; i < row_numbers; ++i) {
@@ -311,7 +317,7 @@ struct MethodKeysFixed : public MethodBase<TData> {
 
             auto foo = [&]<typename Fixed>(Fixed zero) {
                 CHECK_EQ(sizeof(Fixed), key_sizes[j]);
-                if (nullmap_columns.size() && nullmap_columns[j]) {
+                if (!nullmap_columns.empty() && nullmap_columns[j]) {
                     const auto& nullmap =
                             assert_cast<const 
ColumnUInt8&>(*nullmap_columns[j]).get_data().data();
                     for (size_t i = 0; i < row_numbers; ++i) {
@@ -326,15 +332,15 @@ struct MethodKeysFixed : public MethodBase<TData> {
                 }
             };
 
-            if (key_sizes[j] == 1) {
-                foo(int8_t());
-            } else if (key_sizes[j] == 2) {
-                foo(int16_t());
-            } else if (key_sizes[j] == 4) {
-                foo(int32_t());
-            } else if (key_sizes[j] == 8) {
-                foo(int64_t());
-            } else if (key_sizes[j] == 16) {
+            if (key_sizes[j] == sizeof(uint8_t)) {
+                foo(uint8_t());
+            } else if (key_sizes[j] == sizeof(uint16_t)) {
+                foo(uint16_t());
+            } else if (key_sizes[j] == sizeof(uint32_t)) {
+                foo(uint32_t());
+            } else if (key_sizes[j] == sizeof(uint64_t)) {
+                foo(uint64_t());
+            } else if (key_sizes[j] == sizeof(UInt128)) {
                 foo(UInt128());
             } else {
                 throw Exception(ErrorCode::INTERNAL_ERROR,
@@ -345,15 +351,15 @@ struct MethodKeysFixed : public MethodBase<TData> {
         return result;
     }
 
-    void init_serialized_keys(const ColumnRawPtrs& key_columns, const Sizes& 
key_sizes,
-                              size_t num_rows, const uint8_t* null_map = 
nullptr) override {
+    void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t 
num_rows,
+                              const uint8_t* null_map = nullptr) override {
         ColumnRawPtrs actual_columns;
         ColumnRawPtrs null_maps;
         if (has_nullable_keys) {
             actual_columns.reserve(key_columns.size());
             null_maps.reserve(key_columns.size());
             for (const auto& col : key_columns) {
-                if (auto* nullable_col = 
check_and_get_column<ColumnNullable>(col)) {
+                if (const auto* nullable_col = 
check_and_get_column<ColumnNullable>(col)) {
                     
actual_columns.push_back(&nullable_col->get_nested_column());
                     null_maps.push_back(&nullable_col->get_null_map_column());
                 } else {
@@ -364,14 +370,13 @@ struct MethodKeysFixed : public MethodBase<TData> {
         } else {
             actual_columns = key_columns;
         }
-        stored_keys = pack_fixeds<Key>(num_rows, actual_columns, key_sizes, 
null_maps);
+        stored_keys = pack_fixeds<Key>(num_rows, actual_columns, null_maps);
         Base::keys = stored_keys.data();
         Base::init_hash_values(num_rows, null_map);
     }
 
     void insert_keys_into_columns(std::vector<typename Base::Key>& keys,
-                                  MutableColumns& key_columns, const size_t 
num_rows,
-                                  const Sizes& key_sizes) override {
+                                  MutableColumns& key_columns, const size_t 
num_rows) override {
         // In any hash key value, column values to be read start just after 
the bitmap, if it exists.
         size_t pos = has_nullable_keys ? get_bitmap_size(key_columns.size()) : 
0;
 
@@ -381,7 +386,7 @@ struct MethodKeysFixed : public MethodBase<TData> {
             key_columns[i]->resize(num_rows);
             // If we have a nullable column, get its nested column and its 
null map.
             if (is_column_nullable(*key_columns[i])) {
-                ColumnNullable& nullable_col = 
assert_cast<ColumnNullable&>(*key_columns[i]);
+                auto& nullable_col = 
assert_cast<ColumnNullable&>(*key_columns[i]);
 
                 data = 
const_cast<char*>(nullable_col.get_nested_column().get_raw_data().data);
                 UInt8* nullmap = 
assert_cast<ColumnUInt8*>(&nullable_col.get_null_map_column())
@@ -390,8 +395,8 @@ struct MethodKeysFixed : public MethodBase<TData> {
 
                 // The current column is nullable. Check if the value of the
                 // corresponding key is nullable. Update the null map 
accordingly.
-                size_t bucket = i / 8;
-                size_t offset = i % 8;
+                size_t bucket = i / BITSIZE;
+                size_t offset = i % BITSIZE;
                 for (size_t j = 0; j < num_rows; j++) {
                     nullmap[j] = (reinterpret_cast<const 
UInt8*>(&keys[j])[bucket] >> offset) & 1;
                 }
@@ -406,15 +411,15 @@ struct MethodKeysFixed : public MethodBase<TData> {
                 }
             };
 
-            if (size == 1) {
-                foo(int8_t());
-            } else if (size == 2) {
-                foo(int16_t());
-            } else if (size == 4) {
-                foo(int32_t());
-            } else if (size == 8) {
-                foo(int64_t());
-            } else if (size == 16) {
+            if (size == sizeof(uint8_t)) {
+                foo(uint8_t());
+            } else if (size == sizeof(uint16_t)) {
+                foo(uint16_t());
+            } else if (size == sizeof(uint32_t)) {
+                foo(uint32_t());
+            } else if (size == sizeof(uint64_t)) {
+                foo(uint64_t());
+            } else if (size == sizeof(UInt128)) {
                 foo(UInt128());
             } else {
                 throw Exception(ErrorCode::INTERNAL_ERROR,
@@ -461,9 +466,8 @@ struct MethodSingleNullableColumn : public 
SingleColumnMethod {
                                                                     typename 
Base::Mapped>;
 
     void insert_keys_into_columns(std::vector<typename Base::Key>& keys,
-                                  MutableColumns& key_columns, const size_t 
num_rows,
-                                  const Sizes&) override {
-        auto col = key_columns[0].get();
+                                  MutableColumns& key_columns, const size_t 
num_rows) override {
+        auto* col = key_columns[0].get();
         col->reserve(num_rows);
         if constexpr (std::is_same_v<typename Base::Key, StringRef>) {
             col->insert_many_strings(keys.data(), num_rows);
diff --git a/be/src/vec/common/hash_table/hash_map_context_creator.h 
b/be/src/vec/common/hash_table/hash_map_context_creator.h
new file mode 100644
index 00000000000..60376acea5e
--- /dev/null
+++ b/be/src/vec/common/hash_table/hash_map_context_creator.h
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "vec/common/hash_table/hash_map_context.h"
+#include "vec/common/hash_table/ph_hash_map.h"
+
+namespace doris::vectorized {
+
+template <template <typename, typename, typename> typename HashMap,
+          template <typename> typename Hash, typename Key, typename Value, 
typename Variant>
+void get_hash_map_context_fixed(Variant& variant, bool has_nullable_key, const 
Sizes& key_sizes) {
+    if (has_nullable_key) {
+        variant.template emplace<MethodKeysFixed<HashMap<Key, Value, 
Hash<Key>>, true>>(key_sizes);
+    } else {
+        variant.template emplace<MethodKeysFixed<HashMap<Key, Value, 
Hash<Key>>>>(key_sizes);
+    }
+}
+
+template <template <typename, typename, typename> typename HashMap,
+          template <typename> typename Hash, typename Value, typename Variant>
+void get_hash_map_context_fixed(Variant& variant, size_t size, bool 
has_nullable_key,
+                                const Sizes& key_sizes) {
+    if (size <= sizeof(UInt64)) {
+        get_hash_map_context_fixed<HashMap, Hash, UInt64, Value>(variant, 
has_nullable_key,
+                                                                 key_sizes);
+    } else if (size <= sizeof(UInt128)) {
+        get_hash_map_context_fixed<HashMap, Hash, UInt128, Value>(variant, 
has_nullable_key,
+                                                                  key_sizes);
+    } else if (size <= sizeof(UInt136)) {
+        get_hash_map_context_fixed<HashMap, Hash, UInt136, Value>(variant, 
has_nullable_key,
+                                                                  key_sizes);
+    } else if (size <= sizeof(UInt256)) {
+        get_hash_map_context_fixed<HashMap, Hash, UInt256, Value>(variant, 
has_nullable_key,
+                                                                  key_sizes);
+    }
+}
+
+template <template <typename, typename, typename> typename HashMap,
+          template <typename> typename Hash, typename Value, typename Variant>
+bool try_get_hash_map_context_fixed(Variant& variant, const VExprContextSPtrs& 
expr_ctxs) {
+    Sizes key_sizes;
+
+    bool use_fixed_key = true;
+    bool has_null = false;
+    size_t key_byte_size = 0;
+
+    for (auto ctx : expr_ctxs) {
+        const auto& data_type = ctx->root()->data_type();
+        if (!data_type->have_maximum_size_of_value()) {
+            use_fixed_key = false;
+            break;
+        }
+        has_null |= data_type->is_nullable();
+        
key_sizes.emplace_back(data_type->get_maximum_size_of_value_in_memory() -
+                               (data_type->is_nullable() ? 1 : 0));
+        key_byte_size += key_sizes.back();
+    }
+
+    size_t bitmap_size = has_null ? get_bitmap_size(expr_ctxs.size()) : 0;
+    if (bitmap_size + key_byte_size > sizeof(UInt256)) {
+        use_fixed_key = false;
+    }
+
+    if (use_fixed_key) {
+        get_hash_map_context_fixed<HashMap, Hash, Value>(variant, bitmap_size 
+ key_byte_size,
+                                                         has_null, key_sizes);
+    }
+    return use_fixed_key;
+}
+
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/common/hash_table/hash_map_util.h 
b/be/src/vec/common/hash_table/hash_map_util.h
index 7d480266992..2dc725f1f27 100644
--- a/be/src/vec/common/hash_table/hash_map_util.h
+++ b/be/src/vec/common/hash_table/hash_map_util.h
@@ -33,15 +33,7 @@ enum class HashKeyType {
     int64_key_phase2,
     int128_key,
     int128_key_phase2,
-    int64_keys,
-    int64_keys_phase2,
-    int128_keys,
-    int128_keys_phase2,
-    int256_keys,
-    int256_keys_phase2,
     string_key,
-    int136_keys,
-    int136_keys_phase2,
 };
 
 inline HashKeyType get_hash_key_type_with_phase(HashKeyType t, bool phase2) {
@@ -54,15 +46,6 @@ inline HashKeyType get_hash_key_type_with_phase(HashKeyType 
t, bool phase2) {
     if (t == HashKeyType::int64_key) {
         return HashKeyType::int64_key_phase2;
     }
-    if (t == HashKeyType::int128_keys) {
-        return HashKeyType::int128_keys_phase2;
-    }
-    if (t == HashKeyType::int136_keys) {
-        return HashKeyType::int136_keys_phase2;
-    }
-    if (t == HashKeyType::int256_keys) {
-        return HashKeyType::int256_keys_phase2;
-    }
     return t;
 }
 
@@ -87,10 +70,5 @@ struct DataVariants {
             method_variant.template emplace<MethodOneNumber<T, TT>>();
         }
     }
-
-    template <typename T, bool nullable>
-    void emplace_fixed() {
-        method_variant.template emplace<MethodFixed<T, nullable>>();
-    }
 };
 } // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/common/hash_table/hash_table.h 
b/be/src/vec/common/hash_table/hash_table.h
index 47c9d62e01f..39ee5ec9e0a 100644
--- a/be/src/vec/common/hash_table/hash_table.h
+++ b/be/src/vec/common/hash_table/hash_table.h
@@ -59,12 +59,12 @@ namespace ZeroTraits {
 
 template <typename T>
 bool check(const T x) {
-    return x == 0;
+    return x == T {};
 }
 
 template <typename T>
 void set(T& x) {
-    x = 0;
+    x = T {};
 }
 
 } // namespace ZeroTraits
diff --git a/be/src/vec/common/hash_table/hash_table_set_build.h 
b/be/src/vec/common/hash_table/hash_table_set_build.h
index 8afa52b4b16..ff1fec3ab1c 100644
--- a/be/src/vec/common/hash_table/hash_table_set_build.h
+++ b/be/src/vec/common/hash_table/hash_table_set_build.h
@@ -42,8 +42,8 @@ struct HashTableBuild {
             _operation_node->_mem_used += bucket_bytes - old_bucket_bytes;
         }};
 
-        KeyGetter key_getter(_build_raw_ptrs, _operation_node->_build_key_sz);
-        hash_table_ctx.init_serialized_keys(_build_raw_ptrs, 
_operation_node->_build_key_sz, _rows);
+        KeyGetter key_getter(_build_raw_ptrs);
+        hash_table_ctx.init_serialized_keys(_build_raw_ptrs, _rows);
 
         size_t k = 0;
         auto creator = [&](const auto& ctor, auto& key, auto& origin) {
@@ -85,9 +85,8 @@ struct HashTableBuildX {
             local_state._shared_state->mem_used += bucket_bytes - 
old_bucket_bytes;
         }};
 
-        KeyGetter key_getter(_build_raw_ptrs, 
local_state._shared_state->build_key_sz);
-        hash_table_ctx.init_serialized_keys(_build_raw_ptrs,
-                                            
local_state._shared_state->build_key_sz, _rows);
+        KeyGetter key_getter(_build_raw_ptrs);
+        hash_table_ctx.init_serialized_keys(_build_raw_ptrs, _rows);
 
         size_t k = 0;
         auto creator = [&](const auto& ctor, auto& key, auto& origin) {
diff --git a/be/src/vec/common/hash_table/hash_table_set_probe.h 
b/be/src/vec/common/hash_table/hash_table_set_probe.h
index b9962c58d93..4a79b86a146 100644
--- a/be/src/vec/common/hash_table/hash_table_set_probe.h
+++ b/be/src/vec/common/hash_table/hash_table_set_probe.h
@@ -32,9 +32,8 @@ struct HashTableProbe {
     Status mark_data_in_hashtable(HashTableContext& hash_table_ctx) {
         using KeyGetter = typename HashTableContext::State;
 
-        KeyGetter key_getter(_probe_raw_ptrs, _operation_node->_probe_key_sz);
-        hash_table_ctx.init_serialized_keys(_probe_raw_ptrs, 
_operation_node->_probe_key_sz,
-                                            _probe_rows);
+        KeyGetter key_getter(_probe_raw_ptrs);
+        hash_table_ctx.init_serialized_keys(_probe_raw_ptrs, _probe_rows);
 
         if constexpr (std::is_same_v<typename HashTableContext::Mapped, 
RowRefListWithFlags>) {
             for (int probe_index = 0; probe_index < _probe_rows; 
probe_index++) {
@@ -73,9 +72,8 @@ struct HashTableProbeX {
                                   HashTableContext& hash_table_ctx) {
         using KeyGetter = typename HashTableContext::State;
 
-        KeyGetter key_getter(_probe_raw_ptrs, 
local_state._shared_state->probe_key_sz);
-        hash_table_ctx.init_serialized_keys(_probe_raw_ptrs,
-                                            
local_state._shared_state->probe_key_sz, _probe_rows);
+        KeyGetter key_getter(_probe_raw_ptrs);
+        hash_table_ctx.init_serialized_keys(_probe_raw_ptrs, _probe_rows);
 
         if constexpr (std::is_same_v<typename HashTableContext::Mapped, 
RowRefListWithFlags>) {
             for (int probe_index = 0; probe_index < _probe_rows; 
probe_index++) {
diff --git a/be/src/vec/common/hash_table/partitioned_hash_map.h 
b/be/src/vec/common/hash_table/partitioned_hash_map.h
index f667360592d..89958608c73 100644
--- a/be/src/vec/common/hash_table/partitioned_hash_map.h
+++ b/be/src/vec/common/hash_table/partitioned_hash_map.h
@@ -58,6 +58,9 @@ using PartitionedHashMap =
 template <typename Key, typename Mapped, typename Hash = DefaultHash<Key>>
 using PHPartitionedHashMap = PartitionedHashMapTable<PHHashMap<Key, Mapped, 
Hash, true>>;
 
+template <typename Key, typename Mapped, typename Hash = DefaultHash<Key>>
+using PHNormalHashMap = PHHashMap<Key, Mapped, Hash, false>;
+
 template <typename Key, typename Mapped, typename Hash>
 struct HashTableTraits<PHPartitionedHashMap<Key, Mapped, Hash>> {
     static constexpr bool is_phmap = true;
diff --git a/be/src/vec/common/hash_table/string_hash_table.h 
b/be/src/vec/common/hash_table/string_hash_table.h
index 2c51760e6bf..00bbf2f02f8 100644
--- a/be/src/vec/common/hash_table/string_hash_table.h
+++ b/be/src/vec/common/hash_table/string_hash_table.h
@@ -669,8 +669,4 @@ public:
         return m1.add_elem_size_overflow(add_size) || 
m2.add_elem_size_overflow(add_size) ||
                m3.add_elem_size_overflow(add_size) || 
ms.add_elem_size_overflow(add_size);
     }
-
-#ifdef DBMS_HASH_MAP_COUNT_COLLISIONS
-    size_t get_collisions() const { return 0; }
-#endif
 };
diff --git a/be/src/vec/common/uint128.h b/be/src/vec/common/uint128.h
index 597ad2f7756..523e74ab740 100644
--- a/be/src/vec/common/uint128.h
+++ b/be/src/vec/common/uint128.h
@@ -123,7 +123,7 @@ struct UInt128Hash {
 #if defined(__SSE4_2__) || defined(__aarch64__)
 
 struct UInt128HashCRC32 {
-    size_t operator()(UInt128 x) const {
+    size_t operator()(const UInt128& x) const {
         UInt64 crc = -1ULL;
         crc = _mm_crc32_u64(crc, x.low);
         crc = _mm_crc32_u64(crc, x.high);
diff --git a/be/src/vec/exec/distinct_vaggregation_node.cpp 
b/be/src/vec/exec/distinct_vaggregation_node.cpp
index b12281da2ba..e2c06cc7234 100644
--- a/be/src/vec/exec/distinct_vaggregation_node.cpp
+++ b/be/src/vec/exec/distinct_vaggregation_node.cpp
@@ -87,8 +87,8 @@ void 
DistinctAggregationNode::_emplace_into_hash_table_to_distinct(IColumn::Sele
                 SCOPED_TIMER(_hash_table_compute_timer);
                 using HashMethodType = std::decay_t<decltype(agg_method)>;
                 using AggState = typename HashMethodType::State;
-                AggState state(key_columns, _probe_key_sz);
-                agg_method.init_serialized_keys(key_columns, _probe_key_sz, 
num_rows);
+                AggState state(key_columns);
+                agg_method.init_serialized_keys(key_columns, num_rows);
 
                 size_t row = 0;
                 auto creator = [&](const auto& ctor, auto& key, auto& origin) {
diff --git a/be/src/vec/exec/join/process_hash_table_probe.h 
b/be/src/vec/exec/join/process_hash_table_probe.h
index e20e7c83189..435cea84186 100644
--- a/be/src/vec/exec/join/process_hash_table_probe.h
+++ b/be/src/vec/exec/join/process_hash_table_probe.h
@@ -121,7 +121,6 @@ struct ProcessHashTableProbe {
 
     bool _have_other_join_conjunct;
     bool _is_right_semi_anti;
-    Sizes _probe_key_sz;
     std::vector<bool>* _left_output_slot_flags;
     std::vector<bool>* _right_output_slot_flags;
     bool* _has_null_in_build_side;
diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h 
b/be/src/vec/exec/join/process_hash_table_probe_impl.h
index 16ef76d02c5..f4a3010c49f 100644
--- a/be/src/vec/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h
@@ -45,7 +45,6 @@ ProcessHashTableProbe<JoinOpType, 
Parent>::ProcessHashTableProbe(Parent* parent,
                                              : nullptr),
           _have_other_join_conjunct(parent->have_other_join_conjunct()),
           _is_right_semi_anti(parent->is_right_semi_anti()),
-          _probe_key_sz(parent->probe_key_sz()),
           _left_output_slot_flags(parent->left_output_slot_flags()),
           _right_output_slot_flags(parent->right_output_slot_flags()),
           _has_null_in_build_side(parent->has_null_in_build_side()),
@@ -184,10 +183,9 @@ typename HashTableType::State 
ProcessHashTableProbe<JoinOpType, Parent>::_init_p
     if (!_parent->_ready_probe) {
         _parent->_ready_probe = true;
         hash_table_ctx.reset();
-        hash_table_ctx.init_serialized_keys(_parent->_probe_columns, 
_probe_key_sz, probe_rows,
-                                            null_map);
+        hash_table_ctx.init_serialized_keys(_parent->_probe_columns, 
probe_rows, null_map);
     }
-    return typename HashTableType::State(_parent->_probe_columns, 
_probe_key_sz);
+    return typename HashTableType::State(_parent->_probe_columns);
 }
 
 template <int JoinOpType, typename Parent>
@@ -1011,6 +1009,8 @@ struct ExtractType<T(U)> {
     INSTANTIATION(JoinOpType, Parent, (I128FixedKeyHashTableContext<false, 
RowRefList>));          \
     INSTANTIATION(JoinOpType, Parent, (I256FixedKeyHashTableContext<true, 
RowRefList>));           \
     INSTANTIATION(JoinOpType, Parent, (I256FixedKeyHashTableContext<false, 
RowRefList>));          \
+    INSTANTIATION(JoinOpType, Parent, (I136FixedKeyHashTableContext<true, 
RowRefList>));           \
+    INSTANTIATION(JoinOpType, Parent, (I136FixedKeyHashTableContext<false, 
RowRefList>));          \
     INSTANTIATION(JoinOpType, Parent, 
(SerializedHashTableContext<RowRefListWithFlag>));           \
     INSTANTIATION(JoinOpType, Parent, 
(I8HashTableContext<RowRefListWithFlag>));                   \
     INSTANTIATION(JoinOpType, Parent, 
(I16HashTableContext<RowRefListWithFlag>));                  \
@@ -1024,6 +1024,8 @@ struct ExtractType<T(U)> {
     INSTANTIATION(JoinOpType, Parent, (I128FixedKeyHashTableContext<false, 
RowRefListWithFlag>));  \
     INSTANTIATION(JoinOpType, Parent, (I256FixedKeyHashTableContext<true, 
RowRefListWithFlag>));   \
     INSTANTIATION(JoinOpType, Parent, (I256FixedKeyHashTableContext<false, 
RowRefListWithFlag>));  \
+    INSTANTIATION(JoinOpType, Parent, (I136FixedKeyHashTableContext<true, 
RowRefListWithFlag>));   \
+    INSTANTIATION(JoinOpType, Parent, (I136FixedKeyHashTableContext<false, 
RowRefListWithFlag>));  \
     INSTANTIATION(JoinOpType, Parent, 
(SerializedHashTableContext<RowRefListWithFlags>));          \
     INSTANTIATION(JoinOpType, Parent, 
(I8HashTableContext<RowRefListWithFlags>));                  \
     INSTANTIATION(JoinOpType, Parent, 
(I16HashTableContext<RowRefListWithFlags>));                 \
@@ -1035,6 +1037,8 @@ struct ExtractType<T(U)> {
     INSTANTIATION(JoinOpType, Parent, (I64FixedKeyHashTableContext<false, 
RowRefListWithFlags>));  \
     INSTANTIATION(JoinOpType, Parent, (I128FixedKeyHashTableContext<true, 
RowRefListWithFlags>));  \
     INSTANTIATION(JoinOpType, Parent, (I128FixedKeyHashTableContext<false, 
RowRefListWithFlags>)); \
+    INSTANTIATION(JoinOpType, Parent, (I136FixedKeyHashTableContext<true, 
RowRefListWithFlags>));  \
+    INSTANTIATION(JoinOpType, Parent, (I136FixedKeyHashTableContext<false, 
RowRefListWithFlags>)); \
     INSTANTIATION(JoinOpType, Parent, (I256FixedKeyHashTableContext<true, 
RowRefListWithFlags>));  \
     INSTANTIATION(JoinOpType, Parent, (I256FixedKeyHashTableContext<false, 
RowRefListWithFlags>))
 
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp 
b/be/src/vec/exec/join/vhash_join_node.cpp
index f48a7e278fe..af7336a1f5e 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -1082,61 +1082,8 @@ void HashJoinNode::_hash_table_init(RuntimeState* state) 
{
                     return;
                 }
 
-                bool use_fixed_key = true;
-                bool has_null = false;
-                size_t key_byte_size = 0;
-                size_t bitmap_size = get_bitmap_size(_build_expr_ctxs.size());
-
-                _probe_key_sz.resize(_probe_expr_ctxs.size());
-                _build_key_sz.resize(_build_expr_ctxs.size());
-
-                for (int i = 0; i < _build_expr_ctxs.size(); ++i) {
-                    const auto vexpr = _build_expr_ctxs[i]->root();
-                    const auto& data_type = vexpr->data_type();
-
-                    if (!data_type->have_maximum_size_of_value()) {
-                        use_fixed_key = false;
-                        break;
-                    }
-
-                    auto is_null = data_type->is_nullable();
-                    has_null |= is_null;
-                    _build_key_sz[i] =
-                            data_type->get_maximum_size_of_value_in_memory() - 
(is_null ? 1 : 0);
-                    _probe_key_sz[i] = _build_key_sz[i];
-                    key_byte_size += _probe_key_sz[i];
-                }
-
-                if (bitmap_size + key_byte_size > sizeof(UInt256)) {
-                    use_fixed_key = false;
-                }
-
-                if (use_fixed_key) {
-                    // TODO: may we should support uint256 in the future
-                    if (has_null) {
-                        if (bitmap_size + key_byte_size <= sizeof(UInt64)) {
-                            _hash_table_variants
-                                    
->emplace<I64FixedKeyHashTableContext<true, RowRefListType>>();
-                        } else if (bitmap_size + key_byte_size <= 
sizeof(UInt128)) {
-                            _hash_table_variants
-                                    
->emplace<I128FixedKeyHashTableContext<true, RowRefListType>>();
-                        } else {
-                            _hash_table_variants
-                                    
->emplace<I256FixedKeyHashTableContext<true, RowRefListType>>();
-                        }
-                    } else {
-                        if (key_byte_size <= sizeof(UInt64)) {
-                            _hash_table_variants
-                                    
->emplace<I64FixedKeyHashTableContext<false, RowRefListType>>();
-                        } else if (key_byte_size <= sizeof(UInt128)) {
-                            _hash_table_variants->emplace<
-                                    I128FixedKeyHashTableContext<false, 
RowRefListType>>();
-                        } else {
-                            _hash_table_variants->emplace<
-                                    I256FixedKeyHashTableContext<false, 
RowRefListType>>();
-                        }
-                    }
-                } else {
+                if (!try_get_hash_map_context_fixed<PartitionedHashMap, 
HashCRC32, RowRefListType>(
+                            *_hash_table_variants, _build_expr_ctxs)) {
                     
_hash_table_variants->emplace<SerializedHashTableContext<RowRefListType>>();
                 }
             },
diff --git a/be/src/vec/exec/join/vhash_join_node.h 
b/be/src/vec/exec/join/vhash_join_node.h
index b9264bc1457..50e1567f4e2 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -39,6 +39,7 @@
 #include "vec/common/arena.h"
 #include "vec/common/columns_hashing.h"
 #include "vec/common/hash_table/hash_map_context.h"
+#include "vec/common/hash_table/hash_map_context_creator.h"
 #include "vec/common/hash_table/partitioned_hash_map.h"
 #include "vec/common/string_ref.h"
 #include "vec/core/block.h"
@@ -146,7 +147,7 @@ struct ProcessHashTableBuild {
             _parent->add_hash_buckets_filled_info(hash_table_buckets_info);
         }};
 
-        KeyGetter key_getter(_build_raw_ptrs, _parent->build_key_sz());
+        KeyGetter key_getter(_build_raw_ptrs);
 
         SCOPED_TIMER(_parent->_build_table_insert_timer);
         hash_table_ctx.hash_table->reset_resize_timer();
@@ -168,7 +169,7 @@ struct ProcessHashTableBuild {
             inserted_rows.reserve(_batch_size);
         }
 
-        hash_table_ctx.init_serialized_keys(_build_raw_ptrs, 
_parent->build_key_sz(), _rows,
+        hash_table_ctx.init_serialized_keys(_build_raw_ptrs, _rows,
                                             null_map ? null_map->data() : 
nullptr);
 
         auto& arena = *_parent->arena();
@@ -244,7 +245,6 @@ private:
     ProfileCounter* _build_side_compute_hash_timer;
 };
 
-// TODO: use FixedHashTable instead of HashTable
 template <typename RowRefListType>
 using I8HashTableContext = PrimaryTypeHashTableContext<UInt8, RowRefListType>;
 template <typename RowRefListType>
@@ -267,6 +267,9 @@ using I128FixedKeyHashTableContext = 
FixedKeyHashTableContext<UInt128, has_null,
 template <bool has_null, typename RowRefListType>
 using I256FixedKeyHashTableContext = FixedKeyHashTableContext<UInt256, 
has_null, RowRefListType>;
 
+template <bool has_null, typename RowRefListType>
+using I136FixedKeyHashTableContext = FixedKeyHashTableContext<UInt136, 
has_null, RowRefListType>;
+
 using HashTableVariants = std::variant<
         std::monostate, SerializedHashTableContext<RowRefList>, 
I8HashTableContext<RowRefList>,
         I16HashTableContext<RowRefList>, I32HashTableContext<RowRefList>,
@@ -296,7 +299,13 @@ using HashTableVariants = std::variant<
         I128FixedKeyHashTableContext<true, RowRefListWithFlags>,
         I128FixedKeyHashTableContext<false, RowRefListWithFlags>,
         I256FixedKeyHashTableContext<true, RowRefListWithFlags>,
-        I256FixedKeyHashTableContext<false, RowRefListWithFlags>>;
+        I256FixedKeyHashTableContext<false, RowRefListWithFlags>,
+        I136FixedKeyHashTableContext<true, RowRefListWithFlags>,
+        I136FixedKeyHashTableContext<false, RowRefListWithFlags>,
+        I136FixedKeyHashTableContext<true, RowRefListWithFlag>,
+        I136FixedKeyHashTableContext<false, RowRefListWithFlag>,
+        I136FixedKeyHashTableContext<true, RowRefList>,
+        I136FixedKeyHashTableContext<false, RowRefList>>;
 
 class VExprContext;
 
@@ -361,13 +370,11 @@ public:
     bool is_right_semi_anti() const { return _is_right_semi_anti; }
     bool is_outer_join() const { return _is_outer_join; }
     std::shared_ptr<std::vector<Block>> build_blocks() const { return 
_build_blocks; }
-    Sizes probe_key_sz() const { return _probe_key_sz; }
     std::vector<bool>* left_output_slot_flags() { return 
&_left_output_slot_flags; }
     std::vector<bool>* right_output_slot_flags() { return 
&_right_output_slot_flags; }
     bool* has_null_in_build_side() { return &_has_null_in_build_side; }
     DataTypes right_table_data_types() { return _right_table_data_types; }
     DataTypes left_table_data_types() { return _left_table_data_types; }
-    vectorized::Sizes& build_key_sz() { return _build_key_sz; }
     bool build_unique() const { return _build_unique; }
     std::vector<TRuntimeFilterDesc>& runtime_filter_descs() { return 
_runtime_filter_descs; }
     std::shared_ptr<vectorized::Arena> arena() { return _arena; }
@@ -474,9 +481,6 @@ private:
 
     bool _build_side_ignore_null = false;
 
-    Sizes _probe_key_sz;
-    Sizes _build_key_sz;
-
     bool _is_broadcast_join = false;
     bool _should_build_hash_table = true;
     std::shared_ptr<SharedHashTableController> _shared_hashtable_controller = 
nullptr;
diff --git a/be/src/vec/exec/vaggregation_node.cpp 
b/be/src/vec/exec/vaggregation_node.cpp
index a42dea5a099..758fa4b7a5b 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -40,7 +40,9 @@
 #include "util/telemetry/telemetry.h"
 #include "vec/aggregate_functions/aggregate_function.h"
 #include "vec/common/hash_table/hash.h"
+#include "vec/common/hash_table/hash_map_context_creator.h"
 #include "vec/common/hash_table/hash_table_utils.h"
+#include "vec/common/hash_table/partitioned_hash_map.h"
 #include "vec/common/hash_table/string_hash_table.h"
 #include "vec/common/string_buffer.hpp"
 #include "vec/core/block.h"
@@ -238,47 +240,8 @@ void AggregationNode::_init_hash_method(const 
VExprContextSPtrs& probe_exprs) {
 
         _agg_data->init(get_hash_key_type_with_phase(t, !_is_first_phase), 
is_nullable);
     } else {
-        bool use_fixed_key = true;
-        bool has_null = false;
-        size_t key_byte_size = 0;
-        size_t bitmap_size = get_bitmap_size(_probe_expr_ctxs.size());
-
-        _probe_key_sz.resize(_probe_expr_ctxs.size());
-        for (int i = 0; i < _probe_expr_ctxs.size(); ++i) {
-            const auto& expr = _probe_expr_ctxs[i]->root();
-            const auto& data_type = expr->data_type();
-
-            if (!data_type->have_maximum_size_of_value()) {
-                use_fixed_key = false;
-                break;
-            }
-
-            auto is_null = data_type->is_nullable();
-            has_null |= is_null;
-            _probe_key_sz[i] = 
data_type->get_maximum_size_of_value_in_memory() - (is_null ? 1 : 0);
-            key_byte_size += _probe_key_sz[i];
-        }
-
-        if (!has_null) {
-            bitmap_size = 0;
-        }
-
-        if (bitmap_size + key_byte_size > sizeof(UInt256)) {
-            use_fixed_key = false;
-        }
-
-        if (use_fixed_key) {
-            if (bitmap_size + key_byte_size <= sizeof(UInt64)) {
-                t = Type::int64_keys;
-            } else if (bitmap_size + key_byte_size <= sizeof(UInt128)) {
-                t = Type::int128_keys;
-            } else if (bitmap_size + key_byte_size <= sizeof(UInt136)) {
-                t = Type::int136_keys;
-            } else {
-                t = Type::int256_keys;
-            }
-            _agg_data->init(get_hash_key_type_with_phase(t, !_is_first_phase), 
has_null);
-        } else {
+        if (!try_get_hash_map_context_fixed<PHNormalHashMap, HashCRC32, 
AggregateDataPtr>(
+                    _agg_data->method_variant, probe_exprs)) {
             _agg_data->init(Type::serialized);
         }
     }
@@ -883,8 +846,8 @@ void 
AggregationNode::_emplace_into_hash_table(AggregateDataPtr* places, ColumnR
                 SCOPED_TIMER(_hash_table_compute_timer);
                 using HashMethodType = std::decay_t<decltype(agg_method)>;
                 using AggState = typename HashMethodType::State;
-                AggState state(key_columns, _probe_key_sz);
-                agg_method.init_serialized_keys(key_columns, _probe_key_sz, 
num_rows);
+                AggState state(key_columns);
+                agg_method.init_serialized_keys(key_columns, num_rows);
 
                 auto creator = [this](const auto& ctor, auto& key, auto& 
origin) {
                     HashMethodType::try_presis_key(key, origin, 
*_agg_arena_pool);
@@ -920,8 +883,8 @@ void AggregationNode::_find_in_hash_table(AggregateDataPtr* 
places, ColumnRawPtr
             [&](auto&& agg_method) -> void {
                 using HashMethodType = std::decay_t<decltype(agg_method)>;
                 using AggState = typename HashMethodType::State;
-                AggState state(key_columns, _probe_key_sz);
-                agg_method.init_serialized_keys(key_columns, _probe_key_sz, 
num_rows);
+                AggState state(key_columns);
+                agg_method.init_serialized_keys(key_columns, num_rows);
 
                 /// For all rows.
                 for (size_t i = 0; i < num_rows; ++i) {
@@ -1091,7 +1054,7 @@ Status 
AggregationNode::_serialize_hash_table_to_block(HashTableCtxType& context
         }
     }
 
-    { context.insert_keys_into_columns(keys, key_columns, num_rows, 
_probe_key_sz); }
+    { context.insert_keys_into_columns(keys, key_columns, num_rows); }
 
     if (hash_table.has_null_key_data()) {
         // only one key of group by support wrap null key
@@ -1343,7 +1306,7 @@ Status 
AggregationNode::_get_result_with_serialized_key_non_spill(RuntimeState*
 
                 {
                     SCOPED_TIMER(_insert_keys_to_column_timer);
-                    agg_method.insert_keys_into_columns(keys, key_columns, 
num_rows, _probe_key_sz);
+                    agg_method.insert_keys_into_columns(keys, key_columns, 
num_rows);
                 }
 
                 for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
@@ -1470,7 +1433,7 @@ Status 
AggregationNode::_serialize_with_serialized_key_result_non_spill(RuntimeS
 
                 {
                     SCOPED_TIMER(_insert_keys_to_column_timer);
-                    agg_method.insert_keys_into_columns(keys, key_columns, 
num_rows, _probe_key_sz);
+                    agg_method.insert_keys_into_columns(keys, key_columns, 
num_rows);
                 }
 
                 if (iter == _aggregate_data_container->end()) {
diff --git a/be/src/vec/exec/vaggregation_node.h 
b/be/src/vec/exec/vaggregation_node.h
index e82730f3d54..f8b09bf8d4d 100644
--- a/be/src/vec/exec/vaggregation_node.h
+++ b/be/src/vec/exec/vaggregation_node.h
@@ -48,9 +48,9 @@
 #include "vec/common/arena.h"
 #include "vec/common/assert_cast.h"
 #include "vec/common/columns_hashing.h"
-#include "vec/common/hash_table/fixed_hash_map.h"
 #include "vec/common/hash_table/hash.h"
 #include "vec/common/hash_table/hash_map_context.h"
+#include "vec/common/hash_table/hash_map_context_creator.h"
 #include "vec/common/hash_table/hash_map_util.h"
 #include "vec/common/hash_table/partitioned_hash_map.h"
 #include "vec/common/hash_table/ph_hash_map.h"
@@ -87,9 +87,8 @@ namespace vectorized {
 using AggregatedDataWithoutKey = AggregateDataPtr;
 using AggregatedDataWithStringKey = PHHashMap<StringRef, AggregateDataPtr>;
 using AggregatedDataWithShortStringKey = StringHashMap<AggregateDataPtr>;
-using AggregatedDataWithUInt8Key =
-        FixedImplicitZeroHashMapWithCalculatedSize<UInt8, AggregateDataPtr>;
-using AggregatedDataWithUInt16Key = FixedImplicitZeroHashMap<UInt16, 
AggregateDataPtr>;
+using AggregatedDataWithUInt8Key = PHHashMap<UInt8, AggregateDataPtr>;
+using AggregatedDataWithUInt16Key = PHHashMap<UInt16, AggregateDataPtr>;
 using AggregatedDataWithUInt32Key = PHHashMap<UInt32, AggregateDataPtr, 
HashCRC32<UInt32>>;
 using AggregatedDataWithUInt64Key = PHHashMap<UInt64, AggregateDataPtr, 
HashCRC32<UInt64>>;
 using AggregatedDataWithUInt128Key = PHHashMap<UInt128, AggregateDataPtr, 
HashCRC32<UInt128>>;
@@ -199,30 +198,6 @@ struct AggregatedDataVariants
         case Type::int128_key_phase2:
             emplace_single<UInt128, AggregatedDataWithUInt128KeyPhase2, 
nullable>();
             break;
-        case Type::int64_keys:
-            emplace_fixed<AggregatedDataWithUInt64Key, nullable>();
-            break;
-        case Type::int64_keys_phase2:
-            emplace_fixed<AggregatedDataWithUInt64KeyPhase2, nullable>();
-            break;
-        case Type::int128_keys:
-            emplace_fixed<AggregatedDataWithUInt128Key, nullable>();
-            break;
-        case Type::int128_keys_phase2:
-            emplace_fixed<AggregatedDataWithUInt128KeyPhase2, nullable>();
-            break;
-        case Type::int136_keys:
-            emplace_fixed<AggregatedDataWithUInt136Key, nullable>();
-            break;
-        case Type::int136_keys_phase2:
-            emplace_fixed<AggregatedDataWithUInt136KeyPhase2, nullable>();
-            break;
-        case Type::int256_keys:
-            emplace_fixed<AggregatedDataWithUInt256Key, nullable>();
-            break;
-        case Type::int256_keys_phase2:
-            emplace_fixed<AggregatedDataWithUInt256KeyPhase2, nullable>();
-            break;
         case Type::string_key:
             if (nullable) {
                 method_variant.emplace<MethodSingleNullableColumn<
@@ -442,7 +417,6 @@ protected:
     VExprContextSPtrs _probe_expr_ctxs;
     AggregatedDataVariantsUPtr _agg_data;
 
-    std::vector<size_t> _probe_key_sz;
     // left / full join will change the key nullable make output/input solt
     // nullable diff. so we need make nullable of it.
     std::vector<size_t> _make_nullable_keys;
diff --git a/be/src/vec/exec/vpartition_sort_node.cpp 
b/be/src/vec/exec/vpartition_sort_node.cpp
index ee882acf1b0..464c742b24b 100644
--- a/be/src/vec/exec/vpartition_sort_node.cpp
+++ b/be/src/vec/exec/vpartition_sort_node.cpp
@@ -30,7 +30,9 @@
 #include "common/object_pool.h"
 #include "runtime/runtime_state.h"
 #include "vec/common/hash_table/hash.h"
+#include "vec/common/hash_table/hash_map_context_creator.h"
 #include "vec/common/hash_table/hash_set.h"
+#include "vec/common/hash_table/partitioned_hash_map.h"
 #include "vec/exprs/vexpr.h"
 #include "vec/exprs/vexpr_context.h"
 
@@ -107,9 +109,9 @@ void VPartitionSortNode::_emplace_into_hash_table(const 
ColumnRawPtrs& key_colum
                 using HashMethodType = std::decay_t<decltype(agg_method)>;
                 using AggState = typename HashMethodType::State;
 
-                AggState state(key_columns, _partition_key_sz);
+                AggState state(key_columns);
                 size_t num_rows = input_block->rows();
-                agg_method.init_serialized_keys(key_columns, 
_partition_key_sz, num_rows);
+                agg_method.init_serialized_keys(key_columns, num_rows);
 
                 auto creator = [&](const auto& ctor, auto& key, auto& origin) {
                     HashMethodType::try_presis_key(key, origin, 
*_agg_arena_pool);
@@ -378,54 +380,8 @@ void VPartitionSortNode::_init_hash_method() {
             
_partitioned_data->init(PartitionedHashMapVariants::Type::serialized);
         }
     } else {
-        bool use_fixed_key = true;
-        bool has_null = false;
-        size_t key_byte_size = 0;
-        size_t bitmap_size = get_bitmap_size(_partition_exprs_num);
-
-        _partition_key_sz.resize(_partition_exprs_num);
-        for (int i = 0; i < _partition_exprs_num; ++i) {
-            const auto& data_type = 
_partition_expr_ctxs[i]->root()->data_type();
-
-            if (!data_type->have_maximum_size_of_value()) {
-                use_fixed_key = false;
-                break;
-            }
-
-            auto is_null = data_type->is_nullable();
-            has_null |= is_null;
-            _partition_key_sz[i] =
-                    data_type->get_maximum_size_of_value_in_memory() - 
(is_null ? 1 : 0);
-            key_byte_size += _partition_key_sz[i];
-        }
-
-        if (bitmap_size + key_byte_size > sizeof(UInt256)) {
-            use_fixed_key = false;
-        }
-
-        if (use_fixed_key) {
-            if (has_null) {
-                if (bitmap_size + key_byte_size <= sizeof(UInt64)) {
-                    
_partitioned_data->init(PartitionedHashMapVariants::Type::int64_keys, has_null);
-                } else if (bitmap_size + key_byte_size <= sizeof(UInt128)) {
-                    
_partitioned_data->init(PartitionedHashMapVariants::Type::int128_keys,
-                                            has_null);
-                } else {
-                    
_partitioned_data->init(PartitionedHashMapVariants::Type::int256_keys,
-                                            has_null);
-                }
-            } else {
-                if (key_byte_size <= sizeof(UInt64)) {
-                    
_partitioned_data->init(PartitionedHashMapVariants::Type::int64_keys, has_null);
-                } else if (key_byte_size <= sizeof(UInt128)) {
-                    
_partitioned_data->init(PartitionedHashMapVariants::Type::int128_keys,
-                                            has_null);
-                } else {
-                    
_partitioned_data->init(PartitionedHashMapVariants::Type::int256_keys,
-                                            has_null);
-                }
-            }
-        } else {
+        if (!try_get_hash_map_context_fixed<PHNormalHashMap, HashCRC32, 
PartitionDataPtr>(
+                    _partitioned_data->method_variant, _partition_expr_ctxs)) {
             
_partitioned_data->init(PartitionedHashMapVariants::Type::serialized);
         }
     }
diff --git a/be/src/vec/exec/vpartition_sort_node.h 
b/be/src/vec/exec/vpartition_sort_node.h
index 835fe6e7fa4..73b21ee676f 100644
--- a/be/src/vec/exec/vpartition_sort_node.h
+++ b/be/src/vec/exec/vpartition_sort_node.h
@@ -26,7 +26,6 @@
 #include "exec/exec_node.h"
 #include "vec/columns/column.h"
 #include "vec/common/columns_hashing.h"
-#include "vec/common/hash_table/fixed_hash_map.h"
 #include "vec/common/hash_table/hash.h"
 #include "vec/common/hash_table/ph_hash_map.h"
 #include "vec/common/hash_table/string_hash_map.h"
@@ -85,13 +84,13 @@ public:
 using PartitionDataPtr = PartitionBlocks*;
 using PartitionDataWithStringKey = PHHashMap<StringRef, PartitionDataPtr>;
 using PartitionDataWithShortStringKey = StringHashMap<PartitionDataPtr>;
-using PartitionDataWithUInt8Key =
-        FixedImplicitZeroHashMapWithCalculatedSize<UInt8, PartitionDataPtr>;
-using PartitionDataWithUInt16Key = FixedImplicitZeroHashMap<UInt16, 
PartitionDataPtr>;
+using PartitionDataWithUInt8Key = PHHashMap<UInt8, PartitionDataPtr>;
+using PartitionDataWithUInt16Key = PHHashMap<UInt16, PartitionDataPtr>;
 using PartitionDataWithUInt32Key = PHHashMap<UInt32, PartitionDataPtr, 
HashCRC32<UInt32>>;
 using PartitionDataWithUInt64Key = PHHashMap<UInt64, PartitionDataPtr, 
HashCRC32<UInt64>>;
 using PartitionDataWithUInt128Key = PHHashMap<UInt128, PartitionDataPtr, 
HashCRC32<UInt128>>;
 using PartitionDataWithUInt256Key = PHHashMap<UInt256, PartitionDataPtr, 
HashCRC32<UInt256>>;
+using PartitionDataWithUInt136Key = PHHashMap<UInt136, PartitionDataPtr, 
HashCRC32<UInt136>>;
 
 using PartitionedMethodVariants = std::variant<
         MethodSerialized<PartitionDataWithStringKey>,
@@ -116,6 +115,8 @@ using PartitionedMethodVariants = std::variant<
         MethodKeysFixed<PartitionDataWithUInt128Key, true>,
         MethodKeysFixed<PartitionDataWithUInt256Key, false>,
         MethodKeysFixed<PartitionDataWithUInt256Key, true>,
+        MethodKeysFixed<PartitionDataWithUInt136Key, false>,
+        MethodKeysFixed<PartitionDataWithUInt136Key, true>,
         MethodStringNoCache<PartitionDataWithShortStringKey>,
         MethodSingleNullableColumn<
                 
MethodStringNoCache<DataWithNullKey<PartitionDataWithShortStringKey>>>>;
@@ -151,18 +152,6 @@ struct PartitionedHashMapVariants
             emplace_single<UInt128, PartitionDataWithUInt128Key, nullable>();
             break;
         }
-        case Type::int64_keys: {
-            emplace_fixed<PartitionDataWithUInt64Key, nullable>();
-            break;
-        }
-        case Type::int128_keys: {
-            emplace_fixed<PartitionDataWithUInt128Key, nullable>();
-            break;
-        }
-        case Type::int256_keys: {
-            emplace_fixed<PartitionDataWithUInt256Key, nullable>();
-            break;
-        }
         case Type::string_key: {
             if (nullable) {
                 method_variant.emplace<MethodSingleNullableColumn<
@@ -220,7 +209,6 @@ private:
     int _partition_exprs_num = 0;
     VExprContextSPtrs _partition_expr_ctxs;
     std::vector<const IColumn*> _partition_columns;
-    std::vector<size_t> _partition_key_sz;
 
     std::vector<std::unique_ptr<PartitionSorter>> _partition_sorts;
     std::vector<PartitionDataPtr> _value_places;
diff --git a/be/src/vec/exec/vset_operation_node.cpp 
b/be/src/vec/exec/vset_operation_node.cpp
index db1eda521e5..9b15db67b3c 100644
--- a/be/src/vec/exec/vset_operation_node.cpp
+++ b/be/src/vec/exec/vset_operation_node.cpp
@@ -215,58 +215,8 @@ void VSetOperationNode<is_intersect>::hash_table_init() {
         }
         return;
     }
-
-    bool use_fixed_key = true;
-    bool has_null = false;
-    size_t key_byte_size = 0;
-    size_t bitmap_size = get_bitmap_size(_child_expr_lists[0].size());
-
-    _build_key_sz.resize(_child_expr_lists[0].size());
-    _probe_key_sz.resize(_child_expr_lists[0].size());
-    for (int i = 0; i < _child_expr_lists[0].size(); ++i) {
-        const auto vexpr = _child_expr_lists[0][i]->root();
-        const auto& data_type = vexpr->data_type();
-
-        if (!data_type->have_maximum_size_of_value()) {
-            use_fixed_key = false;
-            break;
-        }
-
-        auto is_null = data_type->is_nullable();
-        has_null |= is_null;
-        _build_key_sz[i] = data_type->get_maximum_size_of_value_in_memory() - 
(is_null ? 1 : 0);
-        _probe_key_sz[i] = _build_key_sz[i];
-        key_byte_size += _probe_key_sz[i];
-    }
-
-    if (bitmap_size + key_byte_size > sizeof(UInt256)) {
-        use_fixed_key = false;
-    }
-    if (use_fixed_key) {
-        if (has_null) {
-            if (bitmap_size + key_byte_size <= sizeof(UInt64)) {
-                _hash_table_variants
-                        ->emplace<I64FixedKeyHashTableContext<true, 
RowRefListWithFlags>>();
-            } else if (bitmap_size + key_byte_size <= sizeof(UInt128)) {
-                _hash_table_variants
-                        ->emplace<I128FixedKeyHashTableContext<true, 
RowRefListWithFlags>>();
-            } else {
-                _hash_table_variants
-                        ->emplace<I256FixedKeyHashTableContext<true, 
RowRefListWithFlags>>();
-            }
-        } else {
-            if (key_byte_size <= sizeof(UInt64)) {
-                _hash_table_variants
-                        ->emplace<I64FixedKeyHashTableContext<false, 
RowRefListWithFlags>>();
-            } else if (key_byte_size <= sizeof(UInt128)) {
-                _hash_table_variants
-                        ->emplace<I128FixedKeyHashTableContext<false, 
RowRefListWithFlags>>();
-            } else {
-                _hash_table_variants
-                        ->emplace<I256FixedKeyHashTableContext<false, 
RowRefListWithFlags>>();
-            }
-        }
-    } else {
+    if (!try_get_hash_map_context_fixed<PartitionedHashMap, HashCRC32, 
RowRefListWithFlags>(
+                *_hash_table_variants, _child_expr_lists[0])) {
         
_hash_table_variants->emplace<SerializedHashTableContext<RowRefListWithFlags>>();
     }
 }
@@ -573,11 +523,12 @@ void 
VSetOperationNode<is_intersect>::refresh_hash_table() {
                 if constexpr (!std::is_same_v<HashTableCtxType, 
std::monostate>) {
                     if constexpr (std::is_same_v<typename 
HashTableCtxType::Mapped,
                                                  RowRefListWithFlags>) {
-                        HashTableCtxType tmp_hash_table;
+                        auto tmp_hash_table =
+                                std::make_shared<typename 
HashTableCtxType::HashMapType>();
                         bool is_need_shrink =
                                 
arg.hash_table->should_be_shrink(_valid_element_in_hash_tbl);
                         if (is_intersect || is_need_shrink) {
-                            tmp_hash_table.hash_table->init_buf_size(
+                            tmp_hash_table->init_buf_size(
                                     _valid_element_in_hash_tbl / 
arg.hash_table->get_factor() + 1);
                         }
 
@@ -593,15 +544,13 @@ void 
VSetOperationNode<is_intersect>::refresh_hash_table() {
                                         if constexpr (is_intersect) { 
//intersected
                                             if (it->visited) {
                                                 it->visited = false;
-                                                
tmp_hash_table.hash_table->insert(
-                                                        iter->get_value());
+                                                
tmp_hash_table->insert(iter->get_value());
                                             }
                                             ++iter;
                                         } else { //except
                                             if constexpr 
(is_need_shrink_const) {
                                                 if (!it->visited) {
-                                                    
tmp_hash_table.hash_table->insert(
-                                                            iter->get_value());
+                                                    
tmp_hash_table->insert(iter->get_value());
                                                 }
                                             }
                                             ++iter;
@@ -612,7 +561,7 @@ void VSetOperationNode<is_intersect>::refresh_hash_table() {
 
                         arg.reset();
                         if (is_intersect || is_need_shrink) {
-                            arg.hash_table = 
std::move(tmp_hash_table.hash_table);
+                            arg.hash_table = std::move(tmp_hash_table);
                         }
                     } else {
                         LOG(FATAL) << "FATAL: Invalid RowRefList";
diff --git a/be/src/vec/exec/vset_operation_node.h 
b/be/src/vec/exec/vset_operation_node.h
index ffc4f30202b..dfd96430115 100644
--- a/be/src/vec/exec/vset_operation_node.h
+++ b/be/src/vec/exec/vset_operation_node.h
@@ -95,8 +95,6 @@ private:
 
     std::unique_ptr<HashTableVariants> _hash_table_variants;
 
-    std::vector<size_t> _probe_key_sz;
-    std::vector<size_t> _build_key_sz;
     std::vector<bool> _build_not_ignore_null;
 
     //record element size in hashtable
diff --git a/be/src/vec/functions/array/function_array_enumerate_uniq.cpp 
b/be/src/vec/functions/array/function_array_enumerate_uniq.cpp
index 81c490c16ff..ead65b0e1e2 100644
--- a/be/src/vec/functions/array/function_array_enumerate_uniq.cpp
+++ b/be/src/vec/functions/array/function_array_enumerate_uniq.cpp
@@ -234,11 +234,11 @@ private:
                           [[maybe_unused]] const NullMap* null_map,
                           ColumnInt64::Container& dst_values) const {
         HashTableContext ctx;
-        ctx.init_serialized_keys(columns, {}, columns[0]->size(),
+        ctx.init_serialized_keys(columns, columns[0]->size(),
                                  null_map ? null_map->data() : nullptr);
 
         using KeyGetter = typename HashTableContext::State;
-        KeyGetter key_getter(columns, {});
+        KeyGetter key_getter(columns);
 
         auto creator = [&](const auto& ctor, auto& key, auto& origin) { 
ctor(key, 0); };
         auto creator_for_null_key = [&](auto& mapped) { mapped = 0; };


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to