This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch tpcds
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/tpcds by this push:
     new fea246a7225 [Opt] Opt split assignment for external table. (#30246)
fea246a7225 is described below

commit fea246a7225e6ccb486ae6e93a5fd92b903b8575
Author: Qi Chen <[email protected]>
AuthorDate: Tue Jan 23 12:15:37 2024 +0800

    [Opt] Opt split assignment for external table. (#30246)
---
 .../main/java/org/apache/doris/common/Config.java  |  23 +-
 .../apache/doris/common/util/ConsistentHash.java   |  77 ++-
 .../doris/planner/external/ExternalScanNode.java   |   6 +-
 .../planner/external/FederationBackendPolicy.java  | 333 +++++++++-
 .../doris/planner/external/FileQueryScanNode.java  | 133 ++--
 .../apache/doris/planner/external/FileSplit.java   |   7 +
 .../planner/external/IndexedPriorityQueue.java     | 228 +++++++
 .../external/NodeSelectionStrategy.java}           |  15 +-
 .../Split.java => planner/external/Queue.java}     |  21 +-
 .../external/ResettableRandomizedIterator.java     |  63 ++
 .../apache/doris/planner/external/SplitWeight.java | 130 ++++
 .../external/UpdateablePriorityQueue.java}         |  18 +-
 .../src/main/java/org/apache/doris/spi/Split.java  |  23 +
 .../doris/planner/FederationBackendPolicyTest.java | 706 +++++++++++++++++++--
 14 files changed, 1598 insertions(+), 185 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 2494465e05e..6216b859c08 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2241,7 +2241,28 @@ public class Config extends ConfigBase {
             "When file cache is enabled, the number of virtual nodes of each 
node in the consistent hash algorithm. "
                     + "The larger the value, the more uniform the distribution 
of the hash algorithm, "
                     + "but it will increase the memory overhead."})
-    public static int virtual_node_number = 2048;
+    public static int virtual_node_number = 256;
+
+    @ConfField(mutable = true, description = {
+            "本地节点软亲缘性优化。尽可能地优先选取本地副本节点。",
+            "Local node soft-affinity optimization. Prefer local replica nodes whenever possible."})
+    public static boolean optimized_local_scheduling = true;
+
+    @ConfField(mutable = true, description = {
+            "随机算法最小的候选数目,会选取相对最空闲的节点。",
+            "The minimum number of candidates for the random algorithm; "
+                    + "the relatively most idle one among them will be selected."})
+    public static int min_random_candidate_num = 2;
+
+    @ConfField(mutable = true, description = {
+            "一致性哈希算法最小的候选数目,会选取相对最空闲的节点。",
+            "The minimum number of candidates for the consistent hashing algorithm; "
+                    + "the relatively most idle one among them will be selected."})
+    public static int min_consistent_hash_candidate_num = 2;
+
+    @ConfField(mutable = true, description = {
+            "各节点之间最大的 split 数目差异,如果超过这个数目就会重新分布 split。",
+            "The maximum difference in the number of splits between nodes. "
+                    + "If this number is exceeded, the splits will be redistributed."})
+    public static int max_split_num_variance = 1;
 
     @ConfField(description = {
             "控制统计信息的自动触发作业执行记录的持久化行数",
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/ConsistentHash.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/ConsistentHash.java
index 4ef6e4168f1..b627c6d0a4f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/ConsistentHash.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/ConsistentHash.java
@@ -17,11 +17,16 @@
 
 package org.apache.doris.common.util;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.hash.Funnel;
 import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hasher;
 
 import java.util.Collection;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 
@@ -89,14 +94,76 @@ public class ConsistentHash<K, N> {
         }
     }
 
-    public N getNode(K key) {
-        if (ring.isEmpty()) {
-            return null;
+    public List<N> getNode(K key, int count) {
+        int nodeCount = ring.values().size();
+        if (count > nodeCount) {
+            count = nodeCount;
         }
+
+        Set<N> uniqueNodes = new LinkedHashSet<>();
+
         Hasher hasher = hashFunction.newHasher();
         Long hashKey = hasher.putObject(key, keyFunnel).hash().asLong();
+
         SortedMap<Long, VirtualNode> tailMap = ring.tailMap(hashKey);
-        hashKey = !tailMap.isEmpty() ? tailMap.firstKey() : ring.firstKey();
-        return ring.get(hashKey).getNode();
+        // Start reading from tail
+        for (Map.Entry<Long, VirtualNode> entry : tailMap.entrySet()) {
+            uniqueNodes.add(entry.getValue().node);
+            if (uniqueNodes.size() == count) {
+                break;
+            }
+        }
+
+        if (uniqueNodes.size() < count) {
+            // Start reading from the head as we have exhausted tail
+            SortedMap<Long, VirtualNode> headMap = ring.headMap(hashKey);
+            for (Map.Entry<Long, VirtualNode> entry : headMap.entrySet()) {
+                uniqueNodes.add(entry.getValue().node);
+                if (uniqueNodes.size() == count) {
+                    break;
+                }
+            }
+        }
+
+        return ImmutableList.copyOf(uniqueNodes);
     }
 }
+
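
A hedged usage sketch of the new multi-candidate lookup (not part of this patch;
the String keys/nodes and the Funnel below are hypothetical stand-ins for Split
and Backend, Guava imports omitted):

    Funnel<String> utf8 = (from, into) -> into.putString(from, StandardCharsets.UTF_8);
    ConsistentHash<String, String> ring = new ConsistentHash<>(
            Hashing.murmur3_128(), utf8, utf8,
            ImmutableList.of("be-1", "be-2", "be-3"), Config.virtual_node_number);
    // Walks the ring clockwise from the key's hash position, collecting up to
    // two distinct physical nodes; wraps around to the ring head if the tail
    // is exhausted, exactly as getNode(key, count) above does.
    List<String> candidates = ring.getNode("hdfs://warehouse/t/part-0.orc:0", 2);
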
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalScanNode.java
index a4751b5205f..65681804875 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalScanNode.java
@@ -22,6 +22,7 @@ import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.common.UserException;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.planner.ScanNode;
+import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.thrift.TScanRangeLocations;
 
@@ -43,7 +44,9 @@ public abstract class ExternalScanNode extends ScanNode {
     // set to false means this scan node does not need to check column priv.
     protected boolean needCheckColumnPriv;
 
-    protected final FederationBackendPolicy backendPolicy = new FederationBackendPolicy();
+    protected final FederationBackendPolicy backendPolicy = new FederationBackendPolicy(
+            ConnectContext.get().getSessionVariable().enableFileCache
+                    ? NodeSelectionStrategy.CONSISTENT_HASHING : NodeSelectionStrategy.RANDOM);
 
     public ExternalScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, StatisticalType statisticalType,
             boolean needCheckColumnPriv) {
@@ -91,3 +94,4 @@ public abstract class ExternalScanNode extends ScanNode {
         return scanRangeLocations.size();
     }
 }
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java
index ade03291c30..19321a57cbc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java
@@ -24,18 +24,23 @@ import org.apache.doris.common.util.ConsistentHash;
 import org.apache.doris.mysql.privilege.UserProperty;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.resource.Tag;
+import org.apache.doris.spi.Split;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.BeSelectionPolicy;
-import org.apache.doris.thrift.TFileRangeDesc;
-import org.apache.doris.thrift.TScanRangeLocations;
+import org.apache.doris.system.SystemInfoService;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 import com.google.common.hash.Funnel;
 import com.google.common.hash.Hashing;
@@ -45,12 +50,16 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.nio.charset.StandardCharsets;
-import java.security.SecureRandom;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
@@ -59,21 +68,29 @@ public class FederationBackendPolicy {
     private static final Logger LOG = LogManager.getLogger(FederationBackendPolicy.class);
     private final List<Backend> backends = Lists.newArrayList();
     private final Map<String, List<Backend>> backendMap = Maps.newHashMap();
-    private final SecureRandom random = new SecureRandom();
-    private ConsistentHash<TScanRangeLocations, Backend> consistentHash;
+
+    private Map<Backend, Long> assignedWeightPerBackend = Maps.newHashMap();
+
+    public Map<Backend, Long> getAssignedWeightPerBackend() {
+        return assignedWeightPerBackend;
+    }
+
+    private ConsistentHash<Split, Backend> consistentHash;
 
     private int nextBe = 0;
     private boolean initialized = false;
 
+    private NodeSelectionStrategy nodeSelectionStrategy;
+
     // Creating a ConsistentHash ring may be a time-consuming operation, so we cache it.
-    private static LoadingCache<HashCacheKey, ConsistentHash<TScanRangeLocations, Backend>> consistentHashCache;
+    private static LoadingCache<HashCacheKey, ConsistentHash<Split, Backend>> consistentHashCache;
 
     static {
         consistentHashCache = CacheBuilder.newBuilder().maximumSize(5)
-                .build(new CacheLoader<HashCacheKey, ConsistentHash<TScanRangeLocations, Backend>>() {
+                .build(new CacheLoader<HashCacheKey, ConsistentHash<Split, Backend>>() {
                     @Override
-                    public ConsistentHash<TScanRangeLocations, Backend> load(HashCacheKey key) {
-                        return new ConsistentHash<>(Hashing.murmur3_128(), new ScanRangeHash(),
+                    public ConsistentHash<Split, Backend> load(HashCacheKey key) {
+                        return new ConsistentHash<>(Hashing.murmur3_128(), new SplitHash(),
                                 new BackendHash(), key.bes, Config.virtual_node_number);
                     }
                 });
@@ -81,13 +98,16 @@ public class FederationBackendPolicy {
 
     private static class HashCacheKey {
         // sorted backend ids as key
-        private List<Long> beIds;
+        private List<String> beHashKeys;
         // backends is not part of key, just an attachment
         private List<Backend> bes;
 
         HashCacheKey(List<Backend> backends) {
             this.bes = backends;
-            this.beIds = backends.stream().map(b -> b.getId()).sorted().collect(Collectors.toList());
+            this.beHashKeys = backends.stream().map(b ->
+                            String.format("id: %d, host: %s, port: %d", b.getId(), b.getHost(), b.getHeartbeatPort()))
+                    .sorted()
+                    .collect(Collectors.toList());
         }
 
         @Override
@@ -98,20 +118,28 @@ public class FederationBackendPolicy {
             if (!(obj instanceof HashCacheKey)) {
                 return false;
             }
-            return Objects.equals(beIds, ((HashCacheKey) obj).beIds);
+            return Objects.equals(beHashKeys, ((HashCacheKey) obj).beHashKeys);
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(beIds);
+            return Objects.hash(beHashKeys);
         }
 
         @Override
         public String toString() {
-            return "HashCache{" + "beIds=" + beIds + '}';
+            return "HashCache{" + "beHashKeys=" + beHashKeys + '}';
         }
     }
 
+    public FederationBackendPolicy(NodeSelectionStrategy nodeSelectionStrategy) {
+        this.nodeSelectionStrategy = nodeSelectionStrategy;
+    }
+
+    public FederationBackendPolicy() {
+        this(NodeSelectionStrategy.RANDOM);
+    }
+
     public void init() throws UserException {
         if (!initialized) {
             init(Collections.emptyList());
@@ -152,6 +180,10 @@ public class FederationBackendPolicy {
         if (backends.isEmpty()) {
             throw new UserException("No available backends");
         }
+        for (Backend backend : backends) {
+            assignedWeightPerBackend.put(backend, 0L);
+        }
+
         backendMap.putAll(backends.stream().collect(Collectors.groupingBy(Backend::getHost)));
         try {
             consistentHash = consistentHashCache.get(new HashCacheKey(backends));
@@ -166,23 +198,260 @@ public class FederationBackendPolicy {
         return selectedBackend;
     }
 
-    public Backend getNextConsistentBe(TScanRangeLocations scanRangeLocations) {
-        return consistentHash.getNode(scanRangeLocations);
+    public Multimap<Backend, Split> computeScanRangeAssignment(List<Split> splits) throws UserException {
+        ListMultimap<Backend, Split> assignment = ArrayListMultimap.create();
+
+        List<Split> remainingSplits = null;
+
+        List<Backend> filteredNodes = new ArrayList<>();
+        for (List<Backend> backendList : backendMap.values()) {
+            filteredNodes.addAll(backendList);
+        }
+        ResettableRandomizedIterator<Backend> randomCandidates = new ResettableRandomizedIterator<>(filteredNodes);
+
+        boolean splitsToBeRedistributed = false;
+
+        // optimizedLocalScheduling enables prioritized assignment of splits to local nodes when splits contain
+        // locality information
+        if (Config.optimized_local_scheduling) {
+            remainingSplits = new ArrayList<>(splits.size());
+            for (int i = 0; i < splits.size(); ++i) {
+                Split split = splits.get(i);
+                if (split.isRemotelyAccessible() && (split.getHosts() != null && split.getHosts().length > 0)) {
+                    List<Backend> candidateNodes = selectExactNodes(backendMap, split.getHosts());
+
+                    Optional<Backend> chosenNode = candidateNodes.stream()
+                            .min(Comparator.comparingLong(ownerNode -> assignedWeightPerBackend.get(ownerNode)));
+
+                    if (chosenNode.isPresent()) {
+                        Backend selectedBackend = chosenNode.get();
+                        assignment.put(selectedBackend, split);
+                        assignedWeightPerBackend.put(selectedBackend,
+                                assignedWeightPerBackend.get(selectedBackend) + split.getSplitWeight().getRawValue());
+                        splitsToBeRedistributed = true;
+                        continue;
+                    }
+                }
+                remainingSplits.add(split);
+            }
+        } else {
+            remainingSplits = splits;
+        }
+
+        for (Split split : remainingSplits) {
+            List<Backend> candidateNodes;
+            if (!split.isRemotelyAccessible()) {
+                candidateNodes = selectExactNodes(backendMap, split.getHosts());
+            } else {
+                switch (nodeSelectionStrategy) {
+                    case RANDOM: {
+                        randomCandidates.reset();
+                        candidateNodes = selectNodes(Config.min_random_candidate_num, randomCandidates);
+                        break;
+                    }
+                    case CONSISTENT_HASHING: {
+                        candidateNodes = consistentHash.getNode(split, Config.min_consistent_hash_candidate_num);
+                        splitsToBeRedistributed = true;
+                        break;
+                    }
+                    default: {
+                        throw new RuntimeException();
+                    }
+                }
+            }
+            if (candidateNodes.isEmpty()) {
+                LOG.debug("No nodes available to schedule {}. Available nodes {}", split,
+                        filteredNodes);
+                throw new UserException(SystemInfoService.NO_SCAN_NODE_BACKEND_AVAILABLE_MSG);
+            }
+
+            Backend selectedBackend = chooseNodeForSplit(candidateNodes);
+            List<Backend> alternativeBackends = new ArrayList<>(candidateNodes);
+            alternativeBackends.remove(selectedBackend);
+            split.setAlternativeHosts(
+                    alternativeBackends.stream().map(each -> each.getHost()).collect(Collectors.toList()));
+            assignment.put(selectedBackend, split);
+            assignedWeightPerBackend.put(selectedBackend,
+                    assignedWeightPerBackend.get(selectedBackend) + split.getSplitWeight().getRawValue());
+        }
+
+        if (splitsToBeRedistributed) {
+            equateDistribution(assignment);
+        }
+        return assignment;
     }
 
-    // Try to find a local BE, if not exists, use `getNextBe` instead
-    public Backend getNextLocalBe(List<String> hosts, TScanRangeLocations scanRangeLocations) {
-        List<Backend> candidateBackends = Lists.newArrayListWithCapacity(hosts.size());
+    /**
+     * The method tries to make the distribution of splits more uniform. All nodes are arranged into a maxHeap and
+     * a minHeap based on the number of splits that are assigned to them. Splits are redistributed, one at a time,
+     * from a maxNode to a minNode until we have as uniform a distribution as possible.
+     *
+     * @param assignment the node-splits multimap after the first and the second stage
+     */
+    private void equateDistribution(ListMultimap<Backend, Split> assignment) {
+        if (assignment.isEmpty()) {
+            return;
+        }
+
+        List<Backend> allNodes = new ArrayList<>();
+        for (List<Backend> backendList : backendMap.values()) {
+            allNodes.addAll(backendList);
+        }
+        Collections.sort(allNodes, Comparator.comparing(Backend::getId));
+
+        if (allNodes.size() < 2) {
+            return;
+        }
+
+        IndexedPriorityQueue<Backend> maxNodes = new IndexedPriorityQueue<>();
+        for (Backend node : allNodes) {
+            maxNodes.addOrUpdate(node, assignedWeightPerBackend.get(node));
+        }
+
+        IndexedPriorityQueue<Backend> minNodes = new IndexedPriorityQueue<>();
+        for (Backend node : allNodes) {
+            minNodes.addOrUpdate(node, Long.MAX_VALUE - assignedWeightPerBackend.get(node));
+        }
+
+        while (true) {
+            if (maxNodes.isEmpty()) {
+                return;
+            }
+
+            // fetch min and max node
+            Backend maxNode = maxNodes.poll();
+            Backend minNode = minNodes.poll();
+
+            // Allow some degree of non-uniformity when assigning splits to nodes. Usually data distribution
+            // among nodes in a cluster won't be fully uniform (e.g. because a hash function with a non-uniform
+            // distribution is used, like consistent hashing). In such cases it makes sense to assign splits to
+            // nodes with data because of potential savings in network throughput and CPU time.
+            // The allowed gap (Config.max_split_num_variance standard splits) between the nodes with maximum and
+            // minimum weight is a tradeoff between the ratio of misassigned splits and assignment uniformity.
+            // Using larger values doesn't reduce the number of misassigned splits greatly (in absolute values).
+            if (assignedWeightPerBackend.get(maxNode) - assignedWeightPerBackend.get(minNode)
+                    <= SplitWeight.rawValueForStandardSplitCount(Config.max_split_num_variance)) {
+                return;
+            }
+
+            // move split from max to min
+            Split redistributedSplit = redistributeSplit(assignment, maxNode, minNode);
+
+            assignedWeightPerBackend.put(maxNode,
+                    assignedWeightPerBackend.get(maxNode) - redistributedSplit.getSplitWeight().getRawValue());
+            assignedWeightPerBackend.put(minNode, Math.addExact(
+                    assignedWeightPerBackend.get(minNode), redistributedSplit.getSplitWeight().getRawValue()));
+
+            // add max back into maxNodes only if it still has assignments
+            if (assignment.containsKey(maxNode)) {
+                maxNodes.addOrUpdate(maxNode, assignedWeightPerBackend.get(maxNode));
+            }
+
+            // Add or update both the priority queues with the updated node priorities
+            maxNodes.addOrUpdate(minNode, assignedWeightPerBackend.get(minNode));
+            minNodes.addOrUpdate(minNode, Long.MAX_VALUE - assignedWeightPerBackend.get(minNode));
+            minNodes.addOrUpdate(maxNode, Long.MAX_VALUE - assignedWeightPerBackend.get(maxNode));
+        }
+    }
+
+    /**
+     * The method selects and removes a split from the fromNode and assigns it to the toNode. There is an attempt to
+     * redistribute a non-local split if possible. This case is possible when there are multiple queries running
+     * simultaneously. If a non-local split cannot be found in the maxNode, the next split is selected and reassigned.
+     */
+    @VisibleForTesting
+    public static Split redistributeSplit(Multimap<Backend, Split> assignment, Backend fromNode,
+            Backend toNode) {
+        Iterator<Split> splitIterator = assignment.get(fromNode).iterator();
+        Split splitToBeRedistributed = null;
+        while (splitIterator.hasNext()) {
+            Split split = splitIterator.next();
+            // Try to select a non-local split for redistribution
+            if (split.getHosts() != null && !isSplitLocal(split.getHosts(), fromNode.getHost())) {
+                splitToBeRedistributed = split;
+                break;
+            }
+        }
+        // Select any split if maxNode has no non-local splits in the current batch of assignment
+        if (splitToBeRedistributed == null) {
+            splitIterator = assignment.get(fromNode).iterator();
+            while (splitIterator.hasNext()) {
+                splitToBeRedistributed = splitIterator.next();
+                break;
+            }
+        }
+        splitIterator.remove();
+        assignment.put(toNode, splitToBeRedistributed);
+        return splitToBeRedistributed;
+    }
+
+    private static boolean isSplitLocal(String[] splitHosts, String host) {
+        for (String splitHost : splitHosts) {
+            if (splitHost.equals(host)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private static boolean isSplitLocal(List<String> splitHosts, String host) {
+        for (String splitHost : splitHosts) {
+            if (splitHost.equals(host)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public static List<Backend> selectExactNodes(Map<String, List<Backend>> backendMap, String[] hosts) {
+        Set<Backend> chosen = new LinkedHashSet<>();
+
         for (String host : hosts) {
-            List<Backend> backends = backendMap.get(host);
-            if (CollectionUtils.isNotEmpty(backends)) {
-                candidateBackends.add(backends.get(random.nextInt(backends.size())));
+            if (backendMap.containsKey(host)) {
+                backendMap.get(host).stream()
+                        .forEach(chosen::add);
             }
         }
+        return ImmutableList.copyOf(chosen);
+    }
+
+    public static List<Backend> selectNodes(int limit, Iterator<Backend> candidates) {
+        Preconditions.checkArgument(limit > 0, "limit must be at least 1");
 
-        return CollectionUtils.isEmpty(candidateBackends)
-                    ? getNextConsistentBe(scanRangeLocations)
-                    : candidateBackends.get(random.nextInt(candidateBackends.size()));
+        List<Backend> selected = new ArrayList<>(limit);
+        while (selected.size() < limit && candidates.hasNext()) {
+            selected.add(candidates.next());
+        }
+
+        return selected;
+    }
+
+    private Backend chooseNodeForSplit(List<Backend> candidateNodes) {
+        Backend chosenNode = null;
+        long minWeight = Long.MAX_VALUE;
+
+        for (Backend node : candidateNodes) {
+            long queuedWeight = assignedWeightPerBackend.get(node);
+            if (queuedWeight <= minWeight) {
+                chosenNode = node;
+                minWeight = queuedWeight;
+            }
+        }
+
+        return chosenNode;
     }
 
     public int numBackends() {
@@ -200,15 +469,13 @@ public class FederationBackendPolicy {
         }
     }
 
-    private static class ScanRangeHash implements Funnel<TScanRangeLocations> {
+    private static class SplitHash implements Funnel<Split> {
         @Override
-        public void funnel(TScanRangeLocations scanRange, PrimitiveSink primitiveSink) {
-            Preconditions.checkState(scanRange.scan_range.isSetExtScanRange());
-            for (TFileRangeDesc desc : scanRange.scan_range.ext_scan_range.file_scan_range.ranges) {
-                primitiveSink.putBytes(desc.path.getBytes(StandardCharsets.UTF_8));
-                primitiveSink.putLong(desc.start_offset);
-                primitiveSink.putLong(desc.size);
-            }
+        public void funnel(Split split, PrimitiveSink primitiveSink) {
+            primitiveSink.putBytes(split.getPathString().getBytes(StandardCharsets.UTF_8));
+            primitiveSink.putLong(split.getStart());
+            primitiveSink.putLong(split.getLength());
         }
     }
 }
+
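
A hedged sketch of driving the new assignment path end to end (not part of this
patch; the split provider and consumer are hypothetical, only the policy calls
mirror the code above):

    FederationBackendPolicy policy =
            new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING);
    policy.init(); // selects alive backends and builds (or reuses) the cached hash ring
    List<Split> splits = listSplitsSomehow(); // hypothetical split provider
    Multimap<Backend, Split> assignment = policy.computeScanRangeAssignment(splits);
    for (Backend be : assignment.keySet()) {
        // Stage 1 placed local splits, stage 2 picked the least-loaded of the
        // hashed/random candidates, and equateDistribution() capped the skew.
        dispatch(be, assignment.get(be)); // hypothetical consumer
    }
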
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
index 71c731498a6..09a8a116f5a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
@@ -68,13 +68,14 @@ import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
 import lombok.Getter;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.net.URI;
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -314,76 +315,77 @@ public abstract class FileQueryScanNode extends FileScanNode {
             params.setProperties(locationProperties);
         }
 
-        boolean enableShortCircuitRead = HdfsResource.enableShortCircuitRead(locationProperties);
         List<String> pathPartitionKeys = getPathPartitionKeys();
-        for (Split split : inputSplits) {
-            FileSplit fileSplit = (FileSplit) split;
-            TFileType locationType;
-            if (fileSplit instanceof IcebergSplit
-                    && ((IcebergSplit) fileSplit).getConfig().containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) {
-                locationType = TFileType.FILE_BROKER;
-            } else {
-                locationType = getLocationType(fileSplit.getPath().toString());
-            }
-
-            TScanRangeLocations curLocations = newLocations();
-            // If fileSplit has partition values, use the values collected from hive partitions.
-            // Otherwise, use the values in file path.
-            boolean isACID = false;
-            if (fileSplit instanceof HiveSplit) {
-                HiveSplit hiveSplit = (HiveSplit) split;
-                isACID = hiveSplit.isACID();
-            }
-            List<String> partitionValuesFromPath = fileSplit.getPartitionValues() == null
-                    ? BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), pathPartitionKeys, false, isACID)
-                    : fileSplit.getPartitionValues();
-
-            TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys,
-                    locationType);
-            TFileCompressType fileCompressType = getFileCompressType(fileSplit);
-            rangeDesc.setCompressType(fileCompressType);
-            if (isACID) {
-                HiveSplit hiveSplit = (HiveSplit) split;
-                hiveSplit.setTableFormatType(TableFormatType.TRANSACTIONAL_HIVE);
-                TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
-                tableFormatFileDesc.setTableFormatType(hiveSplit.getTableFormatType().value());
-                AcidInfo acidInfo = (AcidInfo) hiveSplit.getInfo();
-                TTransactionalHiveDesc transactionalHiveDesc = new TTransactionalHiveDesc();
-                transactionalHiveDesc.setPartition(acidInfo.getPartitionLocation());
-                List<TTransactionalHiveDeleteDeltaDesc> deleteDeltaDescs = new ArrayList<>();
-                for (DeleteDeltaInfo deleteDeltaInfo : acidInfo.getDeleteDeltas()) {
-                    TTransactionalHiveDeleteDeltaDesc deleteDeltaDesc = new TTransactionalHiveDeleteDeltaDesc();
-                    deleteDeltaDesc.setDirectoryLocation(deleteDeltaInfo.getDirectoryLocation());
-                    deleteDeltaDesc.setFileNames(deleteDeltaInfo.getFileNames());
-                    deleteDeltaDescs.add(deleteDeltaDesc);
+        Multimap<Backend, Split> assignment = backendPolicy.computeScanRangeAssignment(inputSplits);
+        for (Backend backend : assignment.keySet()) {
+            Collection<Split> splits = assignment.get(backend);
+            for (Split split : splits) {
+                FileSplit fileSplit = (FileSplit) split;
+                TFileType locationType;
+                if (fileSplit instanceof IcebergSplit
+                        && ((IcebergSplit) fileSplit).getConfig().containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) {
+                    locationType = TFileType.FILE_BROKER;
+                } else {
+                    locationType = getLocationType(fileSplit.getPath().toString());
+                }
+
+                TScanRangeLocations curLocations = newLocations();
+                // If fileSplit has partition values, use the values collected from hive partitions.
+                // Otherwise, use the values in file path.
+                boolean isACID = false;
+                if (fileSplit instanceof HiveSplit) {
+                    HiveSplit hiveSplit = (HiveSplit) fileSplit;
+                    isACID = hiveSplit.isACID();
+                }
+                List<String> partitionValuesFromPath = fileSplit.getPartitionValues() == null
+                        ? BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), pathPartitionKeys,
+                        false, isACID) : fileSplit.getPartitionValues();
+
+                TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys,
+                        locationType);
+                TFileCompressType fileCompressType = getFileCompressType(fileSplit);
+                rangeDesc.setCompressType(fileCompressType);
+                if (isACID) {
+                    HiveSplit hiveSplit = (HiveSplit) fileSplit;
+                    hiveSplit.setTableFormatType(TableFormatType.TRANSACTIONAL_HIVE);
+                    TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
+                    tableFormatFileDesc.setTableFormatType(hiveSplit.getTableFormatType().value());
+                    AcidInfo acidInfo = (AcidInfo) hiveSplit.getInfo();
+                    TTransactionalHiveDesc transactionalHiveDesc = new TTransactionalHiveDesc();
+                    transactionalHiveDesc.setPartition(acidInfo.getPartitionLocation());
+                    List<TTransactionalHiveDeleteDeltaDesc> deleteDeltaDescs = new ArrayList<>();
+                    for (DeleteDeltaInfo deleteDeltaInfo : acidInfo.getDeleteDeltas()) {
+                        TTransactionalHiveDeleteDeltaDesc deleteDeltaDesc = new TTransactionalHiveDeleteDeltaDesc();
+                        deleteDeltaDesc.setDirectoryLocation(deleteDeltaInfo.getDirectoryLocation());
+                        deleteDeltaDesc.setFileNames(deleteDeltaInfo.getFileNames());
+                        deleteDeltaDescs.add(deleteDeltaDesc);
+                    }
+                    transactionalHiveDesc.setDeleteDeltas(deleteDeltaDescs);
+                    tableFormatFileDesc.setTransactionalHiveParams(transactionalHiveDesc);
+                    rangeDesc.setTableFormatParams(tableFormatFileDesc);
                 }
-                transactionalHiveDesc.setDeleteDeltas(deleteDeltaDescs);
-                tableFormatFileDesc.setTransactionalHiveParams(transactionalHiveDesc);
-                rangeDesc.setTableFormatParams(tableFormatFileDesc);
-            }
 
-            setScanParams(rangeDesc, fileSplit);
-
-            curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
-            TScanRangeLocation location = new TScanRangeLocation();
-            Backend selectedBackend;
-            if (enableShortCircuitRead) {
-                // Try to find a local BE if enable hdfs short circuit read
-                selectedBackend = backendPolicy.getNextLocalBe(Arrays.asList(fileSplit.getHosts()), curLocations);
-            } else {
-                // Use consistent hash to assign the same scan range into the same backend among different queries
-                selectedBackend = backendPolicy.getNextConsistentBe(curLocations);
+                setScanParams(rangeDesc, fileSplit);
+
+                curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
+                TScanRangeLocation location = new TScanRangeLocation();
+                setLocationPropertiesIfNecessary(backend, locationType, locationProperties);
+                location.setBackendId(backend.getId());
+                location.setServer(new TNetworkAddress(backend.getHost(), backend.getBePort()));
+                curLocations.addToLocations(location);
+                LOG.debug("assign to backend {} with table split: {} ({}, {}), location: {}",
+                        curLocations.getLocations().get(0).getBackendId(), fileSplit.getPath(), fileSplit.getStart(),
+                        fileSplit.getLength(), Joiner.on("|").join(fileSplit.getHosts()));
+                scanRangeLocations.add(curLocations);
+                this.totalFileSize += fileSplit.getLength();
             }
-            setLocationPropertiesIfNecessary(selectedBackend, locationType, locationProperties);
-            location.setBackendId(selectedBackend.getId());
-            location.setServer(new TNetworkAddress(selectedBackend.getHost(), selectedBackend.getBePort()));
-            curLocations.addToLocations(location);
-            LOG.debug("assign to backend {} with table split: {} ({}, {}), location: {}",
-                    curLocations.getLocations().get(0).getBackendId(), fileSplit.getPath(), fileSplit.getStart(),
-                    fileSplit.getLength(), Joiner.on("|").join(fileSplit.getHosts()));
-            scanRangeLocations.add(curLocations);
-            this.totalFileSize += fileSplit.getLength();
         }
+
         if (ConnectContext.get().getExecutor() != null) {
             ConnectContext.get().getExecutor().getSummaryProfile().setCreateScanRangeFinishTime();
         }
@@ -506,3 +508,4 @@ public abstract class FileQueryScanNode extends FileScanNode {
 
     protected abstract Map<String, String> getLocationProperties() throws UserException;
 }
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java
index 4e9d0bae564..1221d3ad21b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java
@@ -42,6 +42,8 @@ public class FileSplit implements Split {
     // partitionValues would be ["part1", "part2"]
     protected List<String> partitionValues;
 
+    protected List<String> alternativeHosts;
+
     public FileSplit(Path path, long start, long length, long fileLength,
             long modificationTime, String[] hosts, List<String> 
partitionValues) {
         this.path = path;
@@ -67,6 +69,11 @@ public class FileSplit implements Split {
         return null;
     }
 
+    @Override
+    public String getPathString() {
+        return path.toString();
+    }
+
     public static class FileSplitCreator implements SplitCreator {
 
         public static final FileSplitCreator DEFAULT = new FileSplitCreator();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IndexedPriorityQueue.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IndexedPriorityQueue.java
new file mode 100644
index 00000000000..451279f1a99
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IndexedPriorityQueue.java
@@ -0,0 +1,228 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/resourcegroups/IndexedPriorityQueue.java
+// and modified by Doris
+
+package org.apache.doris.planner.external;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * A priority queue with constant time contains(E) and log time remove(E)
+ * Ties are broken by insertion order
+ */
+public final class IndexedPriorityQueue<E>
+        implements UpdateablePriorityQueue<E> {
+    private final Map<E, Entry<E>> index = new HashMap<>();
+    private final Set<Entry<E>> queue;
+    private long generation;
+
+    public IndexedPriorityQueue() {
+        this(PriorityOrdering.HIGH_TO_LOW);
+    }
+
+    public IndexedPriorityQueue(PriorityOrdering priorityOrdering) {
+        switch (priorityOrdering) {
+            case LOW_TO_HIGH:
+                queue = new TreeSet<>(
+                        Comparator.comparingLong((Entry<E> entry) -> entry.getPriority())
+                                .thenComparingLong(Entry::getGeneration));
+                break;
+            case HIGH_TO_LOW:
+                queue = new TreeSet<>((entry1, entry2) -> {
+                    int priorityComparison = Long.compare(entry2.getPriority(), entry1.getPriority());
+                    if (priorityComparison != 0) {
+                        return priorityComparison;
+                    }
+                    return Long.compare(entry1.getGeneration(), entry2.getGeneration());
+                });
+                break;
+            default:
+                throw new IllegalArgumentException();
+        }
+    }
+
+    @Override
+    public boolean addOrUpdate(E element, long priority) {
+        Entry<E> entry = index.get(element);
+        if (entry != null) {
+            if (entry.getPriority() == priority) {
+                return false;
+            }
+            queue.remove(entry);
+            Entry<E> newEntry = new Entry<>(element, priority, entry.getGeneration());
+            queue.add(newEntry);
+            index.put(element, newEntry);
+            return false;
+        }
+        Entry<E> newEntry = new Entry<>(element, priority, generation);
+        generation++;
+        queue.add(newEntry);
+        index.put(element, newEntry);
+        return true;
+    }
+
+    @Override
+    public boolean contains(E element) {
+        return index.containsKey(element);
+    }
+
+    @Override
+    public boolean remove(E element) {
+        Entry<E> entry = index.remove(element);
+        if (entry != null) {
+            queue.remove(entry);
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public E poll() {
+        Entry<E> entry = pollEntry();
+        if (entry == null) {
+            return null;
+        }
+        return entry.getValue();
+    }
+
+    @Override
+    public E peek() {
+        Entry<E> entry = peekEntry();
+        if (entry == null) {
+            return null;
+        }
+        return entry.getValue();
+    }
+
+    @Override
+    public int size() {
+        return queue.size();
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return queue.isEmpty();
+    }
+
+    public Prioritized<E> getPrioritized(E element) {
+        Entry<E> entry = index.get(element);
+        if (entry == null) {
+            return null;
+        }
+
+        return new Prioritized<>(entry.getValue(), entry.getPriority());
+    }
+
+    public Prioritized<E> pollPrioritized() {
+        Entry<E> entry = pollEntry();
+        if (entry == null) {
+            return null;
+        }
+        return new Prioritized<>(entry.getValue(), entry.getPriority());
+    }
+
+    private Entry<E> pollEntry() {
+        Iterator<Entry<E>> iterator = queue.iterator();
+        if (!iterator.hasNext()) {
+            return null;
+        }
+        Entry<E> entry = iterator.next();
+        iterator.remove();
+        Preconditions.checkState(index.remove(entry.getValue()) != null, "Failed to remove entry from index");
+        return entry;
+    }
+
+    public Prioritized<E> peekPrioritized() {
+        Entry<E> entry = peekEntry();
+        if (entry == null) {
+            return null;
+        }
+        return new Prioritized<>(entry.getValue(), entry.getPriority());
+    }
+
+    public Entry<E> peekEntry() {
+        Iterator<Entry<E>> iterator = queue.iterator();
+        if (!iterator.hasNext()) {
+            return null;
+        }
+        return iterator.next();
+    }
+
+    @Override
+    public Iterator<E> iterator() {
+        return Iterators.transform(queue.iterator(), Entry::getValue);
+    }
+
+    public enum PriorityOrdering {
+        LOW_TO_HIGH,
+        HIGH_TO_LOW
+    }
+
+    private static final class Entry<E> {
+        private final E value;
+        private final long priority;
+        private final long generation;
+
+        private Entry(E value, long priority, long generation) {
+            this.value = Objects.requireNonNull(value, "value is null");
+            this.priority = priority;
+            this.generation = generation;
+        }
+
+        public E getValue() {
+            return value;
+        }
+
+        public long getPriority() {
+            return priority;
+        }
+
+        public long getGeneration() {
+            return generation;
+        }
+    }
+
+    public static class Prioritized<V> {
+        private final V value;
+        private final long priority;
+
+        public Prioritized(V value, long priority) {
+            this.value = Objects.requireNonNull(value, "value is null");
+            this.priority = priority;
+        }
+
+        public V getValue() {
+            return value;
+        }
+
+        public long getPriority() {
+            return priority;
+        }
+    }
+}
+
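
A minimal illustration (not from this patch) of the queue semantics that
equateDistribution() relies on: HIGH_TO_LOW ordering, insertion-order
tie-breaking, and O(log n) in-place priority updates:

    IndexedPriorityQueue<String> maxNodes = new IndexedPriorityQueue<>();
    maxNodes.addOrUpdate("be-1", 300L); // weight 300
    maxNodes.addOrUpdate("be-2", 100L);
    maxNodes.addOrUpdate("be-2", 100L); // same priority: no-op, returns false
    maxNodes.addOrUpdate("be-1", 150L); // re-prioritized, generation preserved
    String heaviest = maxNodes.poll();  // "be-1" (150 > 100)
    boolean stillThere = maxNodes.contains("be-1"); // false: poll() removed it
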
diff --git a/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/NodeSelectionStrategy.java
similarity index 83%
copy from fe/fe-core/src/main/java/org/apache/doris/spi/Split.java
copy to fe/fe-core/src/main/java/org/apache/doris/planner/external/NodeSelectionStrategy.java
index 31b1e1515a5..616dd5a9be0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/NodeSelectionStrategy.java
@@ -15,15 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.spi;
-
-/**
- * Split interface. e.g. Tablet for Olap Table.
- */
-public interface Split {
-
-    String[] getHosts();
-
-    Object getInfo();
+package org.apache.doris.planner.external;
 
+public enum NodeSelectionStrategy {
+    RANDOM,
+    CONSISTENT_HASHING
 }
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/Queue.java
similarity index 68%
copy from fe/fe-core/src/main/java/org/apache/doris/spi/Split.java
copy to fe/fe-core/src/main/java/org/apache/doris/planner/external/Queue.java
index 31b1e1515a5..6e0597c821f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/Queue.java
@@ -14,16 +14,23 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
+// This file is copied from
+// https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/resourcegroups/Queue.java
+// and modified by Doris
 
-package org.apache.doris.spi;
+package org.apache.doris.planner.external;
 
-/**
- * Split interface. e.g. Tablet for Olap Table.
- */
-public interface Split {
+interface Queue<E> {
+    boolean contains(E element);
 
-    String[] getHosts();
+    boolean remove(E element);
 
-    Object getInfo();
+    E poll();
 
+    E peek();
+
+    int size();
+
+    boolean isEmpty();
 }
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ResettableRandomizedIterator.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ResettableRandomizedIterator.java
new file mode 100644
index 00000000000..feb8b6ae836
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ResettableRandomizedIterator.java
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/scheduler/ResettableRandomizedIterator.java
+// and modified by Doris
+
+package org.apache.doris.planner.external;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ThreadLocalRandom;
+
+public class ResettableRandomizedIterator<T>
+        implements Iterator<T> {
+    private final List<T> list;
+    private int position;
+
+    public ResettableRandomizedIterator(Collection<T> elements) {
+        this.list = new ArrayList<>(elements);
+    }
+
+    public void reset() {
+        position = 0;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return position < list.size();
+    }
+
+    @Override
+    public T next() {
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
+
+        int position = ThreadLocalRandom.current().nextInt(this.position, list.size());
+
+        T result = list.set(position, list.get(this.position));
+        list.set(this.position, result);
+        this.position++;
+
+        return result;
+    }
+}
+
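
The iterator above is an incremental Fisher-Yates shuffle: each next() swaps a
uniformly chosen unseen element into the cursor position, and reset() merely
rewinds the cursor without re-copying the backing list. A hedged usage sketch
(not from this patch):

    ResettableRandomizedIterator<Integer> it =
            new ResettableRandomizedIterator<>(ImmutableList.of(1, 2, 3, 4));
    List<Integer> firstPass = new ArrayList<>();
    it.forEachRemaining(firstPass::add); // some permutation of [1, 2, 3, 4]
    it.reset();                          // O(1); the next pass re-randomizes
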
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/SplitWeight.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/SplitWeight.java
new file mode 100644
index 00000000000..d8724017b67
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/SplitWeight.java
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/trinodb/trino/blob/master/core/trino-spi/src/main/java/io/trino/spi/SplitWeight.java
+// and modified by Doris
+
+package org.apache.doris.planner.external;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonValue;
+import com.google.errorprone.annotations.DoNotCall;
+
+import java.math.BigDecimal;
+import java.util.Collection;
+import java.util.function.Function;
+
+public final class SplitWeight {
+    private static final long UNIT_VALUE = 100;
+    private static final int UNIT_SCALE = 2; // Decimal scale such that (10 ^ UNIT_SCALE) == UNIT_VALUE
+    private static final SplitWeight STANDARD_WEIGHT = new SplitWeight(UNIT_VALUE);
+
+    private final long value;
+
+    private SplitWeight(long value) {
+        if (value <= 0) {
+            throw new IllegalArgumentException("value must be > 0, found: " + 
value);
+        }
+        this.value = value;
+    }
+
+    /**
+     * Produces a {@link SplitWeight} from the raw internal value representation. This method is intended
+     * primarily for JSON deserialization, and connectors should not call this factory method directly
+     * to construct {@link SplitWeight} instances. Instead, connectors should use
+     * {@link SplitWeight#fromProportion(double)}
+     * to avoid breakages that could arise if {@link SplitWeight#UNIT_VALUE} changes in the future.
+     */
+    @JsonCreator
+    @DoNotCall // For JSON serialization only
+    public static SplitWeight fromRawValue(long value) {
+        return fromRawValueInternal(value);
+    }
+
+    /**
+     * Produces a {@link SplitWeight} that corresponds to the {@link SplitWeight#standard()} weight
+     * proportionally, i.e., a parameter of <code>1.0</code> will be equivalent to the standard weight
+     * and a value of <code>0.5</code> will be 1/2 of the standard split weight. Valid arguments
+     * must be greater than zero and finite. Connectors should prefer constructing split weights
+     * using this factory method rather than passing a raw integer value in case the integer
+     * representation of a standard split needs to change in the future.
+     *
+     * @param weight the proportional weight relative to a standard split, expressed as a double
+     * @return a {@link SplitWeight} with a raw value corresponding to the requested proportion
+     */
+    public static SplitWeight fromProportion(double weight) {
+        if (weight <= 0 || !Double.isFinite(weight)) {
+            throw new IllegalArgumentException("Invalid weight: " + weight);
+        }
+        // Must round up to avoid small weights rounding to 0
+        return fromRawValueInternal((long) Math.ceil(weight * UNIT_VALUE));
+    }
+
+    private static SplitWeight fromRawValueInternal(long value) {
+        return value == UNIT_VALUE ? STANDARD_WEIGHT : new SplitWeight(value);
+    }
+
+    public static SplitWeight standard() {
+        return STANDARD_WEIGHT;
+    }
+
+    public static long rawValueForStandardSplitCount(int splitCount) {
+        if (splitCount < 0) {
+            throw new IllegalArgumentException("splitCount must be >= 0, 
found: " + splitCount);
+        }
+        return Math.multiplyExact(splitCount, UNIT_VALUE);
+    }
+
+    public static <T> long rawValueSum(Collection<T> collection, Function<T, SplitWeight> getter) {
+        long sum = 0;
+        for (T item : collection) {
+            long value = getter.apply(item).getRawValue();
+            sum = Math.addExact(sum, value);
+        }
+        return sum;
+    }
+
+    /**
+     * @return The internal integer representation for this weight value
+     */
+    @JsonValue
+    public long getRawValue() {
+        return value;
+    }
+
+    @Override
+    public int hashCode() {
+        return Long.hashCode(value);
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (!(other instanceof SplitWeight)) {
+            return false;
+        }
+        return this.value == ((SplitWeight) other).value;
+    }
+
+    @Override
+    public String toString() {
+        if (value == UNIT_VALUE) {
+            return "1";
+        }
+        return BigDecimal.valueOf(value, UNIT_SCALE).stripTrailingZeros().toPlainString();
+    }
+}
+
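(A quick sketch, for reviewers, of how the new SplitWeight factory methods compose. The class
below is illustrative only and not part of this patch; the printed values follow from
UNIT_VALUE == 100 and the round-up in fromProportion().)

    import org.apache.doris.planner.external.SplitWeight;

    import java.util.Arrays;
    import java.util.function.Function;

    class SplitWeightSketch {
        public static void main(String[] args) {
            // 1/4 of a standard split; ceil(0.25 * 100) gives raw value 25
            SplitWeight quarter = SplitWeight.fromProportion(0.25);
            // Sum raw values via the helper; identity getter since the items are already weights
            long total = SplitWeight.rawValueSum(
                    Arrays.asList(quarter, SplitWeight.standard()), Function.identity());
            System.out.println(quarter); // prints "0.25"
            System.out.println(total);   // prints 125 (25 + 100)
        }
    }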
diff --git a/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/UpdateablePriorityQueue.java
similarity index 69%
copy from fe/fe-core/src/main/java/org/apache/doris/spi/Split.java
copy to fe/fe-core/src/main/java/org/apache/doris/planner/external/UpdateablePriorityQueue.java
index 31b1e1515a5..97441cbd878 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/UpdateablePriorityQueue.java
@@ -14,16 +14,14 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
+// This file is copied from
+// https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/resourcegroups/IndexedPriorityQueue.java
+// and modified by Doris
 
-package org.apache.doris.spi;
-
-/**
- * Split interface. e.g. Tablet for Olap Table.
- */
-public interface Split {
-
-    String[] getHosts();
-
-    Object getInfo();
+package org.apache.doris.planner.external;
 
+interface UpdateablePriorityQueue<E>
+        extends Queue<E>, Iterable<E> {
+    boolean addOrUpdate(E element, long priority);
 }
+
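(The addOrUpdate contract, sketched here with JDK types only. This naive version is purely
illustrative; the IndexedPriorityQueue added elsewhere in this patch is the real implementation
and avoids the O(n) heap scan below via indexed lookups.)

    import java.util.HashMap;
    import java.util.Map;
    import java.util.PriorityQueue;

    class NaiveUpdateableQueue<E> {
        private final Map<E, Long> priorities = new HashMap<>();
        // Max-heap ordered by the current priority of each element
        private final PriorityQueue<E> heap =
                new PriorityQueue<>((a, b) -> Long.compare(priorities.get(b), priorities.get(a)));

        // Returns true if the element was newly inserted, false if only its priority changed.
        boolean addOrUpdate(E element, long priority) {
            boolean added = priorities.put(element, priority) == null;
            heap.remove(element); // O(n) equals-based scan; the indexed version avoids this
            heap.add(element);    // re-insert under the (possibly new) priority
            return added;
        }

        // Removes and returns the highest-priority element, or null if empty.
        E poll() {
            E e = heap.poll();
            if (e != null) {
                priorities.remove(e);
            }
            return e;
        }
    }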
diff --git a/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java b/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java
index 31b1e1515a5..d42defe5c38 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java
@@ -17,6 +17,10 @@
 
 package org.apache.doris.spi;
 
+import org.apache.doris.planner.external.SplitWeight;
+
+import java.util.List;
+
 /**
  * Split interface. e.g. Tablet for Olap Table.
  */
@@ -26,4 +30,23 @@ public interface Split {
 
     Object getInfo();
 
+    default SplitWeight getSplitWeight() {
+        return SplitWeight.standard();
+    }
+
+    default boolean isRemotelyAccessible() {
+        return true;
+    }
+
+    String getPathString();
+
+    long getStart();
+
+    long getLength();
+
+    List<String> getAlternativeHosts();
+
+    void setAlternativeHosts(List<String> alternativeHosts);
+
 }
+
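(A minimal implementer of the extended interface, for illustration only; FileSplit elsewhere in
this patch is the real one. The getSplitWeight() and isRemotelyAccessible() defaults are
inherited, so only the abstract methods need bodies.)

    import org.apache.doris.spi.Split;

    import java.util.Collections;
    import java.util.List;

    class ExampleSplit implements Split {
        private final String path;
        private final long start;
        private final long length;
        private List<String> alternativeHosts = Collections.emptyList();

        ExampleSplit(String path, long start, long length) {
            this.path = path;
            this.start = start;
            this.length = length;
        }

        @Override public String[] getHosts() { return new String[0]; }
        @Override public Object getInfo() { return path; }
        @Override public String getPathString() { return path; }
        @Override public long getStart() { return start; }
        @Override public long getLength() { return length; }
        @Override public List<String> getAlternativeHosts() { return alternativeHosts; }
        @Override public void setAlternativeHosts(List<String> hosts) { this.alternativeHosts = hosts; }
        // getSplitWeight() returns SplitWeight.standard(); isRemotelyAccessible() returns true
    }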
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
index ef65d1b6165..a887b3e590a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
@@ -20,6 +20,10 @@ package org.apache.doris.planner;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.UserException;
 import org.apache.doris.planner.external.FederationBackendPolicy;
+import org.apache.doris.planner.external.FileSplit;
+import org.apache.doris.planner.external.NodeSelectionStrategy;
+import org.apache.doris.planner.external.SplitWeight;
+import org.apache.doris.spi.Split;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.TExternalScanRange;
@@ -28,42 +32,49 @@ import org.apache.doris.thrift.TFileScanRange;
 import org.apache.doris.thrift.TScanRange;
 import org.apache.doris.thrift.TScanRangeLocations;
 
-import com.google.common.base.Stopwatch;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Multimap;
 import mockit.Mock;
 import mockit.MockUp;
-import mockit.Mocked;
-import org.junit.Before;
+import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
 import org.junit.Test;
 import org.junit.jupiter.api.Assertions;
 
-import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
 
 public class FederationBackendPolicyTest {
-    @Mocked
-    private Env env;
+    private static <K, V> boolean areMultimapsEqualIgnoringOrder(
+            Multimap<K, V> multimap1, Multimap<K, V> multimap2) {
+        Collection<Map.Entry<K, V>> entries1 = multimap1.entries();
+        Collection<Map.Entry<K, V>> entries2 = multimap2.entries();
 
-    @Before
-    public void setUp() {
+        return entries1.containsAll(entries2) && entries2.containsAll(entries1);
+    }
 
+    @Test
+    public void testComputeScanRangeAssignmentRemote() throws UserException {
         SystemInfoService service = new SystemInfoService();
 
-        for (int i = 0; i < 190; i++) {
-            Backend backend = new Backend(Long.valueOf(i), "192.168.1." + i, 9050);
-            backend.setAlive(true);
-            service.addBackend(backend);
-        }
-        for (int i = 0; i < 10; i++) {
-            Backend backend = new Backend(Long.valueOf(190 + i), "192.168.1." + i, 9051);
-            backend.setAlive(true);
-            service.addBackend(backend);
-        }
-        for (int i = 0; i < 10; i++) {
-            Backend backend = new Backend(Long.valueOf(200 + i), "192.168.2." + i, 9050);
-            backend.setAlive(false);
-            service.addBackend(backend);
-        }
+        Backend backend1 = new Backend(10002L, "172.30.0.100", 9050);
+        backend1.setAlive(true);
+        service.addBackend(backend1);
+        Backend backend2 = new Backend(10003L, "172.30.0.106", 9050);
+        backend2.setAlive(true);
+        service.addBackend(backend2);
+        Backend backend3 = new Backend(10004L, "172.30.0.118", 9050);
+        backend3.setAlive(true);
+        service.addBackend(backend3);
 
         new MockUp<Env>() {
             @Mock
@@ -72,56 +83,644 @@ public class FederationBackendPolicyTest {
             }
         };
 
-    }
+        List<Split> splits = new ArrayList<>();
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00000-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 112140970, 112140970, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00001-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 120839661, 120839661, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00002-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 108897409, 108897409, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00003-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 95795997, 95795997, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00004-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 104600402, 104600402, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00004-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 104600402, 104600402, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00004-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 104600402, 104600402, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00005-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 105664025, 105664025, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00006-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 103711014, 103711014, 0, null, Collections.emptyList()));
 
-    @Test
-    public void testGetNextBe() throws UserException {
         FederationBackendPolicy policy = new FederationBackendPolicy();
         policy.init();
-        int backendNum = 200;
-        int invokeTimes = 1000000;
+        int backendNum = 3;
         Assertions.assertEquals(policy.numBackends(), backendNum);
-        Stopwatch sw = Stopwatch.createStarted();
-        for (int i = 0; i < invokeTimes; i++) {
-            Assertions.assertFalse(policy.getNextBe().getHost().contains("192.168.2."));
+
+        Multimap<Backend, Split> assignment = policy.computeScanRangeAssignment(splits);
+
+        for (Backend backend : assignment.keySet()) {
+            Collection<Split> assignedSplits = assignment.get(backend);
+            long scanBytes = 0L;
+            for (Split split : assignedSplits) {
+                FileSplit fileSplit = (FileSplit) split;
+                scanBytes += fileSplit.getLength();
+            }
+            System.out.printf("%s -> %d splits, %d bytes\n", backend, assignedSplits.size(), scanBytes);
+        }
+        // int avg = (scanRangeNumber * scanRangeSize) / hostNumber;
+        // int variance = 5 * scanRangeSize;
+        Map<Backend, Long> stats = policy.getAssignedWeightPerBackend();
+        for (Map.Entry<Backend, Long> entry : stats.entrySet()) {
+            System.out.printf("weight: %s -> %d\n", entry.getKey(), entry.getValue());
+            // Assert.assertTrue(Math.abs(entry.getValue() - avg) < variance);
         }
-        sw.stop();
-        System.out.println("Invoke getNextBe() " + invokeTimes
-                    + " times cost [" + sw.elapsed(TimeUnit.MILLISECONDS) + "] ms");
+
     }
 
     @Test
-    public void testGetNextLocalBe() throws UserException {
-        FederationBackendPolicy policy = new FederationBackendPolicy();
+    public void testComputeScanRangeAssignmentConsistentHash() throws UserException {
+        SystemInfoService service = new SystemInfoService();
+
+        Backend backend1 = new Backend(10002L, "172.30.0.100", 9050);
+        backend1.setAlive(true);
+        service.addBackend(backend1);
+        Backend backend2 = new Backend(10003L, "172.30.0.106", 9050);
+        backend2.setAlive(true);
+        service.addBackend(backend2);
+        Backend backend3 = new Backend(10004L, "172.30.0.118", 9050);
+        backend3.setAlive(true);
+        service.addBackend(backend3);
+
+        new MockUp<Env>() {
+            @Mock
+            public SystemInfoService getCurrentSystemInfo() {
+                return service;
+            }
+        };
+
+        List<Split> splits = new ArrayList<>();
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00000-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 112140970, 112140970, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00001-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 120839661, 120839661, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00002-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 108897409, 108897409, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00003-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 95795997, 95795997, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00004-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 104600402, 104600402, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00004-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 104600402, 104600402, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00004-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 104600402, 104600402, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00005-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 105664025, 105664025, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00006-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 103711014, 103711014, 0, null, Collections.emptyList()));
+
+        FederationBackendPolicy policy = new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING);
         policy.init();
-        int backendNum = 200;
-        int invokeTimes = 1000000;
+        int backendNum = 3;
         Assertions.assertEquals(policy.numBackends(), backendNum);
-        List<String> localHosts = Arrays.asList("192.168.1.0", "192.168.1.1", "192.168.1.2");
-        TScanRangeLocations scanRangeLocations = getScanRangeLocations("path1", 0, 100);
-        Stopwatch sw = Stopwatch.createStarted();
-        for (int i = 0; i < invokeTimes; i++) {
-            Assertions.assertTrue(localHosts.contains(policy.getNextLocalBe(localHosts, scanRangeLocations).getHost()));
+
+        Multimap<Backend, Split> assignment = policy.computeScanRangeAssignment(splits);
+
+        for (Backend backend : assignment.keySet()) {
+            Collection<Split> assignedSplits = assignment.get(backend);
+            long scanBytes = 0L;
+            for (Split split : assignedSplits) {
+                FileSplit fileSplit = (FileSplit) split;
+                scanBytes += fileSplit.getLength();
+            }
+            System.out.printf("%s -> %d splits, %d bytes\n", backend, assignedSplits.size(), scanBytes);
         }
-        sw.stop();
-        System.out.println("Invoke getNextLocalBe() " + invokeTimes
-                    + " times cost [" + sw.elapsed(TimeUnit.MILLISECONDS) + "] ms");
+        // int avg = (scanRangeNumber * scanRangeSize) / hostNumber;
+        // int variance = 5 * scanRangeSize;
+        Map<Backend, Long> stats = policy.getAssignedWeightPerBackend();
+        for (Map.Entry<Backend, Long> entry : stats.entrySet()) {
+            System.out.printf("weight: %s -> %d\n", entry.getKey(), entry.getValue());
+            // Assert.assertTrue(Math.abs(entry.getValue() - avg) < variance);
+        }
+
     }
 
     @Test
-    public void testConsistentHash() throws UserException {
+    public void testComputeScanRangeAssignmentLocal() throws UserException {
+        SystemInfoService service = new SystemInfoService();
+
+        Backend backend1 = new Backend(10002L, "172.30.0.100", 9050);
+        backend1.setAlive(true);
+        service.addBackend(backend1);
+        Backend backend2 = new Backend(10003L, "172.30.0.106", 9050);
+        backend2.setAlive(true);
+        service.addBackend(backend2);
+        Backend backend3 = new Backend(10004L, "172.30.0.118", 9050);
+        backend3.setAlive(true);
+        service.addBackend(backend3);
+
+        new MockUp<Env>() {
+            @Mock
+            public SystemInfoService getCurrentSystemInfo() {
+                return service;
+            }
+        };
+
+        List<Split> splits = new ArrayList<>();
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00000-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 112140970, 112140970, 0, new String[] {"172.30.0.100"}, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00001-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 120839661, 120839661, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00002-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 108897409, 108897409, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00003-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 95795997, 95795997, 0, new String[] {"172.30.0.106"}, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00004-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 104600402, 104600402, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00004-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 104600402, 104600402, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00004-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 104600402, 104600402, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00005-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 105664025, 105664025, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00006-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 103711014, 103711014, 0, null, Collections.emptyList()));
+
         FederationBackendPolicy policy = new FederationBackendPolicy();
         policy.init();
-        int backendNum = 200;
+        int backendNum = 3;
         Assertions.assertEquals(policy.numBackends(), backendNum);
+        int totalSplitNum = 0;
+        List<Boolean> checkedLocalSplit = new ArrayList<>();
+        Multimap<Backend, Split> assignment = policy.computeScanRangeAssignment(splits);
+        for (Backend backend : assignment.keySet()) {
+            Collection<Split> assignedSplits = assignment.get(backend);
+            for (Split split : assignedSplits) {
+                FileSplit fileSplit = (FileSplit) split;
+                ++totalSplitNum;
+                if (fileSplit.getPath().equals(new Path(
+                        "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00000-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"))) {
+                    Assert.assertEquals("172.30.0.100", backend.getHost());
+                    checkedLocalSplit.add(true);
+                } else if (fileSplit.getPath().equals(new Path(
+                        "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00003-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"))) {
+                    Assert.assertEquals("172.30.0.106", backend.getHost());
+                    checkedLocalSplit.add(true);
+                }
+            }
+        }
+        Assert.assertEquals(2, checkedLocalSplit.size());
+        Assert.assertEquals(9, totalSplitNum);
+        // int avg = (scanRangeNumber * scanRangeSize) / hostNumber;
+        // int variance = 5 * scanRangeSize;
+        // Map<Backend, Long> stats = policy.getAssignedWeightPerBackend();
+        // for (Map.Entry<Backend, Long> entry : stats.entrySet()) {
+        //     System.out.printf("%s -> %d bytes\n", entry.getKey(), entry.getValue());
+        //     // Assert.assertTrue(Math.abs(entry.getValue() - avg) < variance);
+        // }
+
+    }
+
+    @Test
+    public void testComputeScanRangeAssigmentRandom() throws UserException {
+        SystemInfoService service = new SystemInfoService();
+        new MockUp<Env>() {
+            @Mock
+            public SystemInfoService getCurrentSystemInfo() {
+                return service;
+            }
+        };
+
+        Random random = new Random();
+        int backendNum = random.nextInt(100 - 1) + 1;
+
+        int minOctet3 = 0;
+        int maxOctet3 = 250;
+        int minOctet4 = 1;
+        int maxOctet4 = 250;
+        Set<Integer> backendIds = new HashSet<>();
+        Set<String> ipAddresses = new HashSet<>();
+        for (int i = 0; i < backendNum; i++) {
+            String ipAddress;
+            do {
+                int octet3 = random.nextInt((maxOctet3 - minOctet3) + 1) + minOctet3;
+                int octet4 = random.nextInt((maxOctet4 - minOctet4) + 1) + minOctet4;
+                ipAddress = 192 + "." + 168 + "." + octet3 + "." + octet4;
+            } while (!ipAddresses.add(ipAddress));
+
+            int backendId;
+            do {
+                backendId = random.nextInt(90000) + 10000;
+            } while (!backendIds.add(backendId));
+
+            Backend backend = new Backend(backendId, ipAddress, 9050);
+            backend.setAlive(true);
+            service.addBackend(backend);
+        }
+
+        List<TScanRangeLocations> tScanRangeLocationsList = new ArrayList<>();
+        List<Split> remoteSplits = new ArrayList<>();
+        int splitCount = random.nextInt(1000 - 100) + 100;
+        for (int i = 0; i < splitCount; ++i) {
+            long splitLength = random.nextInt(115343360 - 94371840) + 94371840;
+            FileSplit split = new FileSplit(new Path(
+                    "hdfs://HDFS00001/usr/hive/warehouse/test.db/test_table/" + UUID.randomUUID()),
+                    0, splitLength, splitLength, 0, null, Collections.emptyList());
+            remoteSplits.add(split);
+            tScanRangeLocationsList.add(
+                    getScanRangeLocations(split.getPath().toString(), split.getStart(), split.getLength()));
+        }
+
+        List<TScanRangeLocations> localScanRangeLocationsList = new ArrayList<>();
+        List<Split> localSplits = new ArrayList<>();
+        int localSplitCount = random.nextInt(1000 - 100) + 100;
+        Set<String> totalLocalHosts = new HashSet<>();
+        for (int i = 0; i < localSplitCount; ++i) {
+            int localHostNum = random.nextInt(3 - 1) + 1;
+            Set<String> localHosts = new HashSet<>();
+            String localHost;
+            for (int j = 0; j < localHostNum; ++j) {
+                do {
+                    localHost = service.getAllBackends().get(random.nextInt(service.getAllBackends().size())).getHost();
+                } while (!localHosts.add(localHost));
+                totalLocalHosts.add(localHost);
+            }
+            long localSplitLength = random.nextInt(115343360 - 94371840) + 94371840;
+            FileSplit split = new FileSplit(new Path(
+                    "hdfs://HDFS00001/usr/hive/warehouse/test.db/test_table/" + UUID.randomUUID()),
+                    0, localSplitLength, localSplitLength, 0, localHosts.toArray(new String[0]),
+                    Collections.emptyList());
+            localSplits.add(split);
+            localScanRangeLocationsList.add(
+                    getScanRangeLocations(split.getPath().toString(), split.getStart(), split.getLength()));
+        }
+
+        ListMultimap<Backend, Split> result = null;
+        for (int i = 0; i < 3; ++i) {
+            FederationBackendPolicy policy = new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING);
+            List<TScanRangeLocations> totalScanRangeLocationsList = new ArrayList<>();
+            totalScanRangeLocationsList.addAll(tScanRangeLocationsList);
+            totalScanRangeLocationsList.addAll(localScanRangeLocationsList);
+            // Collections.shuffle(totalScanRangeLocationsList);
+            policy.init();
+            Assertions.assertEquals(policy.numBackends(), backendNum);
+            int totalSplitNum = 0;
+            int minSplitNumPerNode = Integer.MAX_VALUE;
 
-        TScanRangeLocations scanRangeLocations = getScanRangeLocations("path1", 0, 100);
-        Assertions.assertEquals(39, policy.getNextConsistentBe(scanRangeLocations).getId());
+            List<Split> totalSplits = new ArrayList<>();
+            totalSplits.addAll(remoteSplits);
+            totalSplits.addAll(localSplits);
+            // Collections.shuffle(totalSplits);
+            Multimap<Backend, Split> assignment = policy.computeScanRangeAssignment(totalSplits);
+            if (i == 0) {
+                result = ArrayListMultimap.create(assignment);
+            } else {
+                Assertions.assertTrue(areMultimapsEqualIgnoringOrder(result, assignment));
 
-        scanRangeLocations = getScanRangeLocations("path2", 0, 100);
-        Assertions.assertEquals(78, policy.getNextConsistentBe(scanRangeLocations).getId());
+            }
+            for (Backend backend : assignment.keySet()) {
+                Collection<Split> splits = assignment.get(backend);
+                if (splits.size() < minSplitNumPerNode) {
+                    minSplitNumPerNode = splits.size();
+                }
+
+                long scanBytes = 0L;
+                for (Split split : splits) {
+                    FileSplit fileSplit = (FileSplit) split;
+                    scanBytes += fileSplit.getLength();
+                    ++totalSplitNum;
+                    if (fileSplit.getHosts() != null && fileSplit.getHosts().length > 0) {
+                        for (String host : fileSplit.getHosts()) {
+                            Assert.assertTrue(totalLocalHosts.contains(host));
+                        }
+                    }
+                }
+                System.out.printf("%s -> %d splits, %d bytes\n", backend, splits.size(), scanBytes);
+            }
+            Assert.assertEquals(totalSplits.size(), totalSplitNum);
+            // int avg = (scanRangeNumber * scanRangeSize) / hostNumber;
+            // int variance = 5 * scanRangeSize;
+            SplitWeight.rawValueForStandardSplitCount(1);
+            Map<Backend, Long> stats = policy.getAssignedWeightPerBackend();
+            long splitNumVariance = 1;
+            long splitWeightVariance = 5 * SplitWeight.rawValueForStandardSplitCount(1);
+            Long minSplitWeight = stats.values()
+                    .stream()
+                    .min(Long::compare).orElse(0L);
+            for (Map.Entry<Backend, Long> entry : stats.entrySet()) {
+                System.out.printf("weight: %s -> %d\n", entry.getKey(), entry.getValue());
+                Assert.assertTrue(Math.abs(entry.getValue() - minSplitWeight) <= splitWeightVariance);
+            }
+            for (Backend backend : assignment.keySet()) {
+                Collection<Split> splits = assignment.get(backend);
+                Assert.assertTrue(Math.abs(splits.size() - minSplitNumPerNode) <= splitNumVariance);
+            }
+        }
     }
 
+    @Test
+    public void testComputeScanRangeAssigmentNonAlive() throws UserException {
+        SystemInfoService service = new SystemInfoService();
+        new MockUp<Env>() {
+            @Mock
+            public SystemInfoService getCurrentSystemInfo() {
+                return service;
+            }
+        };
+
+        Random random = new Random();
+        int backendNum = random.nextInt(100 - 1) + 1;
+
+        int minOctet3 = 0;
+        int maxOctet3 = 250;
+        int minOctet4 = 1;
+        int maxOctet4 = 250;
+        Set<Integer> backendIds = new HashSet<>();
+        Set<String> ipAddresses = new HashSet<>();
+        int aliveBackendNum = 0;
+        for (int i = 0; i < backendNum; i++) {
+            String ipAddress;
+            do {
+                int octet3 = random.nextInt((maxOctet3 - minOctet3) + 1) + minOctet3;
+                int octet4 = random.nextInt((maxOctet4 - minOctet4) + 1) + minOctet4;
+                ipAddress = 192 + "." + 168 + "." + octet3 + "." + octet4;
+            } while (!ipAddresses.add(ipAddress));
+
+            int backendId;
+            do {
+                backendId = random.nextInt(90000) + 10000;
+            } while (!backendIds.add(backendId));
+
+            Backend backend = new Backend(backendId, ipAddress, 9050);
+            if (i % 2 == 0) {
+                ++aliveBackendNum;
+                backend.setAlive(true);
+            } else {
+                backend.setAlive(false);
+            }
+            service.addBackend(backend);
+        }
+
+        List<TScanRangeLocations> tScanRangeLocationsList = new ArrayList<>();
+        List<Split> remoteSplits = new ArrayList<>();
+        int splitCount = random.nextInt(1000 - 100) + 100;
+        for (int i = 0; i < splitCount; ++i) {
+            long splitLength = random.nextInt(115343360 - 94371840) + 94371840;
+            FileSplit split = new FileSplit(new Path(
+                    "hdfs://HDFS00001/usr/hive/warehouse/test.db/test_table/" + UUID.randomUUID()),
+                    0, splitLength, splitLength, 0, null, Collections.emptyList());
+            remoteSplits.add(split);
+            tScanRangeLocationsList.add(
+                    getScanRangeLocations(split.getPath().toString(), split.getStart(), split.getLength()));
+        }
+
+        List<TScanRangeLocations> localScanRangeLocationsList = new ArrayList<>();
+        List<Split> localSplits = new ArrayList<>();
+        int localSplitCount = random.nextInt(1000 - 100) + 100;
+        Set<String> totalLocalHosts = new HashSet<>();
+        for (int i = 0; i < localSplitCount; ++i) {
+            int localHostNum = random.nextInt(3 - 1) + 1;
+            Set<String> localHosts = new HashSet<>();
+            String localHost;
+            for (int j = 0; j < localHostNum; ++j) {
+                do {
+                    localHost = service.getAllBackends().get(random.nextInt(service.getAllBackends().size())).getHost();
+                } while (!localHosts.add(localHost));
+                totalLocalHosts.add(localHost);
+            }
+            long localSplitLength = random.nextInt(115343360 - 94371840) + 94371840;
+            FileSplit split = new FileSplit(new Path(
+                    "hdfs://HDFS00001/usr/hive/warehouse/test.db/test_table/" + UUID.randomUUID()),
+                    0, localSplitLength, localSplitLength, 0, localHosts.toArray(new String[0]),
+                    Collections.emptyList());
+            localSplits.add(split);
+            localScanRangeLocationsList.add(
+                    getScanRangeLocations(split.getPath().toString(), split.getStart(), split.getLength()));
+        }
+
+        Multimap<Backend, Split> result = null;
+        for (int i = 0; i < 3; ++i) {
+            FederationBackendPolicy policy = new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING);
+            List<TScanRangeLocations> totalScanRangeLocationsList = new ArrayList<>();
+            totalScanRangeLocationsList.addAll(tScanRangeLocationsList);
+            totalScanRangeLocationsList.addAll(localScanRangeLocationsList);
+            // Collections.shuffle(totalScanRangeLocationsList);
+            policy.init();
+            // policy.setScanRangeLocationsList(totalScanRangeLocationsList);
+            Assertions.assertEquals(policy.numBackends(), aliveBackendNum);
+            // for (TScanRangeLocations scanRangeLocations : tScanRangeLocationsList) {
+            //     System.out.println(policy.getNextConsistentBe(scanRangeLocations).getId());
+            // }
+            int totalSplitNum = 0;
+            List<Split> totalSplits = new ArrayList<>();
+            totalSplits.addAll(remoteSplits);
+            totalSplits.addAll(localSplits);
+            // Collections.shuffle(totalSplits);
+            Multimap<Backend, Split> assignment = policy.computeScanRangeAssignment(totalSplits);
+            if (i == 0) {
+                result = ArrayListMultimap.create(assignment);
+            } else {
+                Assertions.assertEquals(result, assignment);
+            }
+            for (Backend backend : assignment.keySet()) {
+                Collection<Split> splits = assignment.get(backend);
+                long scanBytes = 0L;
+                for (Split split : splits) {
+                    FileSplit fileSplit = (FileSplit) split;
+                    scanBytes += fileSplit.getLength();
+                    ++totalSplitNum;
+                    if (fileSplit.getHosts() != null && fileSplit.getHosts().length > 0) {
+                        for (String host : fileSplit.getHosts()) {
+                            Assert.assertTrue(totalLocalHosts.contains(host));
+                        }
+                    }
+                }
+                System.out.printf("%s -> %d splits, %d bytes\n", backend, splits.size(), scanBytes);
+            }
+            Assert.assertEquals(totalSplits.size(), totalSplitNum);
+            // int avg = (scanRangeNumber * scanRangeSize) / hostNumber;
+            // int variance = 5 * scanRangeSize;
+            Map<Backend, Long> stats = policy.getAssignedWeightPerBackend();
+            for (Map.Entry<Backend, Long> entry : stats.entrySet()) {
+                System.out.printf("weight: %s -> %d\n", entry.getKey(), entry.getValue());
+                // Assert.assertTrue(Math.abs(entry.getValue() - avg) < variance);
+            }
+        }
+    }
+
+    @Test
+    public void testComputeScanRangeAssignmentNodeChanged() throws UserException {
+        SystemInfoService service = new SystemInfoService();
+
+        Backend backend1 = new Backend(10002L, "172.30.0.100", 9050);
+        backend1.setAlive(true);
+        service.addBackend(backend1);
+        Backend backend2 = new Backend(10003L, "172.30.0.106", 9050);
+        backend2.setAlive(true);
+        service.addBackend(backend2);
+        Backend backend3 = new Backend(10004L, "172.30.0.118", 9050);
+        backend3.setAlive(true);
+        service.addBackend(backend3);
+
+        new MockUp<Env>() {
+            @Mock
+            public SystemInfoService getCurrentSystemInfo() {
+                return service;
+            }
+        };
+
+        List<Split> splits = new ArrayList<>();
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00000-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 112140970, 112140970, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00001-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 120839661, 120839661, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00002-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 108897409, 108897409, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00003-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 95795997, 95795997, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00004-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 104600402, 104600402, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00004-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 104600402, 104600402, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00004-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 104600402, 104600402, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00005-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 105664025, 105664025, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00006-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 103711014, 103711014, 0, null, Collections.emptyList()));
+
+        Map<Split, Backend> originSplitAssignedBackends = new HashMap<>();
+        {
+            FederationBackendPolicy policy = new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING);
+            policy.init();
+            int backendNum = 3;
+            Assertions.assertEquals(policy.numBackends(), backendNum);
+            Multimap<Backend, Split> assignment = policy.computeScanRangeAssignment(splits);
+
+            for (Backend backend : assignment.keySet()) {
+                Collection<Split> assignedSplits = assignment.get(backend);
+                long scanBytes = 0L;
+                for (Split split : assignedSplits) {
+                    FileSplit fileSplit = (FileSplit) split;
+                    scanBytes += fileSplit.getLength();
+                    originSplitAssignedBackends.put(split, backend);
+                }
+                System.out.printf("%s -> %d splits, %d bytes\n", backend, assignedSplits.size(), scanBytes);
+            }
+            // int avg = (scanRangeNumber * scanRangeSize) / hostNumber;
+            // int variance = 5 * scanRangeSize;
+            Map<Backend, Long> stats = policy.getAssignedWeightPerBackend();
+            for (Map.Entry<Backend, Long> entry : stats.entrySet()) {
+                System.out.printf("weight: %s -> %d\n", entry.getKey(), entry.getValue());
+                // Assert.assertTrue(Math.abs(entry.getValue() - avg) < variance);
+            }
+        }
+
+        // remove a node
+        // {
+        //     service.dropBackend(backend3.getId());
+        //     int changed = 0;
+        //
+        //     FederationBackendPolicy policy = new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING);
+        //     policy.init();
+        //     int backendNum = 2;
+        //     Assertions.assertEquals(policy.numBackends(), backendNum);
+        //     Multimap<Backend, Split> assignment = policy.computeScanRangeAssignment(splits);
+        //
+        //     for (Backend backend : assignment.keySet()) {
+        //         Collection<Split> assignedSplits = assignment.get(backend);
+        //         long ScanBytes = 0L;
+        //         for (Split split : assignedSplits) {
+        //             FileSplit fileSplit = (FileSplit) split;
+        //             ScanBytes += fileSplit.getLength();
+        //             Backend origin = originSplitAssignedBackends.get(split);
+        //             if (!backend.equals(origin)) {
+        //                 changed += 1;
+        //             }
+        //         }
+        //         System.out.printf("%s -> %d splits, %d bytes\n", backend, assignedSplits.size(), ScanBytes);
+        //     }
+        //
+        //     // int avg = (scanRangeNumber * scanRangeSize) / hostNumber;
+        //     // int variance = 5 * scanRangeSize;
+        //     Map<Backend, Long> stats = policy.getAssignedWeightPerBackend();
+        //     for (Map.Entry<Backend, Long> entry : stats.entrySet()) {
+        //         System.out.printf("weight: %s -> %d\n", entry.getKey(), entry.getValue());
+        //         // Assert.assertTrue(Math.abs(entry.getValue() - avg) < variance);
+        //     }
+        //
+        //     float moveRatio = changed * 1.0f / assignment.values().size();
+        //     System.out.printf("Remove a node: move ratio = %.2f\n", moveRatio);
+        // }
+        //
+        // // add a node
+        // {
+        //     Backend backend4 = new Backend(10004L, "172.30.0.128", 9050);
+        //     backend4.setAlive(true);
+        //     service.addBackend(backend4);
+        //     int changed = 0;
+        //
+        //     FederationBackendPolicy policy = new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING);
+        //     policy.init();
+        //     int backendNum = 3;
+        //     Assertions.assertEquals(policy.numBackends(), backendNum);
+        //     Multimap<Backend, Split> assignment = policy.computeScanRangeAssignment(splits);
+        //
+        //     for (Backend backend : assignment.keySet()) {
+        //         Collection<Split> assignedSplits = assignment.get(backend);
+        //         long ScanBytes = 0L;
+        //         for (Split split : assignedSplits) {
+        //             FileSplit fileSplit = (FileSplit) split;
+        //             ScanBytes += fileSplit.getLength();
+        //             Backend origin = originSplitAssignedBackends.get(split);
+        //             if (!backend.equals(origin)) {
+        //                 changed += 1;
+        //             }
+        //         }
+        //         System.out.printf("%s -> %d splits, %d bytes\n", backend, assignedSplits.size(), ScanBytes);
+        //     }
+        //     // int avg = (scanRangeNumber * scanRangeSize) / hostNumber;
+        //     // int variance = 5 * scanRangeSize;
+        //     Map<Backend, Long> stats = policy.getAssignedWeightPerBackend();
+        //     for (Map.Entry<Backend, Long> entry : stats.entrySet()) {
+        //         System.out.printf("weight: %s -> %d\n", entry.getKey(), entry.getValue());
+        //         // Assert.assertTrue(Math.abs(entry.getValue() - avg) < variance);
+        //     }
+        //
+        //     float moveRatio = changed * 1.0f / assignment.values().size();
+        //     System.out.printf("Add a node, move ratio = %.2f\n", moveRatio);
+        // }
+
+    }
+
+
     private TScanRangeLocations getScanRangeLocations(String path, long startOffset, long size) {
         // Generate on file scan range
         TFileScanRange fileScanRange = new TFileScanRange();
@@ -145,3 +744,4 @@ public class FederationBackendPolicyTest {
         return rangeDesc;
     }
 }
+


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
