github-actions[bot] commented on code in PR #27071:
URL: https://github.com/apache/doris/pull/27071#discussion_r1394258459


##########
be/src/exprs/bloom_filter_func.h:
##########
@@ -310,137 +277,199 @@
         }
         return new_size;
     }
+};
 
-    void find_batch(const BloomFilterAdaptor& bloom_filter, const char* data, 
const uint8* nullmap,
-                    int number, uint8* results) const {
-        for (int i = 0; i < number; i++) {
-            results[i] = false;
-            if (nullmap != nullptr && nullmap[i]) {
-                continue;
+template <class T>
+struct CommonFindOp : BaseOp {
+    uint16_t find_batch_olap_engine(const BloomFilterAdaptor& bloom_filter, 
const char* data,
+                                    const uint8* nullmap, uint16_t* offsets, 
int number,
+                                    const bool is_parse_column) {
+        return find_batch_olap_engine_with_element_size(bloom_filter, data, 
nullmap, offsets,
+                                                        number, 
is_parse_column, sizeof(T));
+    }
+
+    void insert_batch(BloomFilterAdaptor& bloom_filter, const 
vectorized::ColumnPtr& column,
+                      size_t start) const {
+        if (column->is_nullable()) {
+            const auto* nullable = assert_cast<const 
vectorized::ColumnNullable*>(column.get());
+            const auto& col = nullable->get_nested_column();
+            const auto& nullmap =
+                    assert_cast<const 
vectorized::ColumnUInt8&>(nullable->get_null_map_column())
+                            .get_data();
+
+            const T* data = (T*)col.get_raw_data().data;
+            for (size_t i = start; i < column->size(); i++) {
+                if (!nullmap[i]) {
+                    bloom_filter.add_element(*(data + i));
+                }
             }
-            if (!bloom_filter.test_element(*((T*)data + i))) {
-                continue;
+        } else {
+            const T* data = (T*)column->get_raw_data().data;
+            for (size_t i = start; i < column->size(); i++) {
+                bloom_filter.add_element(*(data + i));
+            }
+        }
+    }
+
+    void find_batch(const BloomFilterAdaptor& bloom_filter, const 
vectorized::ColumnPtr& column,
+                    uint8_t* results) const {
+        if (column->is_nullable()) {
+            const auto* nullable = assert_cast<const 
vectorized::ColumnNullable*>(column.get());
+            const auto& nullmap =
+                    assert_cast<const 
vectorized::ColumnUInt8&>(nullable->get_null_map_column())
+                            .get_data();
+
+            const T* data = 
(T*)nullable->get_nested_column().get_raw_data().data;
+            for (size_t i = 0; i < column->size(); i++) {
+                if (!nullmap[i]) {
+                    results[i] = bloom_filter.test_element(data[i]);
+                } else {
+                    results[i] = false;
+                }
+            }
+        } else {
+            const T* data = (T*)column->get_raw_data().data;
+            for (size_t i = 0; i < column->size(); i++) {
+                results[i] = bloom_filter.test_element(data[i]);
             }
-            results[i] = true;
         }
     }
 
     void insert(BloomFilterAdaptor& bloom_filter, const void* data) const {
         bloom_filter.add_bytes((char*)data, sizeof(T));
     }
     bool find(const BloomFilterAdaptor& bloom_filter, const void* data) const {
-        return bloom_filter.test(Slice((char*)data, sizeof(T)));
+        return bloom_filter.test_element(((T*)data)[0]);
     }
-    bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* 
data) const {
+    bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* 
data) const override {
         return find(bloom_filter, data);
     }
     bool find(const BloomFilterAdaptor& bloom_filter, uint32_t data) const {
         return bloom_filter.test(data);
     }
 };
 
-struct StringFindOp {
-    void insert_batch(BloomFilterAdaptor& bloom_filter, const char* data, 
const int* offsets,
-                      int number) const {
-        LOG(FATAL) << "StringFindOp does not support insert_batch";
-    }
-
-    void insert_single(BloomFilterAdaptor& bloom_filter, const char* data) 
const {
-        LOG(FATAL) << "StringFindOp does not support insert_single";
-    }
-
+struct StringFindOp : public BaseOp {
     uint16_t find_batch_olap_engine(const BloomFilterAdaptor& bloom_filter, 
const char* data,
                                     const uint8* nullmap, uint16_t* offsets, 
int number,
-                                    const bool is_parse_column) const {
-        LOG(FATAL) << "StringFindOp does not support find_batch_olap_engine";
-        return 0;
-    }
-
-    void find_batch(const BloomFilterAdaptor& bloom_filter, const char* data, 
const uint8* nullmap,
-                    int number, uint8* results) const {
-        LOG(FATAL) << "StringFindOp does not support find_batch";
-    }
-
-    void insert(BloomFilterAdaptor& bloom_filter, const void* data) const {
-        const auto* value = reinterpret_cast<const StringRef*>(data);
-        if (value) {
-            bloom_filter.add_bytes(value->data, value->size);
+                                    const bool is_parse_column) {
+        return find_batch_olap_engine_with_element_size(bloom_filter, data, 
nullmap, offsets,
+                                                        number, 
is_parse_column, sizeof(StringRef));
+    }
+
+    static void insert_batch(BloomFilterAdaptor& bloom_filter, const 
vectorized::ColumnPtr& column,
+                             size_t start) {
+        if (column->is_nullable()) {
+            const auto* nullable = assert_cast<const 
vectorized::ColumnNullable*>(column.get());
+            const auto& col =
+                    assert_cast<const 
vectorized::ColumnString&>(nullable->get_nested_column());
+            const auto& nullmap =
+                    assert_cast<const 
vectorized::ColumnUInt8&>(nullable->get_null_map_column())
+                            .get_data();
+
+            for (size_t i = start; i < column->size(); i++) {
+                if (!nullmap[i]) {
+                    bloom_filter.add_element(col.get_data_at(i));
+                }
+            }
+        } else {
+            const auto& col = assert_cast<const 
vectorized::ColumnString*>(column.get());
+            for (size_t i = start; i < column->size(); i++) {
+                bloom_filter.add_element(col->get_data_at(i));
+            }
         }
     }
 
-    // This function is only to be used if the be_exec_version may be less 
than 2. If updated, please delete it.
-    void insert_crc32_hash(BloomFilterAdaptor& bloom_filter, const void* data) 
const {
-        const auto* value = reinterpret_cast<const StringRef*>(data);
-        if (value) {
-            bloom_filter.add_bytes_new_hash(value->data, value->size);
+    static void find_batch(const BloomFilterAdaptor& bloom_filter,
+                           const vectorized::ColumnPtr& column, uint8_t* 
results) {
+        if (column->is_nullable()) {
+            const auto* nullable = assert_cast<const 
vectorized::ColumnNullable*>(column.get());
+            const auto& col =
+                    assert_cast<const 
vectorized::ColumnString&>(nullable->get_nested_column());
+            const auto& nullmap =
+                    assert_cast<const 
vectorized::ColumnUInt8&>(nullable->get_null_map_column())
+                            .get_data();
+
+            for (size_t i = 0; i < column->size(); i++) {
+                if (!nullmap[i]) {
+                    results[i] = bloom_filter.test_element(col.get_data_at(i));
+                } else {
+                    results[i] = false;
+                }
+            }
+        } else {
+            const auto& col = assert_cast<const 
vectorized::ColumnString*>(column.get());
+            for (size_t i = 0; i < column->size(); i++) {
+                results[i] = bloom_filter.test_element(col->get_data_at(i));
+            }
         }
     }
 
-    bool find(const BloomFilterAdaptor& bloom_filter, const void* data) const {
+    static void insert(BloomFilterAdaptor& bloom_filter, const void* data) {
         const auto* value = reinterpret_cast<const StringRef*>(data);
-        if (value == nullptr) {
-            return false;
+        if (value) {
+            bloom_filter.add_bytes(value->data, value->size);
         }
-        return bloom_filter.test(Slice(value->data, value->size));
     }
 
-    //This function is only to be used if the be_exec_version may be less than 
2. If updated, please delete it.
-    bool find_crc32_hash(const BloomFilterAdaptor& bloom_filter, const void* 
data) const {
+    static bool find(const BloomFilterAdaptor& bloom_filter, const void* data) 
{
         const auto* value = reinterpret_cast<const StringRef*>(data);
         if (value == nullptr) {
             return false;
         }
-        return bloom_filter.test_new_hash(Slice(value->data, value->size));
+        return bloom_filter.test(*value);
     }
 
-    bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* 
data) const {
+    bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* 
data) const override {
         return StringFindOp::find(bloom_filter, data);
     }
-    bool find(const BloomFilterAdaptor& bloom_filter, uint32_t data) const {
+
+    static bool find(const BloomFilterAdaptor& bloom_filter, uint32_t data) {
         return bloom_filter.test(data);
     }
 };
 
 // We do not need to judge whether data is empty, because null will not appear
 // when filer used by the storage engine
 struct FixedStringFindOp : public StringFindOp {
-    bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* 
input_data) const {
+    bool find_olap_engine(const BloomFilterAdaptor& bloom_filter,
+                          const void* input_data) const override {
         const auto* value = reinterpret_cast<const StringRef*>(input_data);
         int64_t size = value->size;
         const char* data = value->data;
         // CHAR type may pad the tail with \0, need to trim
         while (size > 0 && data[size - 1] == '\0') {
             size--;
         }
-        return bloom_filter.test(Slice(value->data, size));
+        return bloom_filter.test(StringRef(value->data, size));
     }
 };
 
 struct DateTimeFindOp : public CommonFindOp<VecDateTimeValue> {
-    bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* 
data) const {
+    bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* 
data) const override {
         VecDateTimeValue value;
         value.from_olap_datetime(*reinterpret_cast<const uint64_t*>(data));
-        return bloom_filter.test(Slice((char*)&value, 
sizeof(VecDateTimeValue)));
+        return bloom_filter.test(StringRef((char*)&value, 
sizeof(VecDateTimeValue)));
     }
 };
 
 // avoid violating C/C++ aliasing rules.
 // https://gcc.gnu.org/bugzilla/show_bug.cgi?id=101684
 
 struct DateFindOp : public CommonFindOp<VecDateTimeValue> {
-    bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* 
data) const {
+    bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* 
data) const override {

Review Comment:
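   warning: method 'find_olap_engine' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
       static bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* data) override {
   ```
   
   Note: this auto-generated suggestion is not valid C++ as written. A static member function cannot be virtual, so it cannot be marked `override`; since the diff declares this method `const override`, it has to stay a non-static member function.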
   



##########
be/src/exprs/bloom_filter_func.h:
##########
@@ -310,137 +277,199 @@ struct CommonFindOp {
         }
         return new_size;
     }
+};
 
-    void find_batch(const BloomFilterAdaptor& bloom_filter, const char* data, 
const uint8* nullmap,
-                    int number, uint8* results) const {
-        for (int i = 0; i < number; i++) {
-            results[i] = false;
-            if (nullmap != nullptr && nullmap[i]) {
-                continue;
+template <class T>
+struct CommonFindOp : BaseOp {
+    uint16_t find_batch_olap_engine(const BloomFilterAdaptor& bloom_filter, 
const char* data,
+                                    const uint8* nullmap, uint16_t* offsets, 
int number,
+                                    const bool is_parse_column) {
+        return find_batch_olap_engine_with_element_size(bloom_filter, data, 
nullmap, offsets,
+                                                        number, 
is_parse_column, sizeof(T));
+    }
+
+    void insert_batch(BloomFilterAdaptor& bloom_filter, const 
vectorized::ColumnPtr& column,
+                      size_t start) const {
+        if (column->is_nullable()) {
+            const auto* nullable = assert_cast<const 
vectorized::ColumnNullable*>(column.get());
+            const auto& col = nullable->get_nested_column();
+            const auto& nullmap =
+                    assert_cast<const 
vectorized::ColumnUInt8&>(nullable->get_null_map_column())
+                            .get_data();
+
+            const T* data = (T*)col.get_raw_data().data;
+            for (size_t i = start; i < column->size(); i++) {
+                if (!nullmap[i]) {
+                    bloom_filter.add_element(*(data + i));
+                }
             }
-            if (!bloom_filter.test_element(*((T*)data + i))) {
-                continue;
+        } else {
+            const T* data = (T*)column->get_raw_data().data;
+            for (size_t i = start; i < column->size(); i++) {
+                bloom_filter.add_element(*(data + i));
+            }
+        }
+    }
+
+    void find_batch(const BloomFilterAdaptor& bloom_filter, const 
vectorized::ColumnPtr& column,
+                    uint8_t* results) const {
+        if (column->is_nullable()) {
+            const auto* nullable = assert_cast<const 
vectorized::ColumnNullable*>(column.get());
+            const auto& nullmap =
+                    assert_cast<const 
vectorized::ColumnUInt8&>(nullable->get_null_map_column())
+                            .get_data();
+
+            const T* data = 
(T*)nullable->get_nested_column().get_raw_data().data;
+            for (size_t i = 0; i < column->size(); i++) {
+                if (!nullmap[i]) {
+                    results[i] = bloom_filter.test_element(data[i]);
+                } else {
+                    results[i] = false;
+                }
+            }
+        } else {
+            const T* data = (T*)column->get_raw_data().data;
+            for (size_t i = 0; i < column->size(); i++) {
+                results[i] = bloom_filter.test_element(data[i]);
             }
-            results[i] = true;
         }
     }
 
     void insert(BloomFilterAdaptor& bloom_filter, const void* data) const {
         bloom_filter.add_bytes((char*)data, sizeof(T));
     }
     bool find(const BloomFilterAdaptor& bloom_filter, const void* data) const {
-        return bloom_filter.test(Slice((char*)data, sizeof(T)));
+        return bloom_filter.test_element(((T*)data)[0]);
     }
-    bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* 
data) const {
+    bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* 
data) const override {
         return find(bloom_filter, data);
     }
     bool find(const BloomFilterAdaptor& bloom_filter, uint32_t data) const {
         return bloom_filter.test(data);
     }
 };
 
-struct StringFindOp {
-    void insert_batch(BloomFilterAdaptor& bloom_filter, const char* data, 
const int* offsets,
-                      int number) const {
-        LOG(FATAL) << "StringFindOp does not support insert_batch";
-    }
-
-    void insert_single(BloomFilterAdaptor& bloom_filter, const char* data) 
const {
-        LOG(FATAL) << "StringFindOp does not support insert_single";
-    }
-
+struct StringFindOp : public BaseOp {
     uint16_t find_batch_olap_engine(const BloomFilterAdaptor& bloom_filter, 
const char* data,
                                     const uint8* nullmap, uint16_t* offsets, 
int number,
-                                    const bool is_parse_column) const {
-        LOG(FATAL) << "StringFindOp does not support find_batch_olap_engine";
-        return 0;
-    }
-
-    void find_batch(const BloomFilterAdaptor& bloom_filter, const char* data, 
const uint8* nullmap,
-                    int number, uint8* results) const {
-        LOG(FATAL) << "StringFindOp does not support find_batch";
-    }
-
-    void insert(BloomFilterAdaptor& bloom_filter, const void* data) const {
-        const auto* value = reinterpret_cast<const StringRef*>(data);
-        if (value) {
-            bloom_filter.add_bytes(value->data, value->size);
+                                    const bool is_parse_column) {
+        return find_batch_olap_engine_with_element_size(bloom_filter, data, 
nullmap, offsets,
+                                                        number, 
is_parse_column, sizeof(StringRef));
+    }
+
+    static void insert_batch(BloomFilterAdaptor& bloom_filter, const 
vectorized::ColumnPtr& column,
+                             size_t start) {
+        if (column->is_nullable()) {
+            const auto* nullable = assert_cast<const 
vectorized::ColumnNullable*>(column.get());
+            const auto& col =
+                    assert_cast<const 
vectorized::ColumnString&>(nullable->get_nested_column());
+            const auto& nullmap =
+                    assert_cast<const 
vectorized::ColumnUInt8&>(nullable->get_null_map_column())
+                            .get_data();
+
+            for (size_t i = start; i < column->size(); i++) {
+                if (!nullmap[i]) {
+                    bloom_filter.add_element(col.get_data_at(i));
+                }
+            }
+        } else {
+            const auto& col = assert_cast<const 
vectorized::ColumnString*>(column.get());
+            for (size_t i = start; i < column->size(); i++) {
+                bloom_filter.add_element(col->get_data_at(i));
+            }
         }
     }
 
-    // This function is only to be used if the be_exec_version may be less 
than 2. If updated, please delete it.
-    void insert_crc32_hash(BloomFilterAdaptor& bloom_filter, const void* data) 
const {
-        const auto* value = reinterpret_cast<const StringRef*>(data);
-        if (value) {
-            bloom_filter.add_bytes_new_hash(value->data, value->size);
+    static void find_batch(const BloomFilterAdaptor& bloom_filter,
+                           const vectorized::ColumnPtr& column, uint8_t* 
results) {
+        if (column->is_nullable()) {
+            const auto* nullable = assert_cast<const 
vectorized::ColumnNullable*>(column.get());
+            const auto& col =
+                    assert_cast<const 
vectorized::ColumnString&>(nullable->get_nested_column());
+            const auto& nullmap =
+                    assert_cast<const 
vectorized::ColumnUInt8&>(nullable->get_null_map_column())
+                            .get_data();
+
+            for (size_t i = 0; i < column->size(); i++) {
+                if (!nullmap[i]) {
+                    results[i] = bloom_filter.test_element(col.get_data_at(i));
+                } else {
+                    results[i] = false;
+                }
+            }
+        } else {
+            const auto& col = assert_cast<const 
vectorized::ColumnString*>(column.get());
+            for (size_t i = 0; i < column->size(); i++) {
+                results[i] = bloom_filter.test_element(col->get_data_at(i));
+            }
         }
     }
 
-    bool find(const BloomFilterAdaptor& bloom_filter, const void* data) const {
+    static void insert(BloomFilterAdaptor& bloom_filter, const void* data) {
         const auto* value = reinterpret_cast<const StringRef*>(data);
-        if (value == nullptr) {
-            return false;
+        if (value) {
+            bloom_filter.add_bytes(value->data, value->size);
         }
-        return bloom_filter.test(Slice(value->data, value->size));
     }
 
-    //This function is only to be used if the be_exec_version may be less than 
2. If updated, please delete it.
-    bool find_crc32_hash(const BloomFilterAdaptor& bloom_filter, const void* 
data) const {
+    static bool find(const BloomFilterAdaptor& bloom_filter, const void* data) 
{
         const auto* value = reinterpret_cast<const StringRef*>(data);
         if (value == nullptr) {
             return false;
         }
-        return bloom_filter.test_new_hash(Slice(value->data, value->size));
+        return bloom_filter.test(*value);
     }
 
-    bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* 
data) const {
+    bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* 
data) const override {
         return StringFindOp::find(bloom_filter, data);
     }
-    bool find(const BloomFilterAdaptor& bloom_filter, uint32_t data) const {
+
+    static bool find(const BloomFilterAdaptor& bloom_filter, uint32_t data) {
         return bloom_filter.test(data);
     }
 };
 
 // We do not need to judge whether data is empty, because null will not appear
 // when filer used by the storage engine
 struct FixedStringFindOp : public StringFindOp {
-    bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* 
input_data) const {
+    bool find_olap_engine(const BloomFilterAdaptor& bloom_filter,
+                          const void* input_data) const override {
         const auto* value = reinterpret_cast<const StringRef*>(input_data);
         int64_t size = value->size;
         const char* data = value->data;
         // CHAR type may pad the tail with \0, need to trim
         while (size > 0 && data[size - 1] == '\0') {
             size--;
         }
-        return bloom_filter.test(Slice(value->data, size));
+        return bloom_filter.test(StringRef(value->data, size));
     }
 };
 
 struct DateTimeFindOp : public CommonFindOp<VecDateTimeValue> {
-    bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* 
data) const {
+    bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* 
data) const override {

Review Comment:
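   warning: method 'find_olap_engine' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
       static bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* data) override {
   ```
   
   Note: this auto-generated suggestion is not valid C++ as written. A static member function cannot be virtual, so it cannot be marked `override`; since the diff declares this method `const override`, it has to stay a non-static member function.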
   



##########
be/src/exprs/bloom_filter_func.h:
##########
@@ -310,137 +277,199 @@
         }
         return new_size;
     }
+};
 
-    void find_batch(const BloomFilterAdaptor& bloom_filter, const char* data, 
const uint8* nullmap,
-                    int number, uint8* results) const {
-        for (int i = 0; i < number; i++) {
-            results[i] = false;
-            if (nullmap != nullptr && nullmap[i]) {
-                continue;
+template <class T>
+struct CommonFindOp : BaseOp {
+    uint16_t find_batch_olap_engine(const BloomFilterAdaptor& bloom_filter, 
const char* data,
+                                    const uint8* nullmap, uint16_t* offsets, 
int number,
+                                    const bool is_parse_column) {
+        return find_batch_olap_engine_with_element_size(bloom_filter, data, 
nullmap, offsets,
+                                                        number, 
is_parse_column, sizeof(T));
+    }
+
+    void insert_batch(BloomFilterAdaptor& bloom_filter, const 
vectorized::ColumnPtr& column,
+                      size_t start) const {
+        if (column->is_nullable()) {
+            const auto* nullable = assert_cast<const 
vectorized::ColumnNullable*>(column.get());
+            const auto& col = nullable->get_nested_column();
+            const auto& nullmap =
+                    assert_cast<const 
vectorized::ColumnUInt8&>(nullable->get_null_map_column())
+                            .get_data();
+
+            const T* data = (T*)col.get_raw_data().data;
+            for (size_t i = start; i < column->size(); i++) {
+                if (!nullmap[i]) {
+                    bloom_filter.add_element(*(data + i));
+                }
             }
-            if (!bloom_filter.test_element(*((T*)data + i))) {
-                continue;
+        } else {
+            const T* data = (T*)column->get_raw_data().data;
+            for (size_t i = start; i < column->size(); i++) {
+                bloom_filter.add_element(*(data + i));
+            }
+        }
+    }
+
+    void find_batch(const BloomFilterAdaptor& bloom_filter, const 
vectorized::ColumnPtr& column,
+                    uint8_t* results) const {
+        if (column->is_nullable()) {
+            const auto* nullable = assert_cast<const 
vectorized::ColumnNullable*>(column.get());
+            const auto& nullmap =
+                    assert_cast<const 
vectorized::ColumnUInt8&>(nullable->get_null_map_column())
+                            .get_data();
+
+            const T* data = 
(T*)nullable->get_nested_column().get_raw_data().data;
+            for (size_t i = 0; i < column->size(); i++) {
+                if (!nullmap[i]) {
+                    results[i] = bloom_filter.test_element(data[i]);
+                } else {
+                    results[i] = false;
+                }
+            }
+        } else {
+            const T* data = (T*)column->get_raw_data().data;
+            for (size_t i = 0; i < column->size(); i++) {
+                results[i] = bloom_filter.test_element(data[i]);
             }
-            results[i] = true;
         }
     }
 
     void insert(BloomFilterAdaptor& bloom_filter, const void* data) const {
         bloom_filter.add_bytes((char*)data, sizeof(T));
     }
     bool find(const BloomFilterAdaptor& bloom_filter, const void* data) const {
-        return bloom_filter.test(Slice((char*)data, sizeof(T)));
+        return bloom_filter.test_element(((T*)data)[0]);
     }
-    bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* 
data) const {
+    bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* 
data) const override {
         return find(bloom_filter, data);
     }
     bool find(const BloomFilterAdaptor& bloom_filter, uint32_t data) const {
         return bloom_filter.test(data);
     }
 };
 
-struct StringFindOp {
-    void insert_batch(BloomFilterAdaptor& bloom_filter, const char* data, 
const int* offsets,
-                      int number) const {
-        LOG(FATAL) << "StringFindOp does not support insert_batch";
-    }
-
-    void insert_single(BloomFilterAdaptor& bloom_filter, const char* data) 
const {
-        LOG(FATAL) << "StringFindOp does not support insert_single";
-    }
-
+struct StringFindOp : public BaseOp {
     uint16_t find_batch_olap_engine(const BloomFilterAdaptor& bloom_filter, 
const char* data,
                                     const uint8* nullmap, uint16_t* offsets, 
int number,
-                                    const bool is_parse_column) const {
-        LOG(FATAL) << "StringFindOp does not support find_batch_olap_engine";
-        return 0;
-    }
-
-    void find_batch(const BloomFilterAdaptor& bloom_filter, const char* data, 
const uint8* nullmap,
-                    int number, uint8* results) const {
-        LOG(FATAL) << "StringFindOp does not support find_batch";
-    }
-
-    void insert(BloomFilterAdaptor& bloom_filter, const void* data) const {
-        const auto* value = reinterpret_cast<const StringRef*>(data);
-        if (value) {
-            bloom_filter.add_bytes(value->data, value->size);
+                                    const bool is_parse_column) {
+        return find_batch_olap_engine_with_element_size(bloom_filter, data, 
nullmap, offsets,
+                                                        number, 
is_parse_column, sizeof(StringRef));
+    }
+
+    static void insert_batch(BloomFilterAdaptor& bloom_filter, const 
vectorized::ColumnPtr& column,
+                             size_t start) {
+        if (column->is_nullable()) {
+            const auto* nullable = assert_cast<const 
vectorized::ColumnNullable*>(column.get());
+            const auto& col =
+                    assert_cast<const 
vectorized::ColumnString&>(nullable->get_nested_column());
+            const auto& nullmap =
+                    assert_cast<const 
vectorized::ColumnUInt8&>(nullable->get_null_map_column())
+                            .get_data();
+
+            for (size_t i = start; i < column->size(); i++) {
+                if (!nullmap[i]) {
+                    bloom_filter.add_element(col.get_data_at(i));
+                }
+            }
+        } else {
+            const auto& col = assert_cast<const 
vectorized::ColumnString*>(column.get());
+            for (size_t i = start; i < column->size(); i++) {
+                bloom_filter.add_element(col->get_data_at(i));
+            }
         }
     }
 
-    // This function is only to be used if the be_exec_version may be less 
than 2. If updated, please delete it.
-    void insert_crc32_hash(BloomFilterAdaptor& bloom_filter, const void* data) 
const {
-        const auto* value = reinterpret_cast<const StringRef*>(data);
-        if (value) {
-            bloom_filter.add_bytes_new_hash(value->data, value->size);
+    static void find_batch(const BloomFilterAdaptor& bloom_filter,
+                           const vectorized::ColumnPtr& column, uint8_t* 
results) {
+        if (column->is_nullable()) {
+            const auto* nullable = assert_cast<const 
vectorized::ColumnNullable*>(column.get());
+            const auto& col =
+                    assert_cast<const 
vectorized::ColumnString&>(nullable->get_nested_column());
+            const auto& nullmap =
+                    assert_cast<const 
vectorized::ColumnUInt8&>(nullable->get_null_map_column())
+                            .get_data();
+
+            for (size_t i = 0; i < column->size(); i++) {
+                if (!nullmap[i]) {
+                    results[i] = bloom_filter.test_element(col.get_data_at(i));
+                } else {
+                    results[i] = false;
+                }
+            }
+        } else {
+            const auto& col = assert_cast<const 
vectorized::ColumnString*>(column.get());
+            for (size_t i = 0; i < column->size(); i++) {
+                results[i] = bloom_filter.test_element(col->get_data_at(i));
+            }
         }
     }
 
-    bool find(const BloomFilterAdaptor& bloom_filter, const void* data) const {
+    static void insert(BloomFilterAdaptor& bloom_filter, const void* data) {
         const auto* value = reinterpret_cast<const StringRef*>(data);
-        if (value == nullptr) {
-            return false;
+        if (value) {
+            bloom_filter.add_bytes(value->data, value->size);
         }
-        return bloom_filter.test(Slice(value->data, value->size));
     }
 
-    //This function is only to be used if the be_exec_version may be less than 
2. If updated, please delete it.
-    bool find_crc32_hash(const BloomFilterAdaptor& bloom_filter, const void* 
data) const {
+    static bool find(const BloomFilterAdaptor& bloom_filter, const void* data) 
{
         const auto* value = reinterpret_cast<const StringRef*>(data);
         if (value == nullptr) {
             return false;
         }
-        return bloom_filter.test_new_hash(Slice(value->data, value->size));
+        return bloom_filter.test(*value);
     }
 
-    bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* 
data) const {
+    bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* 
data) const override {
         return StringFindOp::find(bloom_filter, data);
     }
-    bool find(const BloomFilterAdaptor& bloom_filter, uint32_t data) const {
+
+    static bool find(const BloomFilterAdaptor& bloom_filter, uint32_t data) {
         return bloom_filter.test(data);
     }
 };
 
 // We do not need to judge whether data is empty, because null will not appear
 // when the filter is used by the storage engine
 struct FixedStringFindOp : public StringFindOp {
-    bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* 
input_data) const {
+    bool find_olap_engine(const BloomFilterAdaptor& bloom_filter,
+                          const void* input_data) const override {
         const auto* value = reinterpret_cast<const StringRef*>(input_data);
         int64_t size = value->size;
         const char* data = value->data;
         // CHAR type may pad the tail with \0, need to trim
         while (size > 0 && data[size - 1] == '\0') {
             size--;
         }
-        return bloom_filter.test(Slice(value->data, size));
+        return bloom_filter.test(StringRef(value->data, size));
     }
 };
 
 struct DateTimeFindOp : public CommonFindOp<VecDateTimeValue> {
-    bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* 
data) const {
+    bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* 
data) const override {
         VecDateTimeValue value;
         value.from_olap_datetime(*reinterpret_cast<const uint64_t*>(data));
-        return bloom_filter.test(Slice((char*)&value, 
sizeof(VecDateTimeValue)));
+        return bloom_filter.test(StringRef((char*)&value, 
sizeof(VecDateTimeValue)));
     }
 };
 
 // avoid violating C/C++ aliasing rules.
 // https://gcc.gnu.org/bugzilla/show_bug.cgi?id=101684
 
 struct DateFindOp : public CommonFindOp<VecDateTimeValue> {
-    bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* 
data) const {
+    bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* 
data) const override {
         uint24_t date = *static_cast<const uint24_t*>(data);
         uint64_t value = uint32_t(date);
 
         VecDateTimeValue date_value;
         date_value.from_olap_date(value);
 
-        return bloom_filter.test(Slice((char*)&date_value, 
sizeof(VecDateTimeValue)));
+        return bloom_filter.test(StringRef((char*)&date_value, 
sizeof(VecDateTimeValue)));
     }
 };
 
 struct DecimalV2FindOp : public CommonFindOp<DecimalV2Value> {
-    bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* 
data) const {
+    bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* 
data) const override {

Review Comment:
   warning: method 'find_olap_engine' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
       static bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void* data) override {
   ```
   



##########
be/src/vec/columns/column_string.cpp:
##########
@@ -161,6 +161,43 @@
     }
 }
 
+void ColumnString::insert_indices_from_join(const IColumn& src, const 
uint32_t* indices_begin,
+                                            const uint32_t* indices_end) {
+    const ColumnString& src_str = assert_cast<const ColumnString&>(src);
+    auto src_offset_data = src_str.offsets.data();
+
+    auto old_char_size = chars.size();
+    size_t total_chars_size = old_char_size;
+
+    auto dst_offsets_pos = offsets.size();
+    offsets.resize(offsets.size() + indices_end - indices_begin);
+    auto* dst_offsets_data = offsets.data();
+
+    for (auto x = indices_begin; x != indices_end; ++x) {
+        if (*x != 0) {
+            total_chars_size += src_offset_data[*x] - src_offset_data[*x - 1];
+        }
+        dst_offsets_data[dst_offsets_pos++] = total_chars_size;
+    }
+    check_chars_length(total_chars_size, offsets.size());
+
+    chars.resize(total_chars_size);
+
+    auto* src_data_ptr = src_str.chars.data();
+    auto* dst_data_ptr = chars.data();
+
+    size_t dst_chars_pos = old_char_size;
+    for (auto x = indices_begin; x != indices_end; ++x) {

Review Comment:
   warning: 'auto x' can be declared as 'const auto *x' 
[readability-qualified-auto]
   
   ```suggestion
       for (const auto *x = indices_begin; x != indices_end; ++x) {
   ```
   



##########
be/src/vec/common/hash_table/hash_map.h:
##########
@@ -193,10 +200,346 @@
     bool has_null_key_data() const { return false; }
 };
 
+template <typename Key, typename Cell, typename Hash = DefaultHash<Key>,
+          typename Grower = HashTableGrower<>, typename Allocator = 
HashTableAllocator>
+class JoinHashMapTable : public HashMapTable<Key, Cell, Hash, Grower, 
Allocator> {
+public:
+    using Self = JoinHashMapTable;
+    using Base = HashMapTable<Key, Cell, Hash, Grower, Allocator>;
+
+    using key_type = Key;
+    using value_type = typename Cell::value_type;
+    using mapped_type = typename Cell::Mapped;
+
+    using LookupResult = typename Base::LookupResult;
+
+    using HashMapTable<Key, Cell, Hash, Grower, Allocator>::HashMapTable;
+
+    static uint32_t calc_bucket_size(size_t num_elem) {
+        size_t expect_bucket_size = num_elem + (num_elem - 1) / 7;

Review Comment:
   warning: 7 is a magic number; consider replacing it with a named constant 
[readability-magic-numbers]
   ```cpp
           size_t expect_bucket_size = num_elem + (num_elem - 1) / 7;
                                                                   ^
   ```
   



##########
be/src/vec/common/hash_table/hash_map.h:
##########
@@ -20,9 +20,16 @@
 
 #pragma once
 
+#include <gen_cpp/PlanNodes_types.h>

Review Comment:
   warning: 'gen_cpp/PlanNodes_types.h' file not found [clang-diagnostic-error]
   ```cpp
   #include <gen_cpp/PlanNodes_types.h>
            ^
   ```
   



##########
be/src/vec/common/hash_table/hash_map.h:
##########
@@ -193,10 +200,346 @@
     bool has_null_key_data() const { return false; }
 };
 
+template <typename Key, typename Cell, typename Hash = DefaultHash<Key>,
+          typename Grower = HashTableGrower<>, typename Allocator = 
HashTableAllocator>
+class JoinHashMapTable : public HashMapTable<Key, Cell, Hash, Grower, 
Allocator> {
+public:
+    using Self = JoinHashMapTable;
+    using Base = HashMapTable<Key, Cell, Hash, Grower, Allocator>;
+
+    using key_type = Key;
+    using value_type = typename Cell::value_type;
+    using mapped_type = typename Cell::Mapped;
+
+    using LookupResult = typename Base::LookupResult;
+
+    using HashMapTable<Key, Cell, Hash, Grower, Allocator>::HashMapTable;
+
+    static uint32_t calc_bucket_size(size_t num_elem) {
+        size_t expect_bucket_size = num_elem + (num_elem - 1) / 7;
+        return phmap::priv::NormalizeCapacity(expect_bucket_size) + 1;
+    }
+
+    size_t get_byte_size() const {
+        auto cal_vector_mem = [](const auto& vec) { return vec.capacity() * 
sizeof(vec[0]); };
+        return cal_vector_mem(visited) + cal_vector_mem(first) + 
cal_vector_mem(next);
+    }
+
+    template <int JoinOpType>
+    void prepare_build(size_t num_elem, int batch_size) {
+        max_batch_size = batch_size;
+        bucket_size = calc_bucket_size(num_elem + 1);
+        first.resize(bucket_size + 1);
+        next.resize(num_elem);
+
+        if constexpr (JoinOpType == doris::TJoinOp::FULL_OUTER_JOIN ||
+                      JoinOpType == doris::TJoinOp::RIGHT_OUTER_JOIN ||
+                      JoinOpType == doris::TJoinOp::RIGHT_ANTI_JOIN ||
+                      JoinOpType == doris::TJoinOp::RIGHT_SEMI_JOIN) {
+            visited.resize(num_elem);
+        }
+    }
+
+    uint32_t get_bucket_size() const { return bucket_size; }
+
+    size_t size() const { return Base::size() == 0 ? next.size() : 
Base::size(); }
+
+    std::vector<uint8_t>& get_visited() { return visited; }
+
+    void build(const Key* __restrict keys, const uint32_t* __restrict 
bucket_nums,
+               size_t num_elem) {
+        build_keys = keys;
+        for (size_t i = 1; i < num_elem; i++) {
+            uint32_t bucket_num = bucket_nums[i];
+            next[i] = first[bucket_num];
+            first[bucket_num] = i;
+        }
+        first[bucket_size] = 0; // index = bucket_num means null
+    }
+
+    template <int JoinOpType, bool with_other_conjuncts, bool is_mark_join, 
bool need_judge_null>
+    auto find_batch(const Key* __restrict keys, const uint32_t* __restrict 
bucket_nums,
+                    int probe_idx, uint32_t build_idx, int probe_rows,
+                    uint32_t* __restrict probe_idxs, uint32_t* __restrict 
build_idxs,
+                    doris::vectorized::ColumnFilterHelper* mark_column) {
+        if constexpr (is_mark_join) {
+            return _find_batch_mark<JoinOpType>(keys, bucket_nums, probe_idx, 
probe_rows,
+                                                probe_idxs, build_idxs, 
mark_column);
+        }
+
+        if constexpr (with_other_conjuncts) {
+            return _find_batch_conjunct<JoinOpType>(keys, bucket_nums, 
probe_idx, build_idx,
+                                                    probe_rows, probe_idxs, 
build_idxs);
+        }
+
+        if constexpr (JoinOpType == doris::TJoinOp::INNER_JOIN ||
+                      JoinOpType == doris::TJoinOp::FULL_OUTER_JOIN ||
+                      JoinOpType == doris::TJoinOp::LEFT_OUTER_JOIN ||
+                      JoinOpType == doris::TJoinOp::RIGHT_OUTER_JOIN) {
+            return _find_batch_inner_outer_join<JoinOpType>(keys, bucket_nums, 
probe_idx, build_idx,
+                                                            probe_rows, 
probe_idxs, build_idxs);
+        }
+        if constexpr (JoinOpType == doris::TJoinOp::LEFT_ANTI_JOIN ||
+                      JoinOpType == doris::TJoinOp::LEFT_SEMI_JOIN ||
+                      JoinOpType == doris::TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) 
{
+            return _find_batch_left_semi_anti<JoinOpType, need_judge_null>(
+                    keys, bucket_nums, probe_idx, probe_rows, probe_idxs);
+        }
+        if constexpr (JoinOpType == doris::TJoinOp::RIGHT_ANTI_JOIN ||
+                      JoinOpType == doris::TJoinOp::RIGHT_SEMI_JOIN) {
+            return _find_batch_right_semi_anti(keys, bucket_nums, probe_idx, 
probe_rows);
+        }
+        return std::tuple {0, 0U, 0};
+    }
+
+    template <int JoinOpType>
+    bool iterate_map(std::vector<uint32_t>& build_idxs) const {
+        const auto batch_size = max_batch_size;
+        const auto elem_num = visited.size();
+        int count = 0;
+        build_idxs.resize(batch_size);
+
+        while (count < batch_size && iter_idx < elem_num) {
+            const auto matched = visited[iter_idx];
+            build_idxs[count] = iter_idx;
+            if constexpr (JoinOpType != doris::TJoinOp::RIGHT_SEMI_JOIN) {
+                count += !matched;
+            } else {
+                count += matched;
+            }
+            iter_idx++;
+        }
+
+        build_idxs.resize(count);
+        return iter_idx >= elem_num;
+    }
+
+private:
+    // only LEFT_ANTI_JOIN/LEFT_SEMI_JOIN/NULL_AWARE_LEFT_ANTI_JOIN/CROSS_JOIN 
support mark join
+    template <int JoinOpType>
+    auto _find_batch_mark(const Key* __restrict keys, const uint32_t* 
__restrict bucket_nums,
+                          int probe_idx, int probe_rows, uint32_t* __restrict 
probe_idxs,
+                          uint32_t* __restrict build_idxs,
+                          doris::vectorized::ColumnFilterHelper* mark_column) {
+        auto matched_cnt = 0;
+        const auto batch_size = max_batch_size;
+
+        while (probe_idx < probe_rows && matched_cnt < batch_size) {
+            auto build_idx = first[bucket_nums[probe_idx]];
+
+            while (build_idx && keys[probe_idx] != build_keys[build_idx]) {
+                build_idx = next[build_idx];
+            }
+
+            if (bucket_nums[probe_idx] == bucket_size) {
+                // mark result as null when probe row is null
+                mark_column->insert_null();
+            } else {
+                bool matched = JoinOpType == doris::TJoinOp::LEFT_SEMI_JOIN ? 
build_idx != 0
+                                                                            : 
build_idx == 0;
+                mark_column->insert_value(matched);
+            }
+
+            probe_idxs[matched_cnt] = probe_idx++;
+            build_idxs[matched_cnt] = build_idx;
+            matched_cnt++;
+        }
+        return std::tuple {probe_idx, 0U, matched_cnt};
+    }
+
+    auto _find_batch_right_semi_anti(const Key* __restrict keys,
+                                     const uint32_t* __restrict bucket_nums, 
int probe_idx,
+                                     int probe_rows) {
+        while (probe_idx < probe_rows) {
+            auto build_idx = first[bucket_nums[probe_idx]];
+
+            while (build_idx) {
+                if (!visited[build_idx] && keys[probe_idx] == 
build_keys[build_idx]) {
+                    visited[build_idx] = 1;
+                }
+                build_idx = next[build_idx];
+            }
+            probe_idx++;
+        }
+        return std::tuple {probe_idx, 0U, 0};
+    }
+
+    template <int JoinOpType, bool need_judge_null>
+    auto _find_batch_left_semi_anti(const Key* __restrict keys,
+                                    const uint32_t* __restrict bucket_nums, 
int probe_idx,
+                                    int probe_rows, uint32_t* __restrict 
probe_idxs) {
+        auto matched_cnt = 0;
+        const auto batch_size = max_batch_size;
+
+        while (probe_idx < probe_rows && matched_cnt < batch_size) {
+            if constexpr (need_judge_null) {
+                if (bucket_nums[probe_idx] == bucket_size) {
+                    probe_idx++;
+                    continue;
+                }
+            }
+
+            auto build_idx = first[bucket_nums[probe_idx]];
+
+            while (build_idx && keys[probe_idx] != build_keys[build_idx]) {
+                build_idx = next[build_idx];
+            }
+            bool matched =
+                    JoinOpType == doris::TJoinOp::LEFT_SEMI_JOIN ? build_idx 
!= 0 : build_idx == 0;
+            probe_idxs[matched_cnt] = probe_idx++;
+            matched_cnt += matched;
+        }
+        return std::tuple {probe_idx, 0U, matched_cnt};
+    }
+
+    auto _find_batch_left_semi_anti_conjunct(const Key* __restrict keys,
+                                             const uint32_t* __restrict 
bucket_nums, int probe_idx,
+                                             int probe_rows, uint32_t* 
__restrict probe_idxs,
+                                             uint32_t* __restrict build_idxs) {
+        auto matched_cnt = 0;
+        const auto batch_size = max_batch_size;
+
+        while (probe_idx < probe_rows && matched_cnt < batch_size) {
+            auto build_idx = first[bucket_nums[probe_idx]];
+
+            while (build_idx) {
+                if (keys[probe_idx] == build_keys[build_idx]) {
+                    probe_idxs[matched_cnt] = probe_idx;
+                    build_idxs[matched_cnt] = build_idx;
+                    matched_cnt++;
+                }
+                build_idx = next[build_idx];
+            }
+            probe_idx++;
+        }
+        return std::tuple {probe_idx, 0U, matched_cnt};
+    }
+
+    template <int JoinOpType>
+    auto _find_batch_conjunct(const Key* __restrict keys, const uint32_t* 
__restrict bucket_nums,
+                              int probe_idx, uint32_t build_idx, int 
probe_rows,
+                              uint32_t* __restrict probe_idxs, uint32_t* 
__restrict build_idxs) {
+        auto matched_cnt = 0;
+        const auto batch_size = max_batch_size;
+
+        auto do_the_probe = [&]() {
+            auto matched_cnt_old = matched_cnt;
+            while (build_idx && matched_cnt < batch_size) {
+                bool mathced = false;
+                if constexpr (JoinOpType == doris::TJoinOp::RIGHT_ANTI_JOIN ||
+                              JoinOpType == doris::TJoinOp::RIGHT_SEMI_JOIN) {
+                    mathced = !visited[build_idx] && keys[probe_idx] == 
build_keys[build_idx];
+                } else {
+                    mathced = keys[probe_idx] == build_keys[build_idx];
+                }
+                build_idxs[matched_cnt] = build_idx;
+                matched_cnt += mathced;
+                build_idx = next[build_idx];
+            }
+
+            for (auto i = matched_cnt_old; i < matched_cnt; i++) {
+                probe_idxs[i] = probe_idx;
+            }
+
+            if constexpr (JoinOpType == doris::TJoinOp::LEFT_OUTER_JOIN ||
+                          JoinOpType == doris::TJoinOp::FULL_OUTER_JOIN) {
+                if (!build_idx) {
+                    probe_idxs[matched_cnt] = probe_idx;
+                    build_idxs[matched_cnt] = 0;
+                    matched_cnt++;
+                }
+            }
+
+            probe_idx++;
+        };
+
+        if (build_idx) {
+            do_the_probe();
+        }
+
+        while (probe_idx < probe_rows && matched_cnt < batch_size) {
+            build_idx = first[bucket_nums[probe_idx]];
+            do_the_probe();
+        }
+
+        probe_idx -=
+                (matched_cnt >= batch_size &&
+                 build_idx); // FULL_OUTER_JOIN may over batch_size when 
emplace 0 into build_idxs
+        return std::tuple {probe_idx, build_idx, matched_cnt};
+    }
+
+    template <int JoinOpType>
+    auto _find_batch_inner_outer_join(const Key* __restrict keys,
+                                      const uint32_t* __restrict bucket_nums, 
int probe_idx,
+                                      uint32_t build_idx, int probe_rows,
+                                      uint32_t* __restrict probe_idxs,
+                                      uint32_t* __restrict build_idxs) {
+        auto matched_cnt = 0;
+        const auto batch_size = max_batch_size;
+
+        auto do_the_probe = [&]() {
+            while (build_idx && matched_cnt < batch_size) {
+                if (keys[probe_idx] == build_keys[build_idx]) {
+                    probe_idxs[matched_cnt] = probe_idx;
+                    build_idxs[matched_cnt] = build_idx;
+                    matched_cnt++;
+                    if constexpr (JoinOpType == 
doris::TJoinOp::RIGHT_OUTER_JOIN ||
+                                  JoinOpType == 
doris::TJoinOp::FULL_OUTER_JOIN) {
+                        if (!visited[build_idx]) {
+                            visited[build_idx] = 1;
+                        }
+                    }
+                }
+                build_idx = next[build_idx];
+            }
+
+            if constexpr (JoinOpType == doris::TJoinOp::LEFT_OUTER_JOIN ||
+                          JoinOpType == doris::TJoinOp::FULL_OUTER_JOIN) {
+                // `(!matched_cnt || probe_idxs[matched_cnt - 1] != 
probe_idx)` means not match one build side
+                if (!matched_cnt || probe_idxs[matched_cnt - 1] != probe_idx) {
+                    probe_idxs[matched_cnt] = probe_idx;
+                    build_idxs[matched_cnt] = 0;
+                    matched_cnt++;
+                }
+            }
+            probe_idx++;
+        };
+
+        if (build_idx) {
+            do_the_probe();
+        }
+
+        while (probe_idx < probe_rows && matched_cnt < batch_size) {
+            build_idx = first[bucket_nums[probe_idx]];
+            do_the_probe();
+        }
+
+        probe_idx -= (matched_cnt == batch_size && build_idx);
+        return std::tuple {probe_idx, build_idx, matched_cnt};
+    }
+
+    const Key* __restrict build_keys;
+    std::vector<uint8_t> visited;
+
+    uint32_t bucket_size = 1;
+    int max_batch_size = 4064;

Review Comment:
   warning: 4064 is a magic number; consider replacing it with a named constant 
[readability-magic-numbers]
   ```cpp
       int max_batch_size = 4064;
                            ^
   ```
   



##########
be/src/vec/columns/column_map.cpp:
##########
@@ -196,6 +196,17 @@ void ColumnMap::insert_indices_from(const IColumn& src, 
const int* indices_begin
     }
 }
 
+void ColumnMap::insert_indices_from_join(const IColumn& src, const uint32_t* 
indices_begin,
+                                         const uint32_t* indices_end) {
+    for (auto x = indices_begin; x != indices_end; ++x) {

Review Comment:
   warning: 'auto x' can be declared as 'const auto *x' 
[readability-qualified-auto]
   
   ```suggestion
       for (const auto *x = indices_begin; x != indices_end; ++x) {
   ```
   



##########
be/src/exprs/runtime_filter_slots.h:
##########
@@ -37,7 +37,7 @@ class VRuntimeFilterSlots {
             const std::vector<TRuntimeFilterDesc>& runtime_filter_descs)
             : _build_expr_context(build_expr_ctxs), 
_runtime_filter_descs(runtime_filter_descs) {}
 
-    Status init(RuntimeState* state, int64_t hash_table_size, size_t 
build_bf_cardinality) {
+    Status init(RuntimeState* state, int64_t hash_table_size) {

Review Comment:
   warning: method 'init' can be made static 
[readability-convert-member-functions-to-static]
   
   ```suggestion
       static Status init(RuntimeState* state, int64_t hash_table_size) {
   ```
   



##########
be/src/vec/columns/column_string.cpp:
##########
@@ -161,6 +161,43 @@ void ColumnString::insert_indices_from(const IColumn& src, 
const int* indices_be
     }
 }
 
+void ColumnString::insert_indices_from_join(const IColumn& src, const 
uint32_t* indices_begin,

Review Comment:
   warning: method 'insert_indices_from_join' can be made static [readability-convert-member-functions-to-static]
   
   be/src/vec/columns/column_string.h:477:
   ```diff
   -     void insert_indices_from_join(const IColumn& src, const uint32_t* indices_begin,
   +     static void insert_indices_from_join(const IColumn& src, const uint32_t* indices_begin,
   ```
   



##########
be/src/vec/columns/column_array.cpp:
##########
@@ -804,6 +804,17 @@ void ColumnArray::insert_indices_from(const IColumn& src, 
const int* indices_beg
     }
 }
 
+void ColumnArray::insert_indices_from_join(const IColumn& src, const uint32_t* 
indices_begin,
+                                           const uint32_t* indices_end) {
+    for (auto x = indices_begin; x != indices_end; ++x) {

Review Comment:
   warning: 'auto x' can be declared as 'const auto *x' 
[readability-qualified-auto]
   
   ```suggestion
       for (const auto *x = indices_begin; x != indices_end; ++x) {
   ```
   



##########
be/src/vec/columns/column_string.cpp:
##########
@@ -161,6 +161,43 @@
     }
 }
 
+void ColumnString::insert_indices_from_join(const IColumn& src, const 
uint32_t* indices_begin,
+                                            const uint32_t* indices_end) {
+    const ColumnString& src_str = assert_cast<const ColumnString&>(src);
+    auto src_offset_data = src_str.offsets.data();
+
+    auto old_char_size = chars.size();
+    size_t total_chars_size = old_char_size;
+
+    auto dst_offsets_pos = offsets.size();
+    offsets.resize(offsets.size() + indices_end - indices_begin);
+    auto* dst_offsets_data = offsets.data();
+
+    for (auto x = indices_begin; x != indices_end; ++x) {

Review Comment:
   warning: 'auto x' can be declared as 'const auto *x' 
[readability-qualified-auto]
   
   ```suggestion
       for (const auto *x = indices_begin; x != indices_end; ++x) {
   ```
   



##########
be/src/exprs/runtime_filter.cpp:
##########
@@ -508,24 +476,33 @@ class RuntimePredicateWrapper {
         }
     }
 
-    void insert_batch(const vectorized::ColumnPtr column, const 
std::vector<int>& rows) {
+    void insert_batch(const vectorized::ColumnPtr& column, size_t start) {
         if (get_real_type() == RuntimeFilterType::BITMAP_FILTER) {
-            bitmap_filter_insert_batch(column, rows);
-        } else if (IRuntimeFilter::enable_use_batch(_be_exec_version > 0, 
_column_return_type)) {
-            insert_fixed_len(column->get_raw_data().data, rows.data(), 
rows.size());
+            bitmap_filter_insert_batch(column, start);
         } else {
-            for (int index : rows) {
-                insert(column->get_data_at(index));
-            }
+            insert_fixed_len(column, start);
         }
     }
 
-    void bitmap_filter_insert_batch(const vectorized::ColumnPtr column,
-                                    const std::vector<int>& rows) {
+    void bitmap_filter_insert_batch(const vectorized::ColumnPtr column, size_t 
start) {

Review Comment:
   warning: method 'bitmap_filter_insert_batch' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
       static void bitmap_filter_insert_batch(const vectorized::ColumnPtr column, size_t start) {
   ```
   



##########
be/src/vec/columns/column_struct.cpp:
##########
@@ -233,6 +233,15 @@ void ColumnStruct::insert_indices_from(const IColumn& src, 
const int* indices_be
     }
 }
 
+void ColumnStruct::insert_indices_from_join(const IColumn& src, const 
uint32_t* indices_begin,

Review Comment:
   warning: method 'insert_indices_from_join' can be made static [readability-convert-member-functions-to-static]
   
   be/src/vec/columns/column_struct.h:127:
   ```diff
   -     void insert_indices_from_join(const IColumn& src, const uint32_t* indices_begin,
   +     static void insert_indices_from_join(const IColumn& src, const uint32_t* indices_begin,
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to