This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 2734b8d3d28 [branch-3.0](auto-partition) Re-add deduplication to auto 
partition rpc (#40580) (#42020)
2734b8d3d28 is described below

commit 2734b8d3d28b009ffac83b7339578757daf0497f
Author: zclllhhjj <[email protected]>
AuthorDate: Wed Oct 23 15:49:06 2024 +0800

    [branch-3.0](auto-partition) Re-add deduplication to auto partition rpc 
(#40580) (#42020)
    
    pick https://github.com/apache/doris/pull/40580
---
 be/src/vec/sink/vrow_distribution.cpp |  6 ++++--
 be/src/vec/sink/vrow_distribution.h   | 28 +++++++++++++++++++++++-----
 2 files changed, 27 insertions(+), 7 deletions(-)

diff --git a/be/src/vec/sink/vrow_distribution.cpp 
b/be/src/vec/sink/vrow_distribution.cpp
index d45aa2ea911..3a4c7e911f4 100644
--- a/be/src/vec/sink/vrow_distribution.cpp
+++ b/be/src/vec/sink/vrow_distribution.cpp
@@ -68,8 +68,10 @@ Status VRowDistribution::_save_missing_values(
             }
             cur_row_values.push_back(node);
         }
-        //For duplicate cur_values, they will be filtered in FE
-        _partitions_need_create.emplace_back(cur_row_values);
+        if (!_deduper.contains(cur_row_values)) {
+            _deduper.insert(cur_row_values);
+            _partitions_need_create.emplace_back(cur_row_values);
+        }
     }
 
     // to avoid too large mem use
diff --git a/be/src/vec/sink/vrow_distribution.h 
b/be/src/vec/sink/vrow_distribution.h
index 5267b488400..fffe0e3f7f1 100644
--- a/be/src/vec/sink/vrow_distribution.h
+++ b/be/src/vec/sink/vrow_distribution.h
@@ -24,7 +24,9 @@
 #include <gen_cpp/PaloInternalService_types.h>
 
 #include <cstdint>
+#include <functional>
 #include <string>
+#include <unordered_set>
 #include <vector>
 
 #include "common/status.h"
@@ -133,6 +135,10 @@ public:
     Status automatic_create_partition();
     void clear_batching_stats();
 
+    // for auto partition
+    std::unique_ptr<MutableBlock> _batching_block;
+    bool _deal_batched = false; // If true, send batched block before any 
block's append.
+
 private:
     std::pair<vectorized::VExprContextSPtrs, vectorized::VExprSPtrs> 
_get_partition_function();
 
@@ -170,17 +176,29 @@ private:
                                     int64_t rows);
     void _reset_find_tablets(int64_t rows);
 
+    struct NullableStringListHash { // Hash functor for std::vector<TNullableStringLiteral>; used as the hasher of _deduper.
+        std::size_t _hash(const TNullableStringLiteral& arg) const { // Hash of a single literal.
+            if (arg.is_null) { // `value` is not consulted for null literals.
+                return 0; // Every null literal gets the same fixed hash.
+            }
+            return std::hash<std::string>()(arg.value); // Non-null: hash the string payload.
+        }
+        std::size_t operator()(const std::vector<TNullableStringLiteral>& arg) 
const { // Folds the per-element hashes, in order, into one hash for the whole list.
+            std::size_t result = 0; // An empty list hashes to 0.
+            for (const auto& v : arg) {
+                result = (result << 1) ^ _hash(v); // Shift-then-XOR, so an element's position influences the result.
+            }
+            return result;
+        }
+    };
+
     RuntimeState* _state = nullptr;
     int _batch_size = 0;
 
     // for auto partitions
     std::vector<std::vector<TNullableStringLiteral>> _partitions_need_create;
-
-public:
-    std::unique_ptr<MutableBlock> _batching_block;
-    bool _deal_batched = false; // If true, send batched block before any 
block's append.
-private:
     size_t _batching_rows = 0, _batching_bytes = 0;
+    std::unordered_set<std::vector<TNullableStringLiteral>, 
NullableStringListHash> _deduper;
 
     OlapTableBlockConvertor* _block_convertor = nullptr;
     OlapTabletFinder* _tablet_finder = nullptr;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to