BiteTheDDDDt commented on code in PR #61495:
URL: https://github.com/apache/doris/pull/61495#discussion_r3043149345


##########
be/src/exec/pipeline/dependency.h:
##########
@@ -426,6 +426,195 @@ struct AggSharedState : public BasicSharedState {
     void _destroy_agg_status(AggregateDataPtr data);
 };
 
+static constexpr int BUCKETED_AGG_NUM_BUCKETS = 256;
+
+/// Shared state for BucketedAggSinkOperatorX / BucketedAggSourceOperatorX.
+///
+/// Each sink pipeline instance owns 256 per-bucket hash tables (two-level 
hash table
+/// approach, inspired by ClickHouse). During sink, each row is routed to 
bucket
+/// (hash >> 24) & 0xFF.
+///
+/// Source-side merge is pipelined with sink completion: as each sink instance 
finishes,
+/// it unblocks all source dependencies. Source instances scan buckets and 
merge data
+/// from finished sink instances into the merge target (the first sink to 
finish).
+/// Each bucket has a CAS lock so only one source works on a bucket at a time.
+/// After all sinks finish and all buckets are merged + output, one source 
handles
+/// null key merge and the pipeline completes.
+///
+/// Thread safety model:
+///  - Sink phase: each instance writes only to its own 
per_instance_data[task_idx]. No locking.
+///  - Source phase: per-bucket CAS lock (merge_in_progress). Under the lock, 
a source
+///    scans all finished sink instances and merges their bucket data into the 
merge
+///    target's bucket. Already-merged entries are nulled out to prevent 
re-processing.
+///    Output is only done when all sinks have finished and the bucket is 
fully merged.
+struct BucketedAggSharedState : public BasicSharedState {
+    ENABLE_FACTORY_CREATOR(BucketedAggSharedState)
+public:
+    BucketedAggSharedState() = default;
+    ~BucketedAggSharedState() override { _close(); }
+
+    /// Per-instance data. One per sink pipeline instance.
+    /// Each instance has 256 bucket hash tables + 1 shared arena.
+    struct PerInstanceData {
+        /// 256 per-bucket hash tables. Each bucket has its own 
BucketedAggDataVariants.
+        /// Uses PHHashMap<StringRef> for string keys instead of StringHashMap.
+        std::vector<BucketedAggDataVariantsUPtr> bucket_agg_data;
+        ArenaUPtr arena;
+
+        PerInstanceData() : arena(std::make_unique<Arena>()) {
+            bucket_agg_data.resize(BUCKETED_AGG_NUM_BUCKETS);
+            for (auto& p : bucket_agg_data) {
+                p = std::make_unique<BucketedAggDataVariants>();
+            }
+        }
+    };
+
+    /// Per-bucket merge state for pipelined source-side processing.
+    struct BucketMergeState {
+        /// CAS lock: only one source instance can merge/output this bucket at 
a time.
+        std::atomic<bool> merge_in_progress {false};
+        /// Set to true once the bucket is fully merged and all rows have been 
output.
+        std::atomic<bool> output_done {false};
+        /// Tracks which sink instances have been merged into the merge target
+        /// for this bucket. Accessed only under merge_in_progress CAS lock.
+        /// Element i is true when instance i's data for this bucket has been 
merged.
+        /// Sized to num_sink_instances in init_instances().
+        std::vector<bool> merged_instances;
+    };
+
+    std::vector<PerInstanceData> per_instance_data;
+    int num_sink_instances = 0;
+
+    /// Tracks how many sinks have finished. Incremented by each sink on EOS.
+    std::atomic<int> num_sinks_finished = 0;
+
+    /// Per-sink completion flags. Set to true when each sink instance 
finishes.
+    /// Source instances read these to know which sinks' data is safe to merge.
+    std::unique_ptr<std::atomic<bool>[]> sink_finished;
+
+    /// Index of the first sink instance to finish. Its bucket hash tables 
serve
+    /// as the merge target — all other sinks' data is merged into it.
+    /// Initialized to -1; the first sink to finish CAS-sets it to its 
instance idx.
+    std::atomic<int> merge_target_instance = -1;
+
+    /// Per-bucket merge state. Indexed by bucket id [0, 256).
+    std::array<BucketMergeState, BUCKETED_AGG_NUM_BUCKETS> bucket_states;
+
+    // Aggregate function metadata (shared, read-only after init).
+    std::vector<AggFnEvaluator*> aggregate_evaluators;
+    VExprContextSPtrs probe_expr_ctxs;
+    size_t total_size_of_aggregate_states = 0;
+    size_t align_aggregate_states = 1;
+    Sizes offsets_of_aggregate_states;
+    std::vector<size_t> make_nullable_keys;
+
+    std::atomic<size_t> input_num_rows {0};
+
+    /// When true, the aggregate has exactly one COUNT(*) function with no 
args.
+    /// In this case, mapped values in the hash table store a UInt64 counter
+    /// directly (reinterpret_cast<AggregateDataPtr>) instead of a pointer to
+    /// allocated aggregate state. This eliminates create/merge/destroy 
overhead.
+    bool use_simple_count = false;
+
+    // ---- Source-side fields ----
+
+    // Null key handling: null keys are stored separately (not in any bucket).
+    // After all buckets are processed, one source instance merges and outputs
+    // all null key data. This atomic ensures exactly one source instance does 
it.
+    std::atomic<bool> null_key_output_claimed {false};
+
+    /// Monotonically increasing counter bumped on every state change (bucket 
lock
+    /// release, sink finish). Used by source instances to detect missed 
wakeups:
+    /// if the generation changed between scan start and post-block() re-check,
+    /// something happened and the source should unblock immediately.
+    std::atomic<uint64_t> state_generation {0};
+
+    /// Initialize per-instance data and optionally run a metadata init 
callback.
+    /// The callback runs exactly once (under std::call_once), must return 
Status,
+    /// and should populate shared metadata like probe_expr_ctxs, 
aggregate_evaluators, etc.
+    /// All threads observe the same init status via _init_status.
+    template <typename Func>
+    Status init_instances(int num_instances, Func&& metadata_init) {
+        std::call_once(_init_once, [&]() {
+            num_sink_instances = num_instances;
+            per_instance_data.resize(num_instances);
+            sink_finished = 
std::make_unique<std::atomic<bool>[]>(num_instances);
+            for (int i = 0; i < num_instances; ++i) {
+                sink_finished[i].store(false, std::memory_order_relaxed);
+            }
+            for (auto& bs : bucket_states) {
+                bs.merged_instances.resize(num_instances, false);
+            }
+            _init_status = std::forward<Func>(metadata_init)();
+        });
+        return _init_status;
+    }
+
+private:
+    std::once_flag _init_once;
+    Status _init_status;
+
+    void _close() {
+        for (auto& inst : per_instance_data) {
+            for (auto& bucket_data : inst.bucket_agg_data) {
+                _close_one_agg_data(*bucket_data);
+            }
+        }
+    }
+
+    void _close_one_agg_data(BucketedAggDataVariants& agg_data) {
+        std::visit(
+                Overload {[&](std::monostate& arg) -> void {
+                              // Do nothing
+                          },
+                          [&](auto& agg_method) -> void {
+                              if (use_simple_count) {
+                                  // simple_count: mapped slots hold UInt64 
counters,
+                                  // not real agg state pointers. Skip destroy.
+                                  return;
+                              }
+                              auto& data = *agg_method.hash_table;
+                              data.for_each_mapped([&](auto& mapped) {
+                                  if (mapped) {
+                                      _destroy_agg_status(mapped);
+                                      mapped = nullptr;
+                                  }
+                              });
+                              if constexpr 
(std::is_assignable_v<decltype(data.has_null_key_data()),
+                                                                 bool>) {
+                                  if (data.has_null_key_data()) {
+                                      _destroy_agg_status(
+                                              data.template 
get_null_key_data<AggregateDataPtr>());
+                                  }
+                              }
+                          }},
+                agg_data.method_variant);
+    }
+
+    void _destroy_agg_status(AggregateDataPtr data);
+};
+
+struct BasicSpillSharedState {

Review Comment:
   解冲突解的有点问题,fix了 (Translation: the merge-conflict resolution was slightly off; it has been fixed.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to