This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 2734b8d3d28 [branch-3.0](auto-partition) Re-add deduplication to auto
partition rpc (#40580) (#42020)
2734b8d3d28 is described below
commit 2734b8d3d28b009ffac83b7339578757daf0497f
Author: zclllhhjj <[email protected]>
AuthorDate: Wed Oct 23 15:49:06 2024 +0800
[branch-3.0](auto-partition) Re-add deduplication to auto partition rpc
(#40580) (#42020)
pick https://github.com/apache/doris/pull/40580
---
be/src/vec/sink/vrow_distribution.cpp | 6 ++++--
be/src/vec/sink/vrow_distribution.h | 28 +++++++++++++++++++++++-----
2 files changed, 27 insertions(+), 7 deletions(-)
diff --git a/be/src/vec/sink/vrow_distribution.cpp
b/be/src/vec/sink/vrow_distribution.cpp
index d45aa2ea911..3a4c7e911f4 100644
--- a/be/src/vec/sink/vrow_distribution.cpp
+++ b/be/src/vec/sink/vrow_distribution.cpp
@@ -68,8 +68,10 @@ Status VRowDistribution::_save_missing_values(
}
cur_row_values.push_back(node);
}
- //For duplicate cur_values, they will be filtered in FE
- _partitions_need_create.emplace_back(cur_row_values);
+ if (!_deduper.contains(cur_row_values)) {
+ _deduper.insert(cur_row_values);
+ _partitions_need_create.emplace_back(cur_row_values);
+ }
}
// to avoid too large mem use
diff --git a/be/src/vec/sink/vrow_distribution.h
b/be/src/vec/sink/vrow_distribution.h
index 5267b488400..fffe0e3f7f1 100644
--- a/be/src/vec/sink/vrow_distribution.h
+++ b/be/src/vec/sink/vrow_distribution.h
@@ -24,7 +24,9 @@
#include <gen_cpp/PaloInternalService_types.h>
#include <cstdint>
+#include <functional>
#include <string>
+#include <unordered_set>
#include <vector>
#include "common/status.h"
@@ -133,6 +135,10 @@ public:
Status automatic_create_partition();
void clear_batching_stats();
+ // for auto partition
+ std::unique_ptr<MutableBlock> _batching_block;
+ bool _deal_batched = false; // If true, send batched block before any
block's append.
+
private:
std::pair<vectorized::VExprContextSPtrs, vectorized::VExprSPtrs>
_get_partition_function();
@@ -170,17 +176,29 @@ private:
int64_t rows);
void _reset_find_tablets(int64_t rows);
+ struct NullableStringListHash { // hash functor for std::vector<TNullableStringLiteral>; supplied as the hasher of the _deduper unordered_set
+ std::size_t _hash(const TNullableStringLiteral& arg) const { // hash of one literal
+ if (arg.is_null) { // every null literal hashes to 0; arg.value is not read in that case
+ return 0;
+ }
+ return std::hash<std::string>()(arg.value); // non-null: std::hash of the string value
+ }
+ std::size_t operator()(const std::vector<TNullableStringLiteral>& arg)
const {
+ std::size_t result = 0; // an empty vector therefore hashes to 0
+ for (const auto& v : arg) {
+ result = (result << 1) ^ _hash(v); // shift the running hash left by one bit, then XOR in this element's hash
+ }
+ return result;
+ }
+ };
+
RuntimeState* _state = nullptr;
int _batch_size = 0;
// for auto partitions
std::vector<std::vector<TNullableStringLiteral>> _partitions_need_create;
-
-public:
- std::unique_ptr<MutableBlock> _batching_block;
- bool _deal_batched = false; // If true, send batched block before any
block's append.
-private:
size_t _batching_rows = 0, _batching_bytes = 0;
+ std::unordered_set<std::vector<TNullableStringLiteral>,
NullableStringListHash> _deduper;
OlapTableBlockConvertor* _block_convertor = nullptr;
OlapTabletFinder* _tablet_finder = nullptr;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]