github-actions[bot] commented on code in PR #62661:
URL: https://github.com/apache/doris/pull/62661#discussion_r3413227400
##########
fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java:
##########
@@ -574,6 +581,302 @@ private TOlapTablePartitionParam
createDummyPartition(long dbId, OlapTable table
return partitionParam;
}
+ public static final class AdaptiveBucketAssignment {
+ private final long bucketBeId;
+ private final int loadTabletIdx;
+ private final List<Integer> localBucketSeqs;
+
+ public AdaptiveBucketAssignment(long bucketBeId, int loadTabletIdx,
List<Integer> localBucketSeqs) {
+ this.bucketBeId = bucketBeId;
+ this.loadTabletIdx = loadTabletIdx;
+ this.localBucketSeqs = new ArrayList<>(localBucketSeqs);
+ }
+
+ public long getBucketBeId() {
+ return bucketBeId;
+ }
+
+ public int getLoadTabletIdx() {
+ return loadTabletIdx;
+ }
+
+ public List<Integer> getLocalBucketSeqs() {
+ return localBucketSeqs;
+ }
+ }
+
+ public static boolean shouldAssignAdaptiveRandomBucket(TOlapTableSink
sink) {
+ return sink != null
+ && sink.isSetEnableAdaptiveRandomBucket()
+ && sink.isEnableAdaptiveRandomBucket()
+ && (!sink.isSetLoadToSingleTablet() ||
!sink.isLoadToSingleTablet())
+ && sink.isSetPartition()
+ && sink.getPartition() != null
+ && (!sink.getPartition().isSetDistributedColumns()
+ ||
sink.getPartition().getDistributedColumns().isEmpty());
+ }
+
+ public boolean shouldAssignAdaptiveRandomBucket() {
+ return tDataSink != null &&
shouldAssignAdaptiveRandomBucket(tDataSink.getOlapTableSink());
+ }
+
+ public static Map<Long, Map<Long, AdaptiveBucketAssignment>>
computeAdaptiveRandomBucketAssignments(
+ List<Long> sinkBackendIds, List<TOlapTablePartition> partitions,
+ List<TTabletLocation> tabletLocations, int planFragmentNum) {
+ Map<Long, Map<Long, AdaptiveBucketAssignment>> assignments = new
HashMap<>();
+ List<Long> orderedSinkBackendIds = sinkBackendIds.stream()
+ .distinct()
+ .sorted()
+ .collect(Collectors.toList());
+ for (Long sinkBackendId : orderedSinkBackendIds) {
+ assignments.put(sinkBackendId, new HashMap<>());
+ }
+ if (orderedSinkBackendIds.isEmpty() || partitions == null ||
tabletLocations == null) {
+ return assignments;
+ }
+
+ Map<Long, TTabletLocation> tabletLocationMap = new
HashMap<>(tabletLocations.size());
+ for (TTabletLocation tabletLocation : tabletLocations) {
+ tabletLocationMap.put(tabletLocation.getTabletId(),
tabletLocation);
+ }
+
+ for (TOlapTablePartition partition : partitions) {
+ if (!partition.isSetLoadTabletIdx() || partition.getNumBuckets()
<= 0
+ || partition.getIndexes().isEmpty()) {
+ continue;
+ }
+ Map<Long, List<Integer>> beToBucketSeqs =
buildBeToBucketSeqs(partition, tabletLocationMap);
+ long baseTabletIndex = partition.getLoadTabletIdx();
+ int fallbackBucketIdx = (int) Math.floorMod(baseTabletIndex,
(long) partition.getNumBuckets());
+ int targetBucketNum = Math.min(
+ Math.min(orderedSinkBackendIds.size(),
partition.getNumBuckets()),
+ Math.max(planFragmentNum, 1));
+ if (targetBucketNum <= 0) {
+ continue;
+ }
+
+ List<Long> rotatedSinkBackendIds =
rotateSinkBackendIds(orderedSinkBackendIds, baseTabletIndex);
+ Map<Integer, Long> bucketToOwnerBe =
buildBucketToOwnerBe(beToBucketSeqs);
+ List<Integer> selectedBucketSeqs =
selectAdaptiveBucketSeqs(rotatedSinkBackendIds,
+ beToBucketSeqs, bucketToOwnerBe, baseTabletIndex,
partition.getNumBuckets(), targetBucketNum);
+ if (selectedBucketSeqs.isEmpty()) {
+ selectedBucketSeqs =
Collections.singletonList(fallbackBucketIdx);
+ }
+ if (selectedBucketSeqs.size() != targetBucketNum) {
+ LOG.warn("Adaptive random bucket selected {} buckets instead
of target {} for partition {}, "
+ + "sinkBackendIds={}, beToBucketSeqs={},
selectedBucketSeqs={}, fallbackBucketIdx={}",
+ selectedBucketSeqs.size(), targetBucketNum,
partition.getId(), orderedSinkBackendIds,
+ beToBucketSeqs, selectedBucketSeqs, fallbackBucketIdx);
+ }
+
+ Map<Long, List<Integer>> openedBeToBucketSeqs =
buildBeToBucketSeqs(bucketToOwnerBe, selectedBucketSeqs);
+ Map<Integer, Integer> bucketUseCounts = new HashMap<>();
+ Map<Long, String> sinkAssignments = LOG.isInfoEnabled() ? new
HashMap<>() : null;
+ for (Long sinkBackendId : rotatedSinkBackendIds) {
+ int bucketSeq = selectLeastUsedBucketSeq(
+ openedBeToBucketSeqs.get(sinkBackendId),
bucketUseCounts, baseTabletIndex);
+ if (bucketSeq < 0) {
+ bucketSeq = selectLeastUsedBucketSeq(selectedBucketSeqs,
bucketUseCounts, baseTabletIndex);
+ }
+ if (bucketSeq < 0) {
+ bucketSeq = fallbackBucketIdx;
+ }
+ long bucketBeId = bucketToOwnerBe.getOrDefault(bucketSeq, -1L);
+ if (bucketBeId <= 0) {
+ LOG.warn("Adaptive random bucket falls back to bucket {}
without owner BE for partition {}, "
+ + "sinkBackendId={},
selectedBucketSeqs={}, beToBucketSeqs={}",
+ bucketSeq, partition.getId(), sinkBackendId,
selectedBucketSeqs, beToBucketSeqs);
+ }
+ List<Integer> localBucketSeqs = rotateBucketSeqsForStartBucket(
+ beToBucketSeqs.get(bucketBeId), bucketSeq);
+ assignments.get(sinkBackendId).put(partition.getId(),
+ new AdaptiveBucketAssignment(bucketBeId, bucketSeq,
localBucketSeqs));
+ bucketUseCounts.merge(bucketSeq, 1, Integer::sum);
+ if (sinkAssignments != null) {
+ sinkAssignments.put(sinkBackendId,
+ "bucket=" + bucketSeq + ",bucketBeId=" + bucketBeId
+ + ",localBucketSeqs=" + localBucketSeqs);
+ }
+ }
+ if (sinkAssignments != null) {
+ LOG.info("Adaptive random bucket plan partition={},
baseTabletIndex={}, targetBucketNum={}, "
+ + "sinkBackendIds={},
rotatedSinkBackendIds={}, beToBucketSeqs={}, "
+ + "selectedBucketSeqs={},
openedBeToBucketSeqs={}, sinkAssignments={}",
+ partition.getId(), baseTabletIndex, targetBucketNum,
orderedSinkBackendIds,
+ rotatedSinkBackendIds, beToBucketSeqs,
selectedBucketSeqs, openedBeToBucketSeqs,
+ sinkAssignments);
+ }
+ }
+ return assignments;
+ }
+
+ public static void
applyAdaptiveRandomBucketAssignments(List<TOlapTablePartition> partitions,
+ Map<Long, AdaptiveBucketAssignment> partitionAssignments) {
+ if (partitions == null || partitionAssignments == null ||
partitionAssignments.isEmpty()) {
+ return;
+ }
+ for (TOlapTablePartition partition : partitions) {
+ AdaptiveBucketAssignment assignment =
partitionAssignments.get(partition.getId());
+ if (assignment == null) {
+ continue;
+ }
+ partition.setLoadTabletIdx(assignment.getLoadTabletIdx());
+ if (assignment.getBucketBeId() > 0) {
+ partition.setBucketBeId(assignment.getBucketBeId());
+ } else if (partition.isSetBucketBeId()) {
+ partition.unsetBucketBeId();
+ }
+ if (!assignment.getLocalBucketSeqs().isEmpty()) {
+ partition.setLocalBucketSeqs(new
ArrayList<>(assignment.getLocalBucketSeqs()));
+ } else if (partition.isSetLocalBucketSeqs()) {
+ partition.unsetLocalBucketSeqs();
+ }
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Adaptive random bucket apply partition={},
bucketBeId={}, loadTabletIdx={}, "
+ + "localBucketSeqs={}",
+ partition.getId(), assignment.getBucketBeId(),
assignment.getLoadTabletIdx(),
+ assignment.getLocalBucketSeqs());
+ }
+ }
+ }
+
+ private static Map<Long, List<Integer>>
buildBeToBucketSeqs(TOlapTablePartition partition,
+ Map<Long, TTabletLocation> tabletLocationMap) {
Review Comment:
This derives the adaptive bucket owner from only the first materialized
index, but the assignment is stored on `TOlapTablePartition`, and `IndexChannel`
reuses the same `bucket_be_id` / `local_bucket_seqs` for every index. In cloud
mode, a random-distribution table can have a rollup/materialized index whose
tablet for the same bucket sequence is placed on a different BE than the base
index's tablet. In that case, the secondary index channel either skips the
selected bucket sequence as non-local during `_open_internal()` and fails later
with no receiver state, or rotates to a different local bucket sequence than
the one the base index uses. As a result, a load into a random table with
rollups can fail, or different indexes can be written to different bucket
sequences. Please either make the adaptive assignment per index, or gate it on
verifying that every index has the same owner BE for each selected bucket
sequence.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]