This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit fdf0cdafda46a4bc46915fd0d0b16d27f05b4ca7
Author: ZenoYang <[email protected]>
AuthorDate: Sun Mar 20 23:04:25 2022 +0800

    [improvement](storage) Low cardinality string optimization in storage layer 
(#8318)
    
    Low cardinality string optimization in storage layer
---
 be/src/common/config.h                             |   2 +
 be/src/exprs/expr_value.h                          |   2 +-
 be/src/olap/column_predicate.h                     |   2 +
 be/src/olap/comparison_predicate.cpp               |  79 +++--
 be/src/olap/comparison_predicate.h                 |  55 +--
 be/src/olap/in_list_predicate.cpp                  |  63 +++-
 be/src/olap/rowset/segment_v2/binary_dict_page.cpp |  22 +-
 be/src/olap/rowset/segment_v2/segment_iterator.cpp |  14 +
 be/src/olap/schema.cpp                             |   4 +
 be/src/runtime/string_value.h                      |  81 ++++-
 be/src/runtime/string_value.hpp                    |  72 ----
 be/src/vec/columns/column.h                        |   8 +-
 be/src/vec/columns/column_dictionary.h             | 386 +++++++++++++++++++++
 be/src/vec/columns/column_nullable.h               |   7 +-
 be/src/vec/columns/column_string.h                 |   3 +-
 be/src/vec/columns/predicate_column.h              |   3 +-
 16 files changed, 657 insertions(+), 146 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index f891fc0..5cb5a17 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -234,6 +234,8 @@ CONF_Bool(disable_storage_page_cache, "false");
 
 CONF_Bool(enable_storage_vectorization, "false");
 
+CONF_Bool(enable_low_cardinality_optimize, "false");
+
 // be policy
 // whether disable automatic compaction task
 CONF_mBool(disable_auto_compaction, "false");
diff --git a/be/src/exprs/expr_value.h b/be/src/exprs/expr_value.h
index 62c6be8..886bccd 100644
--- a/be/src/exprs/expr_value.h
+++ b/be/src/exprs/expr_value.h
@@ -57,7 +57,7 @@ struct ExprValue {
               float_val(0.0),
               double_val(0.0),
               string_data(),
-              string_val(nullptr, 0),
+              string_val(),
               datetime_val(),
               decimalv2_val(0),
               array_val() {}
diff --git a/be/src/olap/column_predicate.h b/be/src/olap/column_predicate.h
index b356a9b..342d8fc 100644
--- a/be/src/olap/column_predicate.h
+++ b/be/src/olap/column_predicate.h
@@ -73,6 +73,8 @@ public:
 
     virtual bool is_bloom_filter_predicate() { return false; }
 
+    virtual bool is_range_comparison_predicate() { return false; }
+
 protected:
     uint32_t _column_id;
     bool _opposite;
diff --git a/be/src/olap/comparison_predicate.cpp 
b/be/src/olap/comparison_predicate.cpp
index 2d55456..d74dd10 100644
--- a/be/src/olap/comparison_predicate.cpp
+++ b/be/src/olap/comparison_predicate.cpp
@@ -21,6 +21,7 @@
 #include "olap/schema.h"
 #include "runtime/string_value.hpp"
 #include "runtime/vectorized_row_batch.h"
+#include "vec/columns/column_dictionary.h"
 #include "vec/columns/column_nullable.h"
 #include "vec/columns/column_vector.h"
 #include "vec/columns/predicate_column.h"
@@ -145,28 +146,65 @@ COMPARISON_PRED_COLUMN_BLOCK_EVALUATE(LessEqualPredicate, 
<=)
 COMPARISON_PRED_COLUMN_BLOCK_EVALUATE(GreaterPredicate, >)
 COMPARISON_PRED_COLUMN_BLOCK_EVALUATE(GreaterEqualPredicate, >=)
 
-#define COMPARISON_PRED_COLUMN_EVALUATE(CLASS, OP)                             
                    \
+// todo(zeno) define interface in IColumn to simplify code
+#define COMPARISON_PRED_COLUMN_EVALUATE(CLASS, OP, IS_RANGE)                   
                    \
     template <class type>                                                      
                    \
     void CLASS<type>::evaluate(vectorized::IColumn& column, uint16_t* sel, 
uint16_t* size) const { \
         uint16_t new_size = 0;                                                 
                    \
         if (column.is_nullable()) {                                            
                    \
-            auto* nullable_column =                                            
                    \
+            auto* nullable_col =                                               
                    \
                     
vectorized::check_and_get_column<vectorized::ColumnNullable>(column);          \
-            auto& null_bitmap = reinterpret_cast<const 
vectorized::ColumnVector<uint8_t>&>(        \
-                                        
*(nullable_column->get_null_map_column_ptr()))             \
+            auto& null_bitmap = reinterpret_cast<const 
vectorized::ColumnUInt8&>(                  \
+                                        nullable_col->get_null_map_column())   
                    \
                                         .get_data();                           
                    \
-            auto* nest_column_vector =                                         
                    \
-                    
vectorized::check_and_get_column<vectorized::PredicateColumnType<type>>(       \
-                            nullable_column->get_nested_column());             
                    \
-            auto& data_array = nest_column_vector->get_data();                 
                    \
-            for (uint16_t i = 0; i < *size; i++) {                             
                    \
-                uint16_t idx = sel[i];                                         
                    \
-                sel[new_size] = idx;                                           
                    \
-                const type& cell_value = reinterpret_cast<const 
type&>(data_array[idx]);           \
-                bool ret = !null_bitmap[idx] && (cell_value OP _value);        
                    \
-                new_size += _opposite ? !ret : ret;                            
                    \
+            auto& nested_col = nullable_col->get_nested_column();              
                    \
+            if (nested_col.is_column_dictionary()) {                           
                    \
+                if constexpr (std::is_same_v<type, StringValue>) {             
                    \
+                    auto* nested_col_ptr = vectorized::check_and_get_column<   
                    \
+                            
vectorized::ColumnDictionary<vectorized::Int32>>(nested_col);          \
+                    auto code = IS_RANGE ? 
nested_col_ptr->find_bound_code(_value, 0 OP 1, 1 OP 1) \
+                                         : nested_col_ptr->find_code(_value);  
                    \
+                    auto& data_array = nested_col_ptr->get_data();             
                    \
+                    for (uint16_t i = 0; i < *size; i++) {                     
                    \
+                        uint16_t idx = sel[i];                                 
                    \
+                        sel[new_size] = idx;                                   
                    \
+                        const auto& cell_value =                               
                    \
+                                reinterpret_cast<const 
vectorized::Int32&>(data_array[idx]);       \
+                        bool ret = !null_bitmap[idx] && (cell_value OP code);  
                    \
+                        new_size += _opposite ? !ret : ret;                    
                    \
+                    }                                                          
                    \
+                }                                                              
                    \
+            } else {                                                           
                    \
+                auto* nested_col_ptr =                                         
                    \
+                        
vectorized::check_and_get_column<vectorized::PredicateColumnType<type>>(   \
+                                nested_col);                                   
                    \
+                auto& data_array = nested_col_ptr->get_data();                 
                    \
+                for (uint16_t i = 0; i < *size; i++) {                         
                    \
+                    uint16_t idx = sel[i];                                     
                    \
+                    sel[new_size] = idx;                                       
                    \
+                    const type& cell_value = reinterpret_cast<const 
type&>(data_array[idx]);       \
+                    bool ret = !null_bitmap[idx] && (cell_value OP _value);    
                    \
+                    new_size += _opposite ? !ret : ret;                        
                    \
+                }                                                              
                    \
             }                                                                  
                    \
             *size = new_size;                                                  
                    \
+        } else if (column.is_column_dictionary()) {                            
                    \
+            if constexpr (std::is_same_v<type, StringValue>) {                 
                    \
+                auto& dict_col =                                               
                    \
+                        
reinterpret_cast<vectorized::ColumnDictionary<vectorized::Int32>&>(column);\
+                auto& data_array = dict_col.get_data();                        
                    \
+                auto code = IS_RANGE ? dict_col.find_bound_code(_value, 0 OP 
1, 1 OP 1)            \
+                                     : dict_col.find_code(_value);             
                    \
+                for (uint16_t i = 0; i < *size; ++i) {                         
                    \
+                    uint16_t idx = sel[i];                                     
                    \
+                    sel[new_size] = idx;                                       
                    \
+                    const auto& cell_value =                                   
                    \
+                            reinterpret_cast<const 
vectorized::Int32&>(data_array[idx]);           \
+                    bool ret = cell_value OP code;                             
                    \
+                    new_size += _opposite ? !ret : ret;                        
                    \
+                }                                                              
                    \
+                *size = new_size;                                              
                    \
+            }                                                                  
                    \
         } else {                                                               
                    \
             auto& pred_column_ref =                                            
                    \
                     
reinterpret_cast<vectorized::PredicateColumnType<type>&>(column);              \
@@ -182,12 +220,13 @@ 
COMPARISON_PRED_COLUMN_BLOCK_EVALUATE(GreaterEqualPredicate, >=)
         }                                                                      
                    \
     }
 
-COMPARISON_PRED_COLUMN_EVALUATE(EqualPredicate, ==)
-COMPARISON_PRED_COLUMN_EVALUATE(NotEqualPredicate, !=)
-COMPARISON_PRED_COLUMN_EVALUATE(LessPredicate, <)
-COMPARISON_PRED_COLUMN_EVALUATE(LessEqualPredicate, <=)
-COMPARISON_PRED_COLUMN_EVALUATE(GreaterPredicate, >)
-COMPARISON_PRED_COLUMN_EVALUATE(GreaterEqualPredicate, >=)
+
+COMPARISON_PRED_COLUMN_EVALUATE(EqualPredicate, ==, false)
+COMPARISON_PRED_COLUMN_EVALUATE(NotEqualPredicate, !=, false)
+COMPARISON_PRED_COLUMN_EVALUATE(LessPredicate, <, true)
+COMPARISON_PRED_COLUMN_EVALUATE(LessEqualPredicate, <=, true)
+COMPARISON_PRED_COLUMN_EVALUATE(GreaterPredicate, >, true)
+COMPARISON_PRED_COLUMN_EVALUATE(GreaterEqualPredicate, >=, true)
 
 #define COMPARISON_PRED_COLUMN_EVALUATE_VEC(CLASS, OP)                         
                \
     template <class type>                                                      
                \
diff --git a/be/src/olap/comparison_predicate.h 
b/be/src/olap/comparison_predicate.h
index 30fd9fd..e363675 100644
--- a/be/src/olap/comparison_predicate.h
+++ b/be/src/olap/comparison_predicate.h
@@ -26,35 +26,38 @@ namespace doris {
 
 class VectorizedRowBatch;
 
-#define COMPARISON_PRED_CLASS_DEFINE(CLASS)                                    
               \
-    template <class type>                                                      
               \
-    class CLASS : public ColumnPredicate {                                     
               \
-    public:                                                                    
               \
-        CLASS(uint32_t column_id, const type& value, bool opposite = false);   
               \
-        virtual void evaluate(VectorizedRowBatch* batch) const override;       
               \
-        void evaluate(ColumnBlock* block, uint16_t* sel, uint16_t* size) const 
override;      \
-        void evaluate_or(ColumnBlock* block, uint16_t* sel, uint16_t size,     
               \
-                         bool* flags) const override;                          
               \
-        void evaluate_and(ColumnBlock* block, uint16_t* sel, uint16_t size,    
               \
-                          bool* flags) const override;                         
               \
-        virtual Status evaluate(const Schema& schema,                          
               \
-                                const std::vector<BitmapIndexIterator*>& 
iterators,           \
-                                uint32_t num_rows, roaring::Roaring* roaring) 
const override; \
-        void evaluate(vectorized::IColumn& column, uint16_t* sel, uint16_t* 
size) const override; \
-        void evaluate_and(vectorized::IColumn& column, uint16_t* sel, uint16_t 
size, bool* flags) const override; \
-        void evaluate_or(vectorized::IColumn& column, uint16_t* sel, uint16_t 
size, bool* flags) const override; \
+#define COMPARISON_PRED_CLASS_DEFINE(CLASS, IS_RANGE)                          
                    \
+    template <class type>                                                      
                    \
+    class CLASS : public ColumnPredicate {                                     
                    \
+    public:                                                                    
                    \
+        CLASS(uint32_t column_id, const type& value, bool opposite = false);   
                    \
+        virtual void evaluate(VectorizedRowBatch* batch) const override;       
                    \
+        void evaluate(ColumnBlock* block, uint16_t* sel, uint16_t* size) const 
override;           \
+        void evaluate_or(ColumnBlock* block, uint16_t* sel, uint16_t size,     
                    \
+                         bool* flags) const override;                          
                    \
+        void evaluate_and(ColumnBlock* block, uint16_t* sel, uint16_t size,    
                    \
+                          bool* flags) const override;                         
                    \
+        virtual Status evaluate(const Schema& schema,                          
                    \
+                                const std::vector<BitmapIndexIterator*>& 
iterators,                \
+                                uint32_t num_rows, roaring::Roaring* roaring) 
const override;      \
+        void evaluate(vectorized::IColumn& column, uint16_t* sel, uint16_t* 
size) const override;  \
+        void evaluate_and(vectorized::IColumn& column, uint16_t* sel, uint16_t 
size,               \
+                          bool* flags) const override;                         
                    \
+        void evaluate_or(vectorized::IColumn& column, uint16_t* sel, uint16_t 
size,                \
+                         bool* flags) const override;                          
                    \
         void evaluate_vec(vectorized::IColumn& column, uint16_t size, bool* 
flags) const override; \
-                                                                               
               \
-    private:                                                                   
               \
-        type _value;                                                           
               \
+        bool is_range_comparison_predicate() override { return IS_RANGE; }     
                    \
+                                                                               
                    \
+    private:                                                                   
                    \
+        type _value;                                                           
                    \
     };
 
-COMPARISON_PRED_CLASS_DEFINE(EqualPredicate)
-COMPARISON_PRED_CLASS_DEFINE(NotEqualPredicate)
-COMPARISON_PRED_CLASS_DEFINE(LessPredicate)
-COMPARISON_PRED_CLASS_DEFINE(LessEqualPredicate)
-COMPARISON_PRED_CLASS_DEFINE(GreaterPredicate)
-COMPARISON_PRED_CLASS_DEFINE(GreaterEqualPredicate)
+COMPARISON_PRED_CLASS_DEFINE(EqualPredicate, false)
+COMPARISON_PRED_CLASS_DEFINE(NotEqualPredicate, false)
+COMPARISON_PRED_CLASS_DEFINE(LessPredicate, true)
+COMPARISON_PRED_CLASS_DEFINE(LessEqualPredicate, true)
+COMPARISON_PRED_CLASS_DEFINE(GreaterPredicate, true)
+COMPARISON_PRED_CLASS_DEFINE(GreaterEqualPredicate, true)
 
 } //namespace doris
 
diff --git a/be/src/olap/in_list_predicate.cpp 
b/be/src/olap/in_list_predicate.cpp
index a17e157..3fdac7d 100644
--- a/be/src/olap/in_list_predicate.cpp
+++ b/be/src/olap/in_list_predicate.cpp
@@ -20,6 +20,7 @@
 #include "olap/field.h"
 #include "runtime/string_value.hpp"
 #include "runtime/vectorized_row_batch.h"
+#include "vec/columns/column_dictionary.h"
 #include "vec/columns/predicate_column.h"
 #include "vec/columns/column_nullable.h"
 
@@ -117,26 +118,62 @@ IN_LIST_PRED_EVALUATE(NotInListPredicate, ==)
 IN_LIST_PRED_COLUMN_BLOCK_EVALUATE(InListPredicate, !=)
 IN_LIST_PRED_COLUMN_BLOCK_EVALUATE(NotInListPredicate, ==)
 
+// todo(zeno) define interface in IColumn to simplify code
 #define IN_LIST_PRED_COLUMN_EVALUATE(CLASS, OP)                                
                    \
     template <class type>                                                      
                    \
     void CLASS<type>::evaluate(vectorized::IColumn& column, uint16_t* sel, 
uint16_t* size) const { \
         uint16_t new_size = 0;                                                 
                    \
         if (column.is_nullable()) {                                            
                    \
-            auto* nullable_column =                                            
                    \
-                
vectorized::check_and_get_column<vectorized::ColumnNullable>(column);           
   \
-            auto& null_bitmap = reinterpret_cast<const 
vectorized::ColumnVector<uint8_t>&>(*(      \
-                nullable_column->get_null_map_column_ptr())).get_data();       
                    \
-            auto* nest_column_vector = vectorized::check_and_get_column        
                    \
-                
<vectorized::PredicateColumnType<type>>(nullable_column->get_nested_column());  
   \
-            auto& data_array = nest_column_vector->get_data();                 
                    \
-            for (uint16_t i = 0; i < *size; i++) {                             
                    \
-                uint16_t idx = sel[i];                                         
                    \
-                sel[new_size] = idx;                                           
                    \
-                const type& cell_value = reinterpret_cast<const 
type&>(data_array[idx]);           \
-                bool ret = !null_bitmap[idx] && (_values.find(cell_value) OP 
_values.end());       \
-                new_size += _opposite ? !ret : ret;                            
                    \
+            auto* nullable_col =                                               
                    \
+                    
vectorized::check_and_get_column<vectorized::ColumnNullable>(column);          \
+            auto& null_bitmap = reinterpret_cast<const 
vectorized::ColumnUInt8&>(                  \
+                                        
nullable_col->get_null_map_column()).get_data();           \
+            auto& nested_col = nullable_col->get_nested_column();              
                    \
+            if (nested_col.is_column_dictionary()) {                           
                    \
+                if constexpr (std::is_same_v<type, StringValue>) {             
                    \
+                    auto* nested_col_ptr = vectorized::check_and_get_column<   
                    \
+                            
vectorized::ColumnDictionary<vectorized::Int32>>(nested_col);          \
+                    auto code_set = nested_col_ptr->find_codes(_values);       
                    \
+                    auto& data_array = nested_col_ptr->get_data();             
                    \
+                    for (uint16_t i = 0; i < *size; i++) {                     
                    \
+                        uint16_t idx = sel[i];                                 
                    \
+                        sel[new_size] = idx;                                   
                    \
+                        const auto& cell_value =                               
                    \
+                                reinterpret_cast<const 
vectorized::Int32&>(data_array[idx]);       \
+                        bool ret = !null_bitmap[idx]                           
                    \
+                                   && (code_set.find(cell_value) OP 
code_set.end());               \
+                        new_size += _opposite ? !ret : ret;                    
                    \
+                    }                                                          
                    \
+                }                                                              
                    \
+            } else {                                                           
                    \
+                auto* nested_col_ptr = vectorized::check_and_get_column<       
                    \
+                        vectorized::PredicateColumnType<type>>(nested_col);    
                    \
+                auto& data_array = nested_col_ptr->get_data();                 
                    \
+                for (uint16_t i = 0; i < *size; i++) {                         
                    \
+                    uint16_t idx = sel[i];                                     
                    \
+                    sel[new_size] = idx;                                       
                    \
+                    const type& cell_value = reinterpret_cast<const 
type&>(data_array[idx]);       \
+                    bool ret = !null_bitmap[idx] && (_values.find(cell_value) 
OP _values.end());   \
+                    new_size += _opposite ? !ret : ret;                        
                    \
+                }                                                              
                    \
             }                                                                  
                    \
             *size = new_size;                                                  
                    \
+        } else if (column.is_column_dictionary()) {                            
                    \
+            if constexpr (std::is_same_v<type, StringValue>) {                 
                    \
+                auto& dict_col =                                               
                    \
+                        
reinterpret_cast<vectorized::ColumnDictionary<vectorized::Int32>&>(        \
+                                column);                                       
                    \
+                auto& data_array = dict_col.get_data();                        
                    \
+                auto code_set = dict_col.find_codes(_values);                  
                    \
+                for (uint16_t i = 0; i < *size; i++) {                         
                    \
+                    uint16_t idx = sel[i];                                     
                    \
+                    sel[new_size] = idx;                                       
                    \
+                    const auto& cell_value =                                   
                    \
+                            reinterpret_cast<const 
vectorized::Int32&>(data_array[idx]);           \
+                    auto result = (code_set.find(cell_value) OP 
code_set.end());                   \
+                    new_size += _opposite ? !result : result;                  
                    \
+                }                                                              
                    \
+            }                                                                  
                    \
         } else {                                                               
                    \
             auto& number_column = 
reinterpret_cast<vectorized::PredicateColumnType<type>&>(column);\
             auto& data_array = number_column.get_data();                       
                    \
diff --git a/be/src/olap/rowset/segment_v2/binary_dict_page.cpp 
b/be/src/olap/rowset/segment_v2/binary_dict_page.cpp
index 7fc11da..f199000 100644
--- a/be/src/olap/rowset/segment_v2/binary_dict_page.cpp
+++ b/be/src/olap/rowset/segment_v2/binary_dict_page.cpp
@@ -21,6 +21,8 @@
 #include "gutil/strings/substitute.h" // for Substitute
 #include "runtime/mem_pool.h"
 #include "util/slice.h" // for Slice
+#include "vec/columns/column.h"
+#include "vec/columns/column_dictionary.h"
 #include "vec/columns/column_vector.h"
 #include "vec/columns/column_string.h"
 #include "vec/columns/column_nullable.h"
@@ -132,7 +134,6 @@ void BinaryDictPageBuilder::reset() {
     } else {
         _data_page_builder->reset();
     }
-    _finished = false;
 }
 
 size_t BinaryDictPageBuilder::count() const {
@@ -240,6 +241,18 @@ void BinaryDictPageDecoder::set_dict_decoder(PageDecoder* 
dict_decoder, StringRe
 
 Status BinaryDictPageDecoder::next_batch(size_t* n, 
vectorized::MutableColumnPtr &dst) {
     if (_encoding_type == PLAIN_ENCODING) {
+        // todo(zeno) Handle convert in ColumnDictionary,
+        //  add interface like convert_to_predicate_column_if_necessary
+        auto* col_ptr = dst.get();
+        if (dst->is_nullable()) {
+            auto nullable_col = 
reinterpret_cast<vectorized::ColumnNullable*>(dst.get());
+            col_ptr = nullable_col->get_nested_column_ptr().get();
+        }
+
+        if (col_ptr->is_column_dictionary()) {
+            auto* dict_col_ptr = 
reinterpret_cast<vectorized::ColumnDictionary<vectorized::Int32>*>(col_ptr);
+            col_ptr = 
(*std::move(dict_col_ptr->convert_to_predicate_column())).assume_mutable();
+        }
         return _data_page_decoder->next_batch(n, dst);
     }
     // dictionary encoding
@@ -254,15 +267,14 @@ Status BinaryDictPageDecoder::next_batch(size_t* n, 
vectorized::MutableColumnPtr
     size_t max_fetch = std::min(*n, 
static_cast<size_t>(_bit_shuffle_ptr->_num_elements - 
_bit_shuffle_ptr->_cur_index));
     *n = max_fetch;
  
-    const int32_t* data_array = reinterpret_cast<const 
int32_t*>(_bit_shuffle_ptr->_chunk.data);
+    const auto* data_array = reinterpret_cast<const 
int32_t*>(_bit_shuffle_ptr->_chunk.data);
     size_t start_index = _bit_shuffle_ptr->_cur_index;
 
-    dst->insert_many_dict_data(data_array, start_index, _dict_word_info, 
max_fetch);
+    dst->insert_many_dict_data(data_array, start_index, _dict_word_info, 
max_fetch, _dict_decoder->_num_elems);
 
     _bit_shuffle_ptr->_cur_index += max_fetch;
  
     return Status::OK();
- 
 }
 
 Status BinaryDictPageDecoder::next_batch(size_t* n, ColumnBlockView* dst) {
@@ -276,7 +288,7 @@ Status BinaryDictPageDecoder::next_batch(size_t* n, 
ColumnBlockView* dst) {
     if (PREDICT_FALSE(*n == 0)) {
         return Status::OK();
     }
-    Slice* out = reinterpret_cast<Slice*>(dst->data());
+    auto* out = reinterpret_cast<Slice*>(dst->data());
 
     _batch->resize(*n);
 
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp 
b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
index 16b4e74..1ea0193 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -33,6 +33,7 @@
 #include "olap/short_key_index.h"
 #include "util/doris_metrics.h"
 #include "util/simd/bits.h"
+#include "vec/columns/column_dictionary.h"
 
 using strings::Substitute;
 
@@ -858,6 +859,19 @@ void 
SegmentIterator::_evaluate_short_circuit_predicate(uint16_t* vec_sel_rowid_
     for (auto column_predicate : _short_cir_eval_predicate) {
         auto column_id = column_predicate->column_id();
         auto& short_cir_column = _current_return_columns[column_id];
+        auto* col_ptr = short_cir_column.get();
+        // todo(zeno) define convert_dict_codes_if_dictionary interface in 
IColumn
+        if (short_cir_column->is_nullable()) {
+            auto nullable_col =
+                    
reinterpret_cast<vectorized::ColumnNullable*>(short_cir_column.get());
+            col_ptr = nullable_col->get_nested_column_ptr().get();
+        }
+
+        if (col_ptr->is_column_dictionary() && 
column_predicate->is_range_comparison_predicate()) {
+            auto& dict_col =
+                    
reinterpret_cast<vectorized::ColumnDictionary<vectorized::Int32>&>(*col_ptr);
+            dict_col.convert_dict_codes();
+        }
         column_predicate->evaluate(*short_cir_column, vec_sel_rowid_idx, 
selected_size_ptr);
     }
 
diff --git a/be/src/olap/schema.cpp b/be/src/olap/schema.cpp
index 593ba5b..77b31af 100644
--- a/be/src/olap/schema.cpp
+++ b/be/src/olap/schema.cpp
@@ -20,6 +20,7 @@
 #include "olap/row_block2.h"
 #include "olap/uint24.h"
 #include "vec/columns/column_complex.h"
+#include "vec/columns/column_dictionary.h"
 #include "vec/columns/column_nullable.h"
 #include "vec/columns/predicate_column.h"
 #include "vec/core/types.h"
@@ -203,6 +204,9 @@ vectorized::IColumn::MutablePtr 
Schema::get_predicate_column_ptr(FieldType type)
     case OLAP_FIELD_TYPE_CHAR:
     case OLAP_FIELD_TYPE_VARCHAR:
     case OLAP_FIELD_TYPE_STRING:
+        if (config::enable_low_cardinality_optimize) {
+            return 
doris::vectorized::ColumnDictionary<doris::vectorized::Int32>::create();
+        }
         return doris::vectorized::PredicateColumnType<StringValue>::create();
 
     case OLAP_FIELD_TYPE_DECIMAL:
diff --git a/be/src/runtime/string_value.h b/be/src/runtime/string_value.h
index 285189e..860fced 100644
--- a/be/src/runtime/string_value.h
+++ b/be/src/runtime/string_value.h
@@ -22,9 +22,53 @@
 
 #include "udf/udf.h"
 #include "util/hash_util.hpp"
+#include "util/cpu_info.h"
+#include "vec/common/string_ref.h"
+#ifdef __SSE4_2__
+#include "util/sse_util.hpp"
+#endif
 
 namespace doris {
 
+// Compare two strings using sse4.2 intrinsics if they are available. This 
code assumes
+// that the trivial cases are already handled (i.e. one string is empty).
+// Returns:
+//   < 0 if s1 < s2
+//   0 if s1 == s2
+//   > 0 if s1 > s2
+// The SSE code path is just under 2x faster than the non-sse code path.
+//   - s1/n1: ptr/len for the first string
+//   - s2/n2: ptr/len for the second string
+//   - len: min(n1, n2) - this can be more cheaply passed in by the caller
+static inline int string_compare(const char* s1, int64_t n1, const char* s2, 
int64_t n2,
+                                 int64_t len) {
+    DCHECK_EQ(len, std::min(n1, n2));
+#ifdef __SSE4_2__
+    while (len >= sse_util::CHARS_PER_128_BIT_REGISTER) {
+        __m128i xmm0 = _mm_loadu_si128(reinterpret_cast<const __m128i*>(s1));
+        __m128i xmm1 = _mm_loadu_si128(reinterpret_cast<const __m128i*>(s2));
+        int chars_match =
+                _mm_cmpestri(xmm0, sse_util::CHARS_PER_128_BIT_REGISTER, xmm1,
+                             sse_util::CHARS_PER_128_BIT_REGISTER, 
sse_util::STRCMP_MODE);
+        if (chars_match != sse_util::CHARS_PER_128_BIT_REGISTER) {
+            return (unsigned char)s1[chars_match] - (unsigned 
char)s2[chars_match];
+        }
+        len -= sse_util::CHARS_PER_128_BIT_REGISTER;
+        s1 += sse_util::CHARS_PER_128_BIT_REGISTER;
+        s2 += sse_util::CHARS_PER_128_BIT_REGISTER;
+    }
+#endif
+    unsigned char u1, u2;
+    while (len-- > 0) {
+        u1 = (unsigned char)*s1++;
+        u2 = (unsigned char)*s2++;
+        if (u1 != u2) return u1 - u2;
+        if (u1 == '\0') return n1 - n2;
+    }
+
+    return n1 - n2;
+}
+
 // The format of a string-typed slot.
 // The returned StringValue of all functions that return StringValue
 // shares its buffer the parent.
@@ -43,6 +87,7 @@ struct StringValue {
     size_t len;
 
     StringValue(char* ptr, int len) : ptr(ptr), len(len) {}
+    StringValue(const char* ptr, int len) : ptr(const_cast<char*>(ptr)), 
len(len) {}
     StringValue() : ptr(nullptr), len(0) {}
 
     /// Construct a StringValue from 's'.  's' must be valid for as long as
@@ -60,10 +105,36 @@ struct StringValue {
     // this < other: -1
     // this == other: 0
     // this > other: 1
-    int compare(const StringValue& other) const;
+    inline int compare(const StringValue& other) const {
+        int l = std::min(len, other.len);
+
+        if (l == 0) {
+            if (len == other.len) {
+                return 0;
+            } else if (len == 0) {
+                return -1;
+            } else {
+                DCHECK_EQ(other.len, 0);
+                return 1;
+            }
+        }
+
+        return string_compare(this->ptr, this->len, other.ptr, other.len, l);
+    }
 
     // ==
-    bool eq(const StringValue& other) const;
+    inline bool eq(const StringValue& other) const {
+        if (this->len != other.len) {
+            return false;
+        }
+
+#if defined(__SSE2__)
+        return memequalSSE2Wide(this->ptr, other.ptr, this->len);
+#endif
+
+        return string_compare(this->ptr, this->len, other.ptr, other.len, 
this->len) == 0;
+    }
+
     bool operator==(const StringValue& other) const { return eq(other); }
     // !=
     bool ne(const StringValue& other) const { return !eq(other); }
@@ -111,6 +182,12 @@ struct StringValue {
     static StringValue min_string_val();
 
     static StringValue max_string_val();
+
+    struct Comparator {
+        bool operator()(const StringValue& a, const StringValue& b) const {
+            return a.compare(b) < 0;
+        }
+    };
 };
 
 // This function must be called 'hash_value' to be picked up by boost.
diff --git a/be/src/runtime/string_value.hpp b/be/src/runtime/string_value.hpp
index aac9e3a..a51ee5e 100644
--- a/be/src/runtime/string_value.hpp
+++ b/be/src/runtime/string_value.hpp
@@ -21,81 +21,9 @@
 #include <cstring>
 
 #include "runtime/string_value.h"
-#include "util/cpu_info.h"
-#include "vec/common/string_ref.h"
-#ifdef __SSE4_2__
-#include "util/sse_util.hpp"
-#endif
 
 namespace doris {
 
-// Compare two strings using sse4.2 intrinsics if they are available. This 
code assumes
-// that the trivial cases are already handled (i.e. one string is empty).
-// Returns:
-//   < 0 if s1 < s2
-//   0 if s1 == s2
-//   > 0 if s1 > s2
-// The SSE code path is just under 2x faster than the non-sse code path.
-//   - s1/n1: ptr/len for the first string
-//   - s2/n2: ptr/len for the second string
-//   - len: min(n1, n2) - this can be more cheaply passed in by the caller
-static inline int string_compare(const char* s1, int64_t n1, const char* s2, 
int64_t n2,
-                                 int64_t len) {
-    DCHECK_EQ(len, std::min(n1, n2));
-#ifdef __SSE4_2__
-    while (len >= sse_util::CHARS_PER_128_BIT_REGISTER) {
-        __m128i xmm0 = _mm_loadu_si128(reinterpret_cast<const __m128i*>(s1));
-        __m128i xmm1 = _mm_loadu_si128(reinterpret_cast<const __m128i*>(s2));
-        int chars_match =
-                _mm_cmpestri(xmm0, sse_util::CHARS_PER_128_BIT_REGISTER, xmm1,
-                             sse_util::CHARS_PER_128_BIT_REGISTER, 
sse_util::STRCMP_MODE);
-        if (chars_match != sse_util::CHARS_PER_128_BIT_REGISTER) {
-            return (unsigned char)s1[chars_match] - (unsigned 
char)s2[chars_match];
-        }
-        len -= sse_util::CHARS_PER_128_BIT_REGISTER;
-        s1 += sse_util::CHARS_PER_128_BIT_REGISTER;
-        s2 += sse_util::CHARS_PER_128_BIT_REGISTER;
-    }
-#endif
-    unsigned char u1, u2;
-    while (len-- > 0) {
-        u1 = (unsigned char)*s1++;
-        u2 = (unsigned char)*s2++;
-        if (u1 != u2) return u1 - u2;
-        if (u1 == '\0') return n1 - n2;
-    }
-
-    return n1 - n2;
-}
-
-inline int StringValue::compare(const StringValue& other) const {
-    int l = std::min(len, other.len);
-
-    if (l == 0) {
-        if (len == other.len) {
-            return 0;
-        } else if (len == 0) {
-            return -1;
-        } else {
-            DCHECK_EQ(other.len, 0);
-            return 1;
-        }
-    }
-
-    return string_compare(this->ptr, this->len, other.ptr, other.len, l);
-}
-
-inline bool StringValue::eq(const StringValue& other) const {
-    if (this->len != other.len) {
-        return false;
-    }
-#if defined(__SSE2__)
-    return memequalSSE2Wide(this->ptr, other.ptr, this->len);
-#endif
-
-    return string_compare(this->ptr, this->len, other.ptr, other.len, 
this->len) == 0;
-}
-
 inline StringValue StringValue::substring(int start_pos) const {
     return StringValue(ptr + start_pos, len - start_pos);
 }
diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h
index 07989f4..0927f34 100644
--- a/be/src/vec/columns/column.h
+++ b/be/src/vec/columns/column.h
@@ -178,8 +178,10 @@ public:
     virtual void insert_many_fix_len_data(const char* pos, size_t num) {
       LOG(FATAL) << "Method insert_many_fix_len_data is not supported for " << 
get_name();
     }
- 
-    virtual void insert_many_dict_data(const int32_t* data_array, size_t 
start_index, const StringRef* dict, size_t num) {
+
+    // todo(zeno) Use dict_args temp object to cover all arguments
+    virtual void insert_many_dict_data(const int32_t* data_array, size_t 
start_index,
+                                       const StringRef* dict, size_t data_num, 
uint32_t dict_num = 0) {
       LOG(FATAL) << "Method insert_many_dict_data is not supported for " << 
get_name();
     }
  
@@ -426,6 +428,8 @@ public:
 
     virtual bool is_predicate_column() const { return false; }
 
+    virtual bool is_column_dictionary() const { return false; }
+
     /// If the only value column can contain is NULL.
     /// Does not imply type of object, because it can be 
ColumnNullable(ColumnNothing) or ColumnConst(ColumnNullable(ColumnNothing))
     virtual bool only_null() const { return false; }
diff --git a/be/src/vec/columns/column_dictionary.h 
b/be/src/vec/columns/column_dictionary.h
new file mode 100644
index 0000000..3aa5bd7
--- /dev/null
+++ b/be/src/vec/columns/column_dictionary.h
@@ -0,0 +1,386 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <algorithm>
+#include <parallel_hashmap/phmap.h>
+
+#include "gutil/hash/string_hash.h"
+#include "olap/decimal12.h"
+#include "olap/uint24.h"
+#include "runtime/string_value.h"
+#include "util/slice.h"
+#include "vec/columns/column.h"
+#include "vec/columns/column_decimal.h"
+#include "vec/columns/column_impl.h"
+#include "vec/columns/column_string.h"
+#include "vec/columns/column_vector.h"
+#include "vec/columns/predicate_column.h"
+#include "vec/core/types.h"
+
+namespace doris::vectorized {
+
+/**
+ * For low cardinality string columns, using ColumnDictionary can reduce memory
+ * usage and improve query efficiency.
+ * For equal predicate comparisons, convert the predicate constant to encodings
+ * according to the dictionary, so that encoding comparisons are used instead
+ * of string comparisons to improve performance.
+ * For range comparison predicates, it is necessary to sort the dictionary
+ * contents, convert the encoding column, and then compare the encoding 
directly.
+ * If the read data page contains plain-encoded data pages, the dictionary
+ * columns are converted into PredicateColumn for processing.
+ * Currently ColumnDictionary is only used for storage layer.
+ */
+template <typename T>
+class ColumnDictionary final : public COWHelper<IColumn, ColumnDictionary<T>> {
+private:
+    friend class COWHelper<IColumn, ColumnDictionary>;
+
+    ColumnDictionary() {}
+    ColumnDictionary(const size_t n) : codes(n) {}
+    ColumnDictionary(const ColumnDictionary& src) : codes(src.codes.begin(), 
src.codes.end()) {}
+
+public:
+    using Self = ColumnDictionary;
+    using value_type = T;
+    using Container = PaddedPODArray<value_type>;
+    using DictContainer = PaddedPODArray<StringValue>;
+
+    bool is_numeric() const override { return false; }
+
+    bool is_predicate_column() const override { return false; }
+
+    bool is_column_dictionary() const override { return true; }
+
+    size_t size() const override { return codes.size(); }
+
+    [[noreturn]] StringRef get_data_at(size_t n) const override {
+        LOG(FATAL) << "get_data_at not supported in ColumnDictionary";
+    }
+
+    void insert_from(const IColumn& src, size_t n) override {
+        LOG(FATAL) << "insert_from not supported in ColumnDictionary";
+    }
+
+    void insert_range_from(const IColumn& src, size_t start, size_t length) 
override {
+        LOG(FATAL) << "insert_range_from not supported in ColumnDictionary";
+    }
+
+    void insert_indices_from(const IColumn& src, const int* indices_begin,
+                             const int* indices_end) override {
+        LOG(FATAL) << "insert_indices_from not supported in ColumnDictionary";
+    }
+
+    void pop_back(size_t n) override { LOG(FATAL) << "pop_back not supported 
in ColumnDictionary"; }
+
+    void update_hash_with_value(size_t n, SipHash& hash) const override {
+        LOG(FATAL) << "update_hash_with_value not supported in 
ColumnDictionary";
+    }
+
+    void insert_data(const char* pos, size_t /*length*/) override {
+        codes.push_back(unaligned_load<T>(pos));
+    }
+
+    void insert_data(const T value) { codes.push_back(value); }
+
+    void insert_default() override { codes.push_back(T()); }
+
+    void clear() override { codes.clear(); }
+
+    // TODO: Make dict memory usage more precise
+    size_t byte_size() const override { return codes.size() * 
sizeof(codes[0]); }
+
+    size_t allocated_bytes() const override { return byte_size(); }
+
+    void protect() override {}
+
+    void get_permutation(bool reverse, size_t limit, int nan_direction_hint,
+                         IColumn::Permutation& res) const override {
+        LOG(FATAL) << "get_permutation not supported in ColumnDictionary";
+    }
+
+    void reserve(size_t n) override { codes.reserve(n); }
+
+    [[noreturn]] const char* get_family_name() const override {
+        LOG(FATAL) << "get_family_name not supported in ColumnDictionary";
+    }
+
+    [[noreturn]] MutableColumnPtr clone_resized(size_t size) const override {
+        LOG(FATAL) << "clone_resized not supported in ColumnDictionary";
+    }
+
+    void insert(const Field& x) override {
+        LOG(FATAL) << "insert not supported in ColumnDictionary";
+    }
+
+    Field operator[](size_t n) const override { return codes[n]; }
+
+    void get(size_t n, Field& res) const override { res = (*this)[n]; }
+
+    [[noreturn]] UInt64 get64(size_t n) const override {
+        LOG(FATAL) << "get field not supported in ColumnDictionary";
+    }
+
+    [[noreturn]] Float64 get_float64(size_t n) const override {
+        LOG(FATAL) << "get field not supported in ColumnDictionary";
+    }
+
+    [[noreturn]] UInt64 get_uint(size_t n) const override {
+        LOG(FATAL) << "get field not supported in ColumnDictionary";
+    }
+
+    [[noreturn]] bool get_bool(size_t n) const override {
+        LOG(FATAL) << "get field not supported in ColumnDictionary";
+    }
+
+    [[noreturn]] Int64 get_int(size_t n) const override {
+        LOG(FATAL) << "get field not supported in ColumnDictionary";
+    }
+
+    Container& get_data() { return codes; }
+
+    const Container& get_data() const { return codes; }
+
+    T find_code(const StringValue& value) const { return 
dict.find_code(value); }
+
+    T find_bound_code(const StringValue& value, bool lower, bool eq) const {
+        return dict.find_bound_code(value, lower, eq);
+    }
+
+    phmap::flat_hash_set<T> find_codes(const 
phmap::flat_hash_set<StringValue>& values) const {
+        return dict.find_codes(values);
+    }
+
+    // it's impossable to use ComplexType as key , so we don't have to 
implemnt them
+    [[noreturn]] StringRef serialize_value_into_arena(size_t n, Arena& arena,
+                                                      char const*& begin) 
const {
+        LOG(FATAL) << "serialize_value_into_arena not supported in 
ColumnDictionary";
+    }
+
+    [[noreturn]] const char* deserialize_and_insert_from_arena(const char* 
pos) {
+        LOG(FATAL) << "deserialize_and_insert_from_arena not supported in 
ColumnDictionary";
+    }
+
+    [[noreturn]] int compare_at(size_t n, size_t m, const IColumn& rhs,
+                                int nan_direction_hint) const {
+        LOG(FATAL) << "compare_at not supported in ColumnDictionary";
+    }
+
+    void get_extremes(Field& min, Field& max) const {
+        LOG(FATAL) << "get_extremes not supported in ColumnDictionary";
+    }
+
+    bool can_be_inside_nullable() const override { return true; }
+
+    bool is_fixed_and_contiguous() const override { return true; }
+
+    size_t size_of_value_if_fixed() const override { return sizeof(T); }
+
+    [[noreturn]] StringRef get_raw_data() const override {
+        LOG(FATAL) << "get_raw_data not supported in ColumnDictionary";
+    }
+
+    [[noreturn]] bool structure_equals(const IColumn& rhs) const override {
+        LOG(FATAL) << "structure_equals not supported in ColumnDictionary";
+    }
+
+    [[noreturn]] ColumnPtr filter(const IColumn::Filter& filt,
+                                  ssize_t result_size_hint) const override {
+        LOG(FATAL) << "filter not supported in ColumnDictionary";
+    };
+
+    [[noreturn]] ColumnPtr permute(const IColumn::Permutation& perm, size_t 
limit) const override {
+        LOG(FATAL) << "permute not supported in ColumnDictionary";
+    };
+
+    [[noreturn]] ColumnPtr replicate(const IColumn::Offsets& 
replicate_offsets) const override {
+        LOG(FATAL) << "replicate not supported in ColumnDictionary";
+    };
+
+    [[noreturn]] MutableColumns scatter(IColumn::ColumnIndex num_columns,
+                                        const IColumn::Selector& selector) 
const override {
+        LOG(FATAL) << "scatter not supported in ColumnDictionary";
+    }
+
+    Status filter_by_selector(const uint16_t* sel, size_t sel_size, IColumn* 
col_ptr) override {
+        auto* res_col = reinterpret_cast<vectorized::ColumnString*>(col_ptr);
+        for (size_t i = 0; i < sel_size; i++) {
+            uint16_t n = sel[i];
+            auto& code = reinterpret_cast<T&>(codes[n]);
+            auto value = dict.get_value(code);
+            res_col->insert_data(value.ptr, value.len);
+        }
+        return Status::OK();
+    }
+
+    void replace_column_data(const IColumn&, size_t row, size_t self_row = 0) 
override {
+        LOG(FATAL) << "should not call replace_column_data in 
ColumnDictionary";
+    }
+
+    void replace_column_data_default(size_t self_row = 0) override {
+        LOG(FATAL) << "should not call replace_column_data_default in 
ColumnDictionary";
+    }
+
+    void insert_many_dict_data(const int32_t* data_array, size_t start_index,
+                               const StringRef* dict_array, size_t data_num,
+                               uint32_t dict_num) override {
+        if (!is_dict_inited()) {
+            dict.reserve(dict_num);
+            for (uint32_t i = 0; i < dict_num; ++i) {
+                auto value = StringValue(dict_array[i].data, 
dict_array[i].size);
+                dict.insert_value(value);
+            }
+            _dict_inited = true;
+        }
+
+        char* end_ptr = (char*)codes.get_end_ptr();
+        memcpy(end_ptr, data_array + start_index, data_num * sizeof(T));
+        end_ptr += data_num * sizeof(T);
+        codes.set_end_ptr(end_ptr);
+    }
+
+    bool is_dict_inited() const { return _dict_inited; }
+
+    bool is_dict_sorted() const { return _dict_sorted; }
+
+    bool is_dict_code_converted() const { return _dict_code_converted; }
+
+    ColumnPtr convert_to_predicate_column() {
+        auto res = vectorized::PredicateColumnType<StringValue>::create();
+        size_t size = codes.size();
+        res->reserve(size);
+        for (size_t i = 0; i < size; ++i) {
+            auto& code = reinterpret_cast<T&>(codes[i]);
+            auto value = dict.get_value(code);
+            res->insert_data(value.ptr, value.len);
+        }
+        dict.clear();
+        return res;
+    }
+
+    void convert_dict_codes() {
+        if (!is_dict_sorted()) {
+            sort_dict();
+        }
+
+        if (!is_dict_code_converted()) {
+            for (size_t i = 0; i < size(); ++i) {
+                codes[i] = dict.convert_code(codes[i]);
+            }
+            _dict_code_converted = true;
+        }
+    }
+
+    void sort_dict() {
+        dict.sort();
+        _dict_sorted = true;
+    }
+
+    class Dictionary {
+    public:
+        Dictionary() = default;
+
+        void reserve(size_t n) {
+            dict_data.reserve(n);
+            inverted_index.reserve(n);
+        }
+
+        inline void insert_value(StringValue& value) {
+            dict_data.push_back_without_reserve(value);
+            inverted_index[value] = inverted_index.size();
+        }
+
+        inline T find_code(const StringValue& value) const {
+            auto it = inverted_index.find(value);
+            if (it != inverted_index.end()) {
+                return it->second;
+            }
+            return -1;
+        }
+
+        inline T find_bound_code(const StringValue& value, bool lower, bool 
eq) const {
+            auto code = find_code(value);
+            if (code >= 0) {
+                return code;
+            }
+
+            if (lower) {
+                return std::lower_bound(dict_data.begin(), dict_data.end(), 
value) - dict_data.begin() - eq;
+            } else {
+                return std::upper_bound(dict_data.begin(), dict_data.end(), 
value) - dict_data.begin() + eq;
+            }
+        }
+
+        inline phmap::flat_hash_set<T> find_codes(const 
phmap::flat_hash_set<StringValue>& values) const {
+            phmap::flat_hash_set<T> code_set;
+            for (const auto& value : values) {
+                auto it = inverted_index.find(value);
+                if (it != inverted_index.end()) {
+                    code_set.insert(it->second);
+                }
+            }
+            return code_set;
+        }
+
+        inline StringValue& get_value(T code) { return dict_data[code]; }
+
+        void clear() {
+            dict_data.clear();
+            inverted_index.clear();
+            code_convert_map.clear();
+        }
+
+        void sort() {
+            size_t dict_size = dict_data.size();
+            std::sort(dict_data.begin(), dict_data.end(), comparator);
+            for (size_t i = 0; i < dict_size; ++i) {
+                code_convert_map[inverted_index.find(dict_data[i])->second] = 
(T)i;
+                inverted_index[dict_data[i]] = (T)i;
+            }
+        }
+
+        inline T convert_code(const T& code) const { return 
code_convert_map.find(code)->second; }
+
+        size_t byte_size() { return dict_data.size() * sizeof(dict_data[0]); }
+
+    private:
+        struct HashOfStringValue {
+            size_t operator()(const StringValue& value) const {
+                return HashStringThoroughly(value.ptr, value.len);
+            }
+        };
+
+        StringValue::Comparator comparator;
+        // dict code -> dict value
+        DictContainer dict_data;
+        // dict value -> dict code
+        phmap::flat_hash_map<StringValue, T, HashOfStringValue> inverted_index;
+        // data page code -> sorted dict code, only used for range comparison 
predicate
+        phmap::flat_hash_map<T, T> code_convert_map;
+    };
+
+private:
+    bool _dict_inited = false;
+    bool _dict_sorted = false;
+    bool _dict_code_converted = false;
+    Dictionary dict;
+    Container codes;
+};
+
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/columns/column_nullable.h 
b/be/src/vec/columns/column_nullable.h
index 1a792f7..5076358 100644
--- a/be/src/vec/columns/column_nullable.h
+++ b/be/src/vec/columns/column_nullable.h
@@ -99,9 +99,10 @@ public:
         get_nested_column().insert_many_fix_len_data(pos, num);
     }
  
-    void insert_many_dict_data(const int32_t* data_array, size_t start_index, 
const StringRef* dict, size_t num) override {
-        get_null_map_column().fill(0, num);
-        get_nested_column().insert_many_dict_data(data_array, start_index, 
dict, num);
+    void insert_many_dict_data(const int32_t* data_array, size_t start_index, 
const StringRef* dict,
+                               size_t data_num, uint32_t dict_num) override {
+        get_null_map_column().fill(0, data_num);
+        get_nested_column().insert_many_dict_data(data_array, start_index, 
dict, data_num, dict_num);
     }
  
     void insert_many_binary_data(char* data_array, uint32_t* len_array, 
uint32_t* start_offset_array, size_t num) override {
diff --git a/be/src/vec/columns/column_string.h 
b/be/src/vec/columns/column_string.h
index 236216c..a7975df 100644
--- a/be/src/vec/columns/column_string.h
+++ b/be/src/vec/columns/column_string.h
@@ -165,7 +165,8 @@ public:
         }
     };
  
-    void insert_many_dict_data(const int32_t* data_array, size_t start_index, 
const StringRef* dict, size_t num) override {
+    void insert_many_dict_data(const int32_t* data_array, size_t start_index, 
const StringRef* dict,
+                               size_t num, uint32_t /*dict_num*/) override {
         for (size_t end_index = start_index+num; start_index < end_index; 
++start_index) {
             int32_t codeword = data_array[start_index];
             insert_data(dict[codeword].data, dict[codeword].size);
diff --git a/be/src/vec/columns/predicate_column.h 
b/be/src/vec/columns/predicate_column.h
index 69a89fd..2598cc4 100644
--- a/be/src/vec/columns/predicate_column.h
+++ b/be/src/vec/columns/predicate_column.h
@@ -217,7 +217,8 @@ public:
         }
     }
 
-    void insert_many_dict_data(const int32_t* data_array, size_t start_index, 
const StringRef* dict, size_t num) override {
+    void insert_many_dict_data(const int32_t* data_array, size_t start_index, 
const StringRef* dict,
+                               size_t num, uint32_t /*dict_num*/) override {
         if constexpr (std::is_same_v<T, StringValue>) {
             for (size_t end_index = start_index+num; start_index < end_index; 
++start_index) {
                 int32_t codeword = data_array[start_index];

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to