liaoxin01 commented on code in PR #62661:
URL: https://github.com/apache/doris/pull/62661#discussion_r3475262228


##########
fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java:
##########
@@ -539,6 +543,98 @@ public Map<String, Integer> getBeToInstancesNum() {
         return result;
     }
 
+    public static final class AdaptiveRandomBucketSinkContext {
+        private final List<Long> sinkBackendIds;
+        private final int planFragmentNum;
+
+        private AdaptiveRandomBucketSinkContext(List<Long> sinkBackendIds, int 
planFragmentNum) {
+            this.sinkBackendIds = sinkBackendIds;
+            this.planFragmentNum = planFragmentNum;
+        }
+
+        public List<Long> getSinkBackendIds() {
+            return sinkBackendIds;
+        }
+
+        public int getPlanFragmentNum() {
+            return planFragmentNum;
+        }
+    }
+
+    public Optional<AdaptiveRandomBucketSinkContext> 
getAdaptiveRandomBucketSinkContext(long tableId) {
+        Set<Long> sinkBackendIds = new TreeSet<>();
+        int planFragmentNum = 0;
+        for (PipelineExecContext context : pipelineExecContexts.values()) {
+            TPipelineFragmentParams params = context.rpcParams;
+            if (params.getFragment().getOutputSink() == null
+                    || params.getFragment().getOutputSink().getType() != 
TDataSinkType.OLAP_TABLE_SINK) {
+                continue;
+            }
+            TOlapTableSink sink = 
params.getFragment().getOutputSink().getOlapTableSink();
+            if (sink.getTableId() != tableId) {
+                continue;
+            }
+            if (!OlapTableSink.shouldAssignAdaptiveRandomBucket(sink)) {
+                continue;
+            }
+            sinkBackendIds.add(params.getBackendId());
+            planFragmentNum += params.getLocalParamsSize();
+        }
+        if (sinkBackendIds.isEmpty()) {
+            return Optional.empty();
+        }
+        return Optional.of(new AdaptiveRandomBucketSinkContext(
+                new ArrayList<>(sinkBackendIds), Math.max(planFragmentNum, 
1)));
+    }
+
+    private static void assignAdaptiveRandomBucketForFragment(
+            Collection<TPipelineFragmentParams> fragmentParamsList) {
+        List<TPipelineFragmentParams> sinkParams = fragmentParamsList.stream()
+                .filter(param -> param.getFragment().getOutputSink() != null
+                        && param.getFragment().getOutputSink().getType() == 
TDataSinkType.OLAP_TABLE_SINK)
+                .collect(Collectors.toList());
+        if (sinkParams.isEmpty()) {
+            return;
+        }
+        TOlapTableSink sink = 
sinkParams.get(0).getFragment().getOutputSink().getOlapTableSink();
+        if (!OlapTableSink.shouldAssignAdaptiveRandomBucket(sink)) {
+            return;
+        }
+        List<Long> sinkBackendIds = sinkParams.stream()
+                .map(TPipelineFragmentParams::getBackendId)
+                .distinct()
+                .sorted()
+                .collect(Collectors.toList());
+        int planFragmentNum = sinkParams.stream()

Review Comment:
   Suggest renaming planFragmentNum to sinkInstanceNum.



##########
be/src/exec/sink/writer/vtablet_writer.cpp:
##########
@@ -676,6 +716,77 @@ void VNodeChannel::_open_internal(bool is_incremental) {
     request->set_txn_expiration(_parent->_txn_expiration);
     request->set_write_file_cache(_parent->_write_file_cache);
 
+    if (_parent->_tablet_finder->is_adaptive_random_bucket()) {

Review Comment:
   Consider extracting the larger is_adaptive_random_bucket() blocks into named 
methods.



##########
be/src/load/channel/tablets_channel.h:
##########
@@ -185,6 +197,10 @@ class BaseTabletsChannel {
     std::unordered_set<int64_t> _reducing_tablets;
 
     std::unordered_set<int64_t> _partition_ids;
+    std::shared_ptr<AdaptiveRandomBucketState> _adaptive_random_bucket_state;
+    std::mutex _partition_route_locks_lock;
+    std::unordered_map<int32_t, std::unordered_map<int64_t, 
std::shared_ptr<std::mutex>>>

Review Comment:
   Please add a comment for this member.



##########
be/src/load/channel/tablets_channel.cpp:
##########
@@ -683,6 +728,169 @@ Status BaseTabletsChannel::_write_block_data(
     return Status::OK();
 }
 
+std::shared_ptr<std::mutex> 
BaseTabletsChannel::_get_sender_partition_route_lock(

Review Comment:
   Why is this lock needed?



##########
be/src/common/config.cpp:
##########
@@ -749,6 +749,8 @@ DEFINE_mInt32(memory_gc_sleep_time_ms, "500");
 // max write buffer size before flush, default 200MB
 DEFINE_mInt64(write_buffer_size, "209715200");
 DEFINE_mBool(enable_adaptive_write_buffer_size, "true");
+// Whether random bucket load rotates to the next local bucket when memtable 
flushes.
+DEFINE_mBool(enable_adaptive_random_bucket_load_bucket_rotation, "true");

Review Comment:
   Do we need this config?



##########
be/src/load/channel/tablets_channel.cpp:
##########
@@ -702,10 +910,18 @@ Status TabletsChannel::add_batch(const 
PTabletWriterAddBlockRequest& request,
         return Status::OK();
     }
 
+    if (request.is_receiver_side_random_bucket()) {

Review Comment:
   unify adaptive_random_bucket vs receiver_side_random_bucket
   
   The same logical switch is renamed several times across the stack, which 
makes it hard to tell whether these are the same concept or different ones:
   
   Config.enable_adaptive_random_bucket_load   (FE config)
     → TOlapTableSink.enable_adaptive_random_bucket   (thrift)
       → FIND_TABLET_RANDOM_BUCKET / is_adaptive_random_bucket()   (BE finder)
         → PTabletWriter*.is_receiver_side_random_bucket   (proto / receiver)



##########
be/src/exec/sink/vtablet_finder.h:
##########
@@ -19,15 +19,45 @@
 
 #include <cstdint>
 #include <map>
+#include <memory>
+#include <mutex>
+#include <unordered_map>
+#include <vector>
 
 #include "common/status.h"
 #include "core/block/block.h"
 #include "exec/common/hash_table/phmap_fwd_decl.h"
 #include "storage/tablet_info.h"
 #include "util/bitmap.h"
+#include "util/uid_util.h"
 
 namespace doris {
 
+class AdaptiveRandomBucketState {

Review Comment:
   AdaptiveRandomBucketState looks misplaced in vtablet_finder.h
   
   AdaptiveRandomBucketState is purely receiver-side state — it's only used by 
BaseTabletsChannel. The sender-side OlapTabletFinder in this same header never 
touches it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to