github-actions[bot] commented on code in PR #62661:
URL: https://github.com/apache/doris/pull/62661#discussion_r3413227400


##########
fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java:
##########
@@ -574,6 +581,302 @@ private TOlapTablePartitionParam 
createDummyPartition(long dbId, OlapTable table
         return partitionParam;
     }
 
+    public static final class AdaptiveBucketAssignment {
+        private final long bucketBeId;
+        private final int loadTabletIdx;
+        private final List<Integer> localBucketSeqs;
+
+        public AdaptiveBucketAssignment(long bucketBeId, int loadTabletIdx, 
List<Integer> localBucketSeqs) {
+            this.bucketBeId = bucketBeId;
+            this.loadTabletIdx = loadTabletIdx;
+            this.localBucketSeqs = new ArrayList<>(localBucketSeqs);
+        }
+
+        public long getBucketBeId() {
+            return bucketBeId;
+        }
+
+        public int getLoadTabletIdx() {
+            return loadTabletIdx;
+        }
+
+        public List<Integer> getLocalBucketSeqs() {
+            return localBucketSeqs;
+        }
+    }
+
+    public static boolean shouldAssignAdaptiveRandomBucket(TOlapTableSink 
sink) {
+        return sink != null
+                && sink.isSetEnableAdaptiveRandomBucket()
+                && sink.isEnableAdaptiveRandomBucket()
+                && (!sink.isSetLoadToSingleTablet() || 
!sink.isLoadToSingleTablet())
+                && sink.isSetPartition()
+                && sink.getPartition() != null
+                && (!sink.getPartition().isSetDistributedColumns()
+                        || 
sink.getPartition().getDistributedColumns().isEmpty());
+    }
+
+    public boolean shouldAssignAdaptiveRandomBucket() {
+        return tDataSink != null && 
shouldAssignAdaptiveRandomBucket(tDataSink.getOlapTableSink());
+    }
+
+    public static Map<Long, Map<Long, AdaptiveBucketAssignment>> 
computeAdaptiveRandomBucketAssignments(
+            List<Long> sinkBackendIds, List<TOlapTablePartition> partitions,
+            List<TTabletLocation> tabletLocations, int planFragmentNum) {
+        Map<Long, Map<Long, AdaptiveBucketAssignment>> assignments = new 
HashMap<>();
+        List<Long> orderedSinkBackendIds = sinkBackendIds.stream()
+                .distinct()
+                .sorted()
+                .collect(Collectors.toList());
+        for (Long sinkBackendId : orderedSinkBackendIds) {
+            assignments.put(sinkBackendId, new HashMap<>());
+        }
+        if (orderedSinkBackendIds.isEmpty() || partitions == null || 
tabletLocations == null) {
+            return assignments;
+        }
+
+        Map<Long, TTabletLocation> tabletLocationMap = new 
HashMap<>(tabletLocations.size());
+        for (TTabletLocation tabletLocation : tabletLocations) {
+            tabletLocationMap.put(tabletLocation.getTabletId(), 
tabletLocation);
+        }
+
+        for (TOlapTablePartition partition : partitions) {
+            if (!partition.isSetLoadTabletIdx() || partition.getNumBuckets() 
<= 0
+                    || partition.getIndexes().isEmpty()) {
+                continue;
+            }
+            Map<Long, List<Integer>> beToBucketSeqs = 
buildBeToBucketSeqs(partition, tabletLocationMap);
+            long baseTabletIndex = partition.getLoadTabletIdx();
+            int fallbackBucketIdx = (int) Math.floorMod(baseTabletIndex, 
(long) partition.getNumBuckets());
+            int targetBucketNum = Math.min(
+                    Math.min(orderedSinkBackendIds.size(), 
partition.getNumBuckets()),
+                    Math.max(planFragmentNum, 1));
+            if (targetBucketNum <= 0) {
+                continue;
+            }
+
+            List<Long> rotatedSinkBackendIds = 
rotateSinkBackendIds(orderedSinkBackendIds, baseTabletIndex);
+            Map<Integer, Long> bucketToOwnerBe = 
buildBucketToOwnerBe(beToBucketSeqs);
+            List<Integer> selectedBucketSeqs = 
selectAdaptiveBucketSeqs(rotatedSinkBackendIds,
+                    beToBucketSeqs, bucketToOwnerBe, baseTabletIndex, 
partition.getNumBuckets(), targetBucketNum);
+            if (selectedBucketSeqs.isEmpty()) {
+                selectedBucketSeqs = 
Collections.singletonList(fallbackBucketIdx);
+            }
+            if (selectedBucketSeqs.size() != targetBucketNum) {
+                LOG.warn("Adaptive random bucket selected {} buckets instead 
of target {} for partition {}, "
+                                + "sinkBackendIds={}, beToBucketSeqs={}, 
selectedBucketSeqs={}, fallbackBucketIdx={}",
+                        selectedBucketSeqs.size(), targetBucketNum, 
partition.getId(), orderedSinkBackendIds,
+                        beToBucketSeqs, selectedBucketSeqs, fallbackBucketIdx);
+            }
+
+            Map<Long, List<Integer>> openedBeToBucketSeqs = 
buildBeToBucketSeqs(bucketToOwnerBe, selectedBucketSeqs);
+            Map<Integer, Integer> bucketUseCounts = new HashMap<>();
+            Map<Long, String> sinkAssignments = LOG.isInfoEnabled() ? new 
HashMap<>() : null;
+            for (Long sinkBackendId : rotatedSinkBackendIds) {
+                int bucketSeq = selectLeastUsedBucketSeq(
+                        openedBeToBucketSeqs.get(sinkBackendId), 
bucketUseCounts, baseTabletIndex);
+                if (bucketSeq < 0) {
+                    bucketSeq = selectLeastUsedBucketSeq(selectedBucketSeqs, 
bucketUseCounts, baseTabletIndex);
+                }
+                if (bucketSeq < 0) {
+                    bucketSeq = fallbackBucketIdx;
+                }
+                long bucketBeId = bucketToOwnerBe.getOrDefault(bucketSeq, -1L);
+                if (bucketBeId <= 0) {
+                    LOG.warn("Adaptive random bucket falls back to bucket {} 
without owner BE for partition {}, "
+                                    + "sinkBackendId={}, 
selectedBucketSeqs={}, beToBucketSeqs={}",
+                            bucketSeq, partition.getId(), sinkBackendId, 
selectedBucketSeqs, beToBucketSeqs);
+                }
+                List<Integer> localBucketSeqs = rotateBucketSeqsForStartBucket(
+                        beToBucketSeqs.get(bucketBeId), bucketSeq);
+                assignments.get(sinkBackendId).put(partition.getId(),
+                        new AdaptiveBucketAssignment(bucketBeId, bucketSeq, 
localBucketSeqs));
+                bucketUseCounts.merge(bucketSeq, 1, Integer::sum);
+                if (sinkAssignments != null) {
+                    sinkAssignments.put(sinkBackendId,
+                            "bucket=" + bucketSeq + ",bucketBeId=" + bucketBeId
+                                    + ",localBucketSeqs=" + localBucketSeqs);
+                }
+            }
+            if (sinkAssignments != null) {
+                LOG.info("Adaptive random bucket plan partition={}, 
baseTabletIndex={}, targetBucketNum={}, "
+                                + "sinkBackendIds={}, 
rotatedSinkBackendIds={}, beToBucketSeqs={}, "
+                                + "selectedBucketSeqs={}, 
openedBeToBucketSeqs={}, sinkAssignments={}",
+                        partition.getId(), baseTabletIndex, targetBucketNum, 
orderedSinkBackendIds,
+                        rotatedSinkBackendIds, beToBucketSeqs, 
selectedBucketSeqs, openedBeToBucketSeqs,
+                        sinkAssignments);
+            }
+        }
+        return assignments;
+    }
+
+    public static void 
applyAdaptiveRandomBucketAssignments(List<TOlapTablePartition> partitions,
+            Map<Long, AdaptiveBucketAssignment> partitionAssignments) {
+        if (partitions == null || partitionAssignments == null || 
partitionAssignments.isEmpty()) {
+            return;
+        }
+        for (TOlapTablePartition partition : partitions) {
+            AdaptiveBucketAssignment assignment = 
partitionAssignments.get(partition.getId());
+            if (assignment == null) {
+                continue;
+            }
+            partition.setLoadTabletIdx(assignment.getLoadTabletIdx());
+            if (assignment.getBucketBeId() > 0) {
+                partition.setBucketBeId(assignment.getBucketBeId());
+            } else if (partition.isSetBucketBeId()) {
+                partition.unsetBucketBeId();
+            }
+            if (!assignment.getLocalBucketSeqs().isEmpty()) {
+                partition.setLocalBucketSeqs(new 
ArrayList<>(assignment.getLocalBucketSeqs()));
+            } else if (partition.isSetLocalBucketSeqs()) {
+                partition.unsetLocalBucketSeqs();
+            }
+            if (LOG.isInfoEnabled()) {
+                LOG.info("Adaptive random bucket apply partition={}, 
bucketBeId={}, loadTabletIdx={}, "
+                                + "localBucketSeqs={}",
+                        partition.getId(), assignment.getBucketBeId(), 
assignment.getLoadTabletIdx(),
+                        assignment.getLocalBucketSeqs());
+            }
+        }
+    }
+
+    private static Map<Long, List<Integer>> 
buildBeToBucketSeqs(TOlapTablePartition partition,
+            Map<Long, TTabletLocation> tabletLocationMap) {

Review Comment:
   This derives the adaptive bucket owner from only the first materialized 
index, but the assignment is stored on `TOlapTablePartition` and `IndexChannel` 
reuses the same `bucket_be_id` / `local_bucket_seqs` for every index. In cloud 
mode, a randomly distributed table can have a rollup/materialized index whose 
tablet for the same bucket sequence is placed on a different BE than the base 
index's tablet. In that case the secondary index channel either skips the 
selected seq as non-local during `_open_internal()` and fails later with no 
receiver state, or rotates to a different local seq than the base index. That 
can make a load into a randomly distributed table with rollups fail, or write 
sequences. Please make the adaptive assignment per index, or gate it on 
verifying that every index has the same owner for each selected bucket sequence.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to