github-actions[bot] commented on code in PR #65173:
URL: https://github.com/apache/doris/pull/65173#discussion_r3518077082


##########
fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java:
##########
@@ -601,6 +606,11 @@ private void addScanRangeLocations(Partition partition,
                 } else if (replicas.size() > 1) {
                     Collections.shuffle(replicas);
                 }
+                if (affinityDecision == null) {
+                    affinityDecision = context.getQueryResourceGroupAffinityDecision();
+                }
+                replicas = ResourceGroupAffinityPolicyFactory.get().applyQueryAffinity(affinityDecision, replicas,

Review Comment:
   When `skip_missing_version` is enabled, this list has just been sorted by 
`Replica.LAST_SUCCESS_VERSION_COMPARATOR`, and the loop below stops after the 
first surviving backend to recover as much data as possible. Applying query 
affinity here can move a lower-`lastSuccessVersion` local replica ahead of the 
highest-`lastSuccessVersion` replica, so the emergency recovery query can 
return less recoverable data than the session-variable contract promises. 
Please skip affinity in this mode, or only reorder within equal 
`lastSuccessVersion` groups.
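
To make the second option concrete, here is a minimal sketch of reordering only within runs of equal `lastSuccessVersion`. Everything in it is hypothetical and for illustration only: `TieGroupAffinity`, `ReplicaStub`, and `preferWithinEqualVersions` do not exist in the PR, and the stub stands in for `Replica`. It assumes the list is already sorted by the version key, as it is after `Replica.LAST_SUCCESS_VERSION_COMPARATOR` has been applied.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.ToLongFunction;

/** Hypothetical helper for illustration; not part of the PR or of Doris. */
final class TieGroupAffinity {

    /** Minimal stand-in for a replica: backend id, lastSuccessVersion, tag.location. */
    static final class ReplicaStub {
        final long backendId;
        final long lastSuccessVersion;
        final String tag;

        ReplicaStub(long backendId, long lastSuccessVersion, String tag) {
            this.backendId = backendId;
            this.lastSuccessVersion = lastSuccessVersion;
            this.tag = tag;
        }
    }

    /**
     * Moves preferred candidates to the front of each run of equal version keys.
     * The input must already be sorted by that key. Runs keep their relative order,
     * order inside each half of a run is stable, and no candidate is dropped.
     */
    static <T> List<T> preferWithinEqualVersions(
            List<T> sorted, ToLongFunction<T> versionOf, Predicate<T> preferred) {
        List<T> result = new ArrayList<>(sorted.size());
        int start = 0;
        while (start < sorted.size()) {
            long version = versionOf.applyAsLong(sorted.get(start));
            int end = start + 1;
            while (end < sorted.size() && versionOf.applyAsLong(sorted.get(end)) == version) {
                end++;
            }
            List<T> run = sorted.subList(start, end);
            for (T candidate : run) {
                if (preferred.test(candidate)) {
                    result.add(candidate);
                }
            }
            for (T candidate : run) {
                if (!preferred.test(candidate)) {
                    result.add(candidate);
                }
            }
            start = end;
        }
        return result;
    }

    public static void main(String[] args) {
        // Sorted by lastSuccessVersion, highest first; the session prefers group g_b.
        List<ReplicaStub> sorted = Arrays.asList(
                new ReplicaStub(1, 10, "g_a"),
                new ReplicaStub(2, 10, "g_b"),
                new ReplicaStub(3, 8, "g_b"));
        List<ReplicaStub> ordered = preferWithinEqualVersions(
                sorted, r -> r.lastSuccessVersion, r -> "g_b".equals(r.tag));
        for (ReplicaStub r : ordered) {
            System.out.println(r.backendId + " v" + r.lastSuccessVersion + " " + r.tag);
        }
    }
}
```

In this sketch the `g_b` replica at version 8 stays behind both version-10 replicas, so the first surviving backend is still one with the highest `lastSuccessVersion`. The interface's `applyQueryAffinityWithinTies` default looks like the intended hook for this behavior, though that is an inference from its Javadoc rather than something the diff confirms.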



##########
fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java:
##########
@@ -295,7 +307,7 @@ private long selectBackendForCloudGroupCommitInternal(long tableId, String clust
             throw new LoadException("No alive backend");
         }
         // If the cached backend is not active or decommissioned, select a random new backend.
-        Long randomBackendId = getRandomBackend(cluster, tableId, backends);
+        Long randomBackendId = getRandomBackend(cluster, tableId, backends, affinityDecision);

Review Comment:
   The effective load-affinity decision only reaches `getRandomBackend()`, but 
`selectBackendForCloudGroupCommitInternal()` returns the cached table backend 
before that call whenever `tableToPressureMap` is still below 
`groupCommitDataBytes`. Because the cache 
key is just cluster/table id, a previous group-commit load can pin table T to a 
BE in group `g_a`, and a later session or forwarded request with 
`preferred_resource_group='g_b'` will keep using the `g_a` BE until pressure or 
availability invalidates the cache. Please either validate/key the cache by the 
effective affinity decision, or explicitly document and test that group commit 
intentionally overrides session load affinity for batching.
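
As a rough illustration of the first option, the sketch below adds the effective affinity group to the cache key, so a request preferring `g_b` cannot reuse a backend cached for `g_a`. All names (`AffinityKeyedBackendCache`, `Key`, `getOrSelect`) are hypothetical and do not reflect the real `GroupCommitManager` fields. The pressure and availability checks are omitted. A `null` preferred group stands in for "no affinity".

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

/** Hypothetical cache for illustration; not the GroupCommitManager implementation. */
final class AffinityKeyedBackendCache {

    /** Cache key that includes the effective affinity group, not just cluster and table. */
    static final class Key {
        final String cluster;
        final long tableId;
        final String preferredGroup; // null means "no affinity"

        Key(String cluster, long tableId, String preferredGroup) {
            this.cluster = cluster;
            this.tableId = tableId;
            this.preferredGroup = preferredGroup;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof Key)) {
                return false;
            }
            Key other = (Key) o;
            return tableId == other.tableId
                    && Objects.equals(cluster, other.cluster)
                    && Objects.equals(preferredGroup, other.preferredGroup);
        }

        @Override
        public int hashCode() {
            return Objects.hash(cluster, tableId, preferredGroup);
        }
    }

    private final Map<Key, Long> cache = new ConcurrentHashMap<>();

    /** Returns the cached backend id for this key, or runs the selector once and caches it. */
    long getOrSelect(String cluster, long tableId, String preferredGroup, LongSupplier selector) {
        return cache.computeIfAbsent(
                new Key(cluster, tableId, preferredGroup), k -> selector.getAsLong());
    }

    public static void main(String[] args) {
        AffinityKeyedBackendCache cache = new AffinityKeyedBackendCache();
        System.out.println(cache.getOrSelect("c1", 1L, "g_a", () -> 101L));
        System.out.println(cache.getOrSelect("c1", 1L, "g_b", () -> 202L));
        System.out.println(cache.getOrSelect("c1", 1L, "g_a", () -> 999L));
    }
}
```

The cost of this design is that one table may be batched on several backends at once, which weakens group commit's batching. If that is unacceptable, the second option applies: keep the cluster/table key, and document and test that group commit overrides session affinity.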



##########
fe/fe-core/src/main/java/org/apache/doris/resource/ResourceGroupAffinityPolicy.java:
##########
@@ -0,0 +1,148 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.resource;
+
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.common.LoadException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.system.Backend;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.Predicate;
+
+/**
+ * SPI for resource-group (tag.location) affinity decisions.
+ * <p>
+ * The public default is a pass-through no-op. Downstream builds may register a real implementation
+ * via {@link java.util.ServiceLoader}; {@link ResourceGroupAffinityPolicyFactory} then selects it.
+ */
+public interface ResourceGroupAffinityPolicy {
+
+    /** Observability classification of how a repair clone source was selected. */
+    enum SrcAffinityResult {
+        DISABLED,
+        LOCAL_HIT,
+        FALLBACK_NO_LOCAL,
+        FALLBACK_LOCAL_UNHEALTHY,
+        FALLBACK_SLOT_FULL
+    }
+
+    /** Whether repair-clone source affinity is active. Open-source no-op: always {@code false}. */
+    default boolean isRepairSrcAffinityEnabled() {
+        return false;
+    }
+
+    /**
+     * Reorder healthy clone-source candidates to prefer the same {@code tag.location} as the repair
+     * destination backend. Must be a <strong>stable</strong> reorder so the caller's existing
+     * version-based ordering is preserved within each tier, and must not drop candidates so that a
+     * same-AZ slot-full case still falls through to a cross-AZ source.
+     * <p>
+     * Open-source no-op: returns {@code healthyCandidates} unchanged.
+     */
+    default List<Replica> orderRepairSrcCandidates(List<Replica> healthyCandidates, long destBackendId) {
+        return healthyCandidates;
+    }
+
+    /**
+     * Classify which affinity tier the finally chosen source fell into, for observability.
+     * Open-source no-op: returns {@link SrcAffinityResult#DISABLED}.
+     */
+    default SrcAffinityResult classifyRepairSrc(long chosenSrcBackendId, long destBackendId,
+            List<Replica> allReplicas, List<Replica> healthyCandidates) {
+        return SrcAffinityResult.DISABLED;
+    }
+
+    default ResourceGroupAffinity.AffinityDecision decideForQuery(ConnectContext context) {
+        return ResourceGroupAffinity.AffinityDecision.noAffinity();
+    }
+
+    default ResourceGroupAffinity.AffinityDecision decideForQuery(
+            ConnectContext context, Set<Tag> allowedTags, boolean needCheckTags) {
+        return ResourceGroupAffinity.AffinityDecision.noAffinity();
+    }
+
+    /**
+     * Optionally reorder query scan candidates before the existing scheduler chooses a backend.
+     * This is a placement hint: callers may still apply their normal load-balancing policy after
+     * this method. Implementations must not drop candidates.
+     */
+    default <T> List<T> applyQueryAffinity(ResourceGroupAffinity.AffinityDecision decision, List<T> candidates,
+            Function<T, Tag> beTagOf) throws UserException {
+        return candidates;
+    }
+
+    /**
+     * Optionally reorder only candidates that are otherwise tied by the caller's load-balancing
+     * key. Implementations must preserve candidates outside the tied groups.
+     */
+    default <T> List<T> applyQueryAffinityWithinTies(ResourceGroupAffinity.AffinityDecision decision,
+            List<T> sortedCandidates, Comparator<T> tieKey, Function<T, Tag> beTagOf) throws UserException {
+        return sortedCandidates;
+    }
+
+    default boolean isLoadAffinityEnabled(ConnectContext context) {
+        return false;
+    }
+
+    default List<Backend> orderLoadBackends(ConnectContext context, List<Backend> candidates) throws UserException {
+        return candidates;
+    }
+
+    default List<Backend> orderLoadBackends(ResourceGroupAffinity.AffinityDecision decision,
+            List<Backend> candidates) throws UserException {
+        return candidates;
+    }
+
+    default Backend chooseFirstAvailableLoadBackend(ConnectContext context, List<Backend> candidates,
+            Predicate<Backend> available) throws UserException {
+        for (Backend backend : candidates) {
+            if (available.test(backend)) {
+                return backend;
+            }
+        }
+        return null;
+    }
+
+    default boolean hasEffectiveLoadAffinity(ResourceGroupAffinity.AffinityDecision decision) {
+        return false;
+    }
+
+    default Backend chooseLoadBackendWithAffinity(ConnectContext context, List<Backend> candidates)
+            throws LoadException {
+        for (Backend backend : candidates) {
+            if (backend.isLoadAvailable()) {
+                return backend;
+            }
+        }
+        return candidates.isEmpty() ? null : candidates.get(0);

Review Comment:
   This fallback makes the no-op policy return an unavailable backend when 
every candidate fails `isLoadAvailable()`. The cloud non-group stream-load and 
MySQL-load paths now pass raw `getBackendsByClusterName(...)` results into this 
helper, and that list is not prefiltered; before this change 
`StreamLoadHandler.selectBackend()` filtered `Backend::isLoadAvailable` and 
threw `NO_BACKEND_LOAD_AVAILABLE` when none remained. Please return `null` here 
(or filter at those cloud callers) so the existing load-unavailable error path 
is preserved.
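
A minimal sketch of the suggested behavior, using stand-in types instead of the real `Backend` and `LoadException`: the helper returns `null` when nothing passes the availability check, and the caller turns that into the load-unavailable error. `LoadBackendChooser`, `chooseFirstAvailable`, `selectOrThrow`, and the use of `IllegalStateException` are all hypothetical and only illustrate the control flow.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical helper for illustration; not the Doris implementation. */
final class LoadBackendChooser {

    /**
     * Returns the first candidate that passes the availability check, or null if none does.
     * There is deliberately no fallback to candidates.get(0), so an unavailable backend
     * can never be returned.
     */
    static <B> B chooseFirstAvailable(List<B> candidates, Predicate<B> isLoadAvailable) {
        for (B backend : candidates) {
            if (isLoadAvailable.test(backend)) {
                return backend;
            }
        }
        return null;
    }

    /** Caller-side handling: a null result becomes the existing "no backend" error path. */
    static <B> B selectOrThrow(List<B> candidates, Predicate<B> isLoadAvailable) {
        B chosen = chooseFirstAvailable(candidates, isLoadAvailable);
        if (chosen == null) {
            // Stand-in for the NO_BACKEND_LOAD_AVAILABLE error named in the review.
            throw new IllegalStateException("NO_BACKEND_LOAD_AVAILABLE");
        }
        return chosen;
    }

    public static void main(String[] args) {
        // Backend ids as plain strings; in this toy setup only "be2" is load-available.
        List<String> backends = Arrays.asList("be1", "be2", "be3");
        System.out.println(selectOrThrow(backends, "be2"::equals));
        try {
            selectOrThrow(backends, b -> false);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Filtering at the cloud callers would work equally well. The point is only that an all-unavailable candidate list must end in an error, not in a backend that cannot accept the load.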



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

