This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 383850ef12b07b9323276314b26ec6da7e792e86
Author: Qi Chen <[email protected]>
AuthorDate: Sun Feb 4 11:13:29 2024 +0800

    [Opt](multi-catalog) Optimize split assignment to resolve uneven distribution. (#30390)

    [Opt](multi-catalog) Optimize split assignment to resolve uneven distribution.
    Currently this only applies to `FileQueryScanNode`.

    Referring to the Trino implementation:
    - Local node soft affinity optimization: prefer the backend that holds a local replica.
    - When the file cache is enabled, remote splits are assigned with the consistent hash
      algorithm. Because consistent hashing can be uneven, the splits are then re-adjusted
      so that the maximum and minimum split counts across hosts differ by at most
      `max_split_num_variance` splits (see the sketch below).
    - When the file cache is disabled, remote splits are assigned round-robin.
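    The sketch below is an illustrative, self-contained version of the re-balancing step
    described above, not the actual `FederationBackendPolicy` code: class, method, and
    variable names are made up for the example, and it balances plain split counts per
    host rather than `SplitWeight` values per `Backend` as the real implementation does.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    public class SplitRebalanceSketch {
        // Move splits from the most-loaded host to the least-loaded host until the
        // difference in split counts is at most maxSplitNumVariance.
        public static Map<String, List<String>> rebalance(
                Map<String, List<String>> assignment, int maxSplitNumVariance) {
            while (true) {
                String maxHost = null;
                String minHost = null;
                for (Map.Entry<String, List<String>> e : assignment.entrySet()) {
                    if (maxHost == null || e.getValue().size() > assignment.get(maxHost).size()) {
                        maxHost = e.getKey();
                    }
                    if (minHost == null || e.getValue().size() < assignment.get(minHost).size()) {
                        minHost = e.getKey();
                    }
                }
                if (assignment.get(maxHost).size() - assignment.get(minHost).size() <= maxSplitNumVariance) {
                    return assignment;
                }
                // Move one split from the most-loaded host to the least-loaded host.
                List<String> from = assignment.get(maxHost);
                assignment.get(minHost).add(from.remove(from.size() - 1));
            }
        }

        public static void main(String[] args) {
            Map<String, List<String>> assignment = new TreeMap<>();
            assignment.put("be1", new ArrayList<>(Arrays.asList("s1", "s2", "s3", "s4")));
            assignment.put("be2", new ArrayList<>(Arrays.asList("s5")));
            assignment.put("be3", new ArrayList<>());
            // With a variance of 1 this prints {be1=[s1, s2], be2=[s5, s3], be3=[s4]}.
            System.out.println(rebalance(assignment, 1));
        }
    }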
---
 .../main/java/org/apache/doris/common/Config.java  |  23 +-
 .../apache/doris/common/IndexedPriorityQueue.java  | 228 +++++++
 .../doris/{spi/Split.java => common/Queue.java}    |  21 +-
 .../doris/common/ResettableRandomizedIterator.java |  63 ++
 .../UpdateablePriorityQueue.java}                  |  18 +-
 .../apache/doris/common/util/ConsistentHash.java   |  39 +-
 .../doris/planner/external/ExternalScanNode.java   |   6 +-
 .../planner/external/FederationBackendPolicy.java  | 361 +++++++++-
 .../doris/planner/external/FileQueryScanNode.java  | 129 ++--
 .../apache/doris/planner/external/FileSplit.java   |   7 +
 .../external/NodeSelectionStrategy.java}           |  16 +-
 .../apache/doris/planner/external/SplitWeight.java | 130 ++++
 .../src/main/java/org/apache/doris/spi/Split.java  |  23 +
 .../main/java/org/apache/doris/system/Backend.java |   2 +-
 .../doris/planner/FederationBackendPolicyTest.java | 730 ++++++++++++++++++---
 15 files changed, 1587 insertions(+), 209 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 4fcd815072e..be58c139852 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2239,7 +2239,28 @@ public class Config extends ConfigBase {
             "When file cache is enabled, the number of virtual nodes of each 
node in the consistent hash algorithm. "
                     + "The larger the value, the more uniform the distribution 
of the hash algorithm, "
                     + "but it will increase the memory overhead."})
-    public static int virtual_node_number = 2048;
+    public static int split_assigner_virtual_node_number = 256;
+
+    @ConfField(mutable = true, description = {
+            "本地节点软亲缘性优化。尽可能地优先选取本地副本节点。",
+            "Local node soft affinity optimization. Prefer local replication 
node."})
+    public static boolean split_assigner_optimized_local_scheduling = true;
+
+    @ConfField(mutable = true, description = {
+            "随机算法最小的候选数目,会选取相对最空闲的节点。",
+            "The random algorithm has the smallest number of candidates and 
will select the most idle node."})
+    public static int split_assigner_min_random_candidate_num = 2;
+
+    @ConfField(mutable = true, description = {
+            "一致性哈希算法最小的候选数目,会选取相对最空闲的节点。",
+            "The consistent hash algorithm has the smallest number of 
candidates and will select the most idle node."})
+    public static int split_assigner_min_consistent_hash_candidate_num = 2;
+
+    @ConfField(mutable = true, description = {
+            "各节点之间最大的 split 数目差异,如果超过这个数目就会重新分布 split。",
+            "The maximum difference in the number of splits between nodes. "
+                    + "If this number is exceeded, the splits will be 
redistributed."})
+    public static int split_assigner_max_split_num_variance = 1;
 
     @ConfField(description = {
             "控制统计信息的自动触发作业执行记录的持久化行数",
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/IndexedPriorityQueue.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/IndexedPriorityQueue.java
new file mode 100644
index 00000000000..f93db510e75
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/IndexedPriorityQueue.java
@@ -0,0 +1,228 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// 
https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/resourcegroups/IndexedPriorityQueue.java
+// and modified by Doris
+
+package org.apache.doris.common;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * A priority queue with constant time contains(E) and log time remove(E)
+ * Ties are broken by insertion order
+ */
+public final class IndexedPriorityQueue<E>
+        implements UpdateablePriorityQueue<E> {
+    private final Map<E, Entry<E>> index = new HashMap<>();
+    private final Set<Entry<E>> queue;
+    private long generation;
+
+    public IndexedPriorityQueue() {
+        this(PriorityOrdering.HIGH_TO_LOW);
+    }
+
+    public IndexedPriorityQueue(PriorityOrdering priorityOrdering) {
+        switch (priorityOrdering) {
+            case LOW_TO_HIGH:
+                queue = new TreeSet<>(
+                        Comparator.comparingLong((Entry<E> entry) -> 
entry.getPriority())
+                                .thenComparingLong(Entry::getGeneration));
+                break;
+            case HIGH_TO_LOW:
+                queue = new TreeSet<>((entry1, entry2) -> {
+                    int priorityComparison = 
Long.compare(entry2.getPriority(), entry1.getPriority());
+                    if (priorityComparison != 0) {
+                        return priorityComparison;
+                    }
+                    return Long.compare(entry1.getGeneration(), 
entry2.getGeneration());
+                });
+                break;
+            default:
+                throw new IllegalArgumentException();
+        }
+    }
+
+    @Override
+    public boolean addOrUpdate(E element, long priority) {
+        Entry<E> entry = index.get(element);
+        if (entry != null) {
+            if (entry.getPriority() == priority) {
+                return false;
+            }
+            queue.remove(entry);
+            Entry<E> newEntry = new Entry<>(element, priority, 
entry.getGeneration());
+            queue.add(newEntry);
+            index.put(element, newEntry);
+            return false;
+        }
+        Entry<E> newEntry = new Entry<>(element, priority, generation);
+        generation++;
+        queue.add(newEntry);
+        index.put(element, newEntry);
+        return true;
+    }
+
+    @Override
+    public boolean contains(E element) {
+        return index.containsKey(element);
+    }
+
+    @Override
+    public boolean remove(E element) {
+        Entry<E> entry = index.remove(element);
+        if (entry != null) {
+            queue.remove(entry);
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public E poll() {
+        Entry<E> entry = pollEntry();
+        if (entry == null) {
+            return null;
+        }
+        return entry.getValue();
+    }
+
+    @Override
+    public E peek() {
+        Entry<E> entry = peekEntry();
+        if (entry == null) {
+            return null;
+        }
+        return entry.getValue();
+    }
+
+    @Override
+    public int size() {
+        return queue.size();
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return queue.isEmpty();
+    }
+
+    public Prioritized<E> getPrioritized(E element) {
+        Entry<E> entry = index.get(element);
+        if (entry == null) {
+            return null;
+        }
+
+        return new Prioritized<>(entry.getValue(), entry.getPriority());
+    }
+
+    public Prioritized<E> pollPrioritized() {
+        Entry<E> entry = pollEntry();
+        if (entry == null) {
+            return null;
+        }
+        return new Prioritized<>(entry.getValue(), entry.getPriority());
+    }
+
+    private Entry<E> pollEntry() {
+        Iterator<Entry<E>> iterator = queue.iterator();
+        if (!iterator.hasNext()) {
+            return null;
+        }
+        Entry<E> entry = iterator.next();
+        iterator.remove();
+        Preconditions.checkState(index.remove(entry.getValue()) != null, 
"Failed to remove entry from index");
+        return entry;
+    }
+
+    public Prioritized<E> peekPrioritized() {
+        Entry<E> entry = peekEntry();
+        if (entry == null) {
+            return null;
+        }
+        return new Prioritized<>(entry.getValue(), entry.getPriority());
+    }
+
+    public Entry<E> peekEntry() {
+        Iterator<Entry<E>> iterator = queue.iterator();
+        if (!iterator.hasNext()) {
+            return null;
+        }
+        return iterator.next();
+    }
+
+    @Override
+    public Iterator<E> iterator() {
+        return Iterators.transform(queue.iterator(), Entry::getValue);
+    }
+
+    public enum PriorityOrdering {
+        LOW_TO_HIGH,
+        HIGH_TO_LOW
+    }
+
+    private static final class Entry<E> {
+        private final E value;
+        private final long priority;
+        private final long generation;
+
+        private Entry(E value, long priority, long generation) {
+            this.value = Objects.requireNonNull(value, "value is null");
+            this.priority = priority;
+            this.generation = generation;
+        }
+
+        public E getValue() {
+            return value;
+        }
+
+        public long getPriority() {
+            return priority;
+        }
+
+        public long getGeneration() {
+            return generation;
+        }
+    }
+
+    public static class Prioritized<V> {
+        private final V value;
+        private final long priority;
+
+        public Prioritized(V value, long priority) {
+            this.value = Objects.requireNonNull(value, "value is null");
+            this.priority = priority;
+        }
+
+        public V getValue() {
+            return value;
+        }
+
+        public long getPriority() {
+            return priority;
+        }
+    }
+}
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/Queue.java
similarity index 68%
copy from fe/fe-core/src/main/java/org/apache/doris/spi/Split.java
copy to fe/fe-core/src/main/java/org/apache/doris/common/Queue.java
index 31b1e1515a5..6dc58a35231 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Queue.java
@@ -14,16 +14,23 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
+// This file is copied from
+// 
https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/resourcegroups/Queue.java
+// and modified by Doris
 
-package org.apache.doris.spi;
+package org.apache.doris.common;
 
-/**
- * Split interface. e.g. Tablet for Olap Table.
- */
-public interface Split {
+interface Queue<E> {
+    boolean contains(E element);
 
-    String[] getHosts();
+    boolean remove(E element);
 
-    Object getInfo();
+    E poll();
 
+    E peek();
+
+    int size();
+
+    boolean isEmpty();
 }
+
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/ResettableRandomizedIterator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/common/ResettableRandomizedIterator.java
new file mode 100644
index 00000000000..11d18604356
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/ResettableRandomizedIterator.java
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// 
https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/scheduler/ResettableRandomizedIterator.java
+// and modified by Doris
+
+package org.apache.doris.common;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ThreadLocalRandom;
+
+public class ResettableRandomizedIterator<T>
+        implements Iterator<T> {
+    private final List<T> list;
+    private int position;
+
+    public ResettableRandomizedIterator(Collection<T> elements) {
+        this.list = new ArrayList<>(elements);
+    }
+
+    public void reset() {
+        position = 0;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return position < list.size();
+    }
+
+    @Override
+    public T next() {
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
+
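+        // Pick a random element from the not-yet-returned tail of the list and swap it into
+        // the current position (an incremental Fisher-Yates shuffle), so the iterator returns
+        // the elements in uniformly random order.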
+        int position = ThreadLocalRandom.current().nextInt(this.position, 
list.size());
+
+        T result = list.set(position, list.get(this.position));
+        list.set(this.position, result);
+        this.position++;
+
+        return result;
+    }
+}
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/UpdateablePriorityQueue.java
similarity index 69%
copy from fe/fe-core/src/main/java/org/apache/doris/spi/Split.java
copy to 
fe/fe-core/src/main/java/org/apache/doris/common/UpdateablePriorityQueue.java
index 31b1e1515a5..c9c0a2b3deb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/UpdateablePriorityQueue.java
@@ -14,16 +14,14 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
+// This file is copied from
+// 
https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/resourcegroups/UpdateablePriorityQueue.java
+// and modified by Doris
 
-package org.apache.doris.spi;
-
-/**
- * Split interface. e.g. Tablet for Olap Table.
- */
-public interface Split {
-
-    String[] getHosts();
-
-    Object getInfo();
+package org.apache.doris.common;
 
+interface UpdateablePriorityQueue<E>
+        extends Queue<E>, Iterable<E> {
+    boolean addOrUpdate(E element, long priority);
 }
+
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/ConsistentHash.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/ConsistentHash.java
index 4ef6e4168f1..85199ad32fe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/ConsistentHash.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/ConsistentHash.java
@@ -17,11 +17,16 @@
 
 package org.apache.doris.common.util;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.hash.Funnel;
 import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hasher;
 
 import java.util.Collection;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 
@@ -89,14 +94,38 @@ public class ConsistentHash<K, N> {
         }
     }
 
-    public N getNode(K key) {
-        if (ring.isEmpty()) {
-            return null;
+    public List<N> getNode(K key, int count) {
+        int nodeCount = ring.values().size();
+        if (count > nodeCount) {
+            count = nodeCount;
         }
+
+        Set<N> uniqueNodes = new LinkedHashSet<>();
+
         Hasher hasher = hashFunction.newHasher();
         Long hashKey = hasher.putObject(key, keyFunnel).hash().asLong();
+
         SortedMap<Long, VirtualNode> tailMap = ring.tailMap(hashKey);
-        hashKey = !tailMap.isEmpty() ? tailMap.firstKey() : ring.firstKey();
-        return ring.get(hashKey).getNode();
+        // Start reading from tail
+        for (Map.Entry<Long, VirtualNode> entry : tailMap.entrySet()) {
+            uniqueNodes.add(entry.getValue().node);
+            if (uniqueNodes.size() == count) {
+                break;
+            }
+        }
+
+        if (uniqueNodes.size() < count) {
+            // Start reading from the head as we have exhausted tail
+            SortedMap<Long, VirtualNode> headMap = ring.headMap(hashKey);
+            for (Map.Entry<Long, VirtualNode> entry : headMap.entrySet()) {
+                uniqueNodes.add(entry.getValue().node);
+                if (uniqueNodes.size() == count) {
+                    break;
+                }
+            }
+        }
+
+        return ImmutableList.copyOf(uniqueNodes);
     }
 }
+
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalScanNode.java
index a4751b5205f..cbfe3188817 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalScanNode.java
@@ -22,6 +22,7 @@ import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.common.UserException;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.planner.ScanNode;
+import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.thrift.TScanRangeLocations;
 
@@ -43,7 +44,9 @@ public abstract class ExternalScanNode extends ScanNode {
     // set to false means this scan node does not need to check column priv.
     protected boolean needCheckColumnPriv;
 
-    protected final FederationBackendPolicy backendPolicy = new 
FederationBackendPolicy();
+    protected final FederationBackendPolicy backendPolicy = 
(ConnectContext.get() != null
+            && ConnectContext.get().getSessionVariable().enableFileCache)
+            ? new 
FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING) :  new 
FederationBackendPolicy();
 
     public ExternalScanNode(PlanNodeId id, TupleDescriptor desc, String 
planNodeName, StatisticalType statisticalType,
             boolean needCheckColumnPriv) {
@@ -91,3 +94,4 @@ public abstract class ExternalScanNode extends ScanNode {
         return scanRangeLocations.size();
     }
 }
+
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java
index ade03291c30..df0e955ea8e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java
@@ -14,28 +14,38 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
+// This file is referenced from
+// 
https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/scheduler/UniformNodeSelector.java
+// and modified by Doris
 
 package org.apache.doris.planner.external;
 
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.IndexedPriorityQueue;
+import org.apache.doris.common.ResettableRandomizedIterator;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.ConsistentHash;
 import org.apache.doris.mysql.privilege.UserProperty;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.resource.Tag;
+import org.apache.doris.spi.Split;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.BeSelectionPolicy;
-import org.apache.doris.thrift.TFileRangeDesc;
-import org.apache.doris.thrift.TScanRangeLocations;
+import org.apache.doris.system.SystemInfoService;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 import com.google.common.hash.Funnel;
 import com.google.common.hash.Hashing;
@@ -45,12 +55,16 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.nio.charset.StandardCharsets;
-import java.security.SecureRandom;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
@@ -59,35 +73,47 @@ public class FederationBackendPolicy {
     private static final Logger LOG = 
LogManager.getLogger(FederationBackendPolicy.class);
     private final List<Backend> backends = Lists.newArrayList();
     private final Map<String, List<Backend>> backendMap = Maps.newHashMap();
-    private final SecureRandom random = new SecureRandom();
-    private ConsistentHash<TScanRangeLocations, Backend> consistentHash;
+
+    public Map<Backend, Long> getAssignedWeightPerBackend() {
+        return assignedWeightPerBackend;
+    }
+
+    private Map<Backend, Long> assignedWeightPerBackend = Maps.newHashMap();
+
+    private ConsistentHash<Split, Backend> consistentHash;
 
     private int nextBe = 0;
     private boolean initialized = false;
 
+    private NodeSelectionStrategy nodeSelectionStrategy;
+    private boolean enableSplitsRedistribution = true;
+
     // Create a ConsistentHash ring may be a time-consuming operation, so we 
cache it.
-    private static LoadingCache<HashCacheKey, 
ConsistentHash<TScanRangeLocations, Backend>> consistentHashCache;
+    private static LoadingCache<HashCacheKey, ConsistentHash<Split, Backend>> 
consistentHashCache;
 
     static {
         consistentHashCache = CacheBuilder.newBuilder().maximumSize(5)
-                .build(new CacheLoader<HashCacheKey, 
ConsistentHash<TScanRangeLocations, Backend>>() {
+                .build(new CacheLoader<HashCacheKey, ConsistentHash<Split, 
Backend>>() {
                     @Override
-                    public ConsistentHash<TScanRangeLocations, Backend> 
load(HashCacheKey key) {
-                        return new ConsistentHash<>(Hashing.murmur3_128(), new 
ScanRangeHash(),
-                                new BackendHash(), key.bes, 
Config.virtual_node_number);
+                    public ConsistentHash<Split, Backend> load(HashCacheKey 
key) {
+                        return new ConsistentHash<>(Hashing.murmur3_128(), new 
SplitHash(),
+                                new BackendHash(), key.bes, 
Config.split_assigner_virtual_node_number);
                     }
                 });
     }
 
     private static class HashCacheKey {
         // sorted backend ids as key
-        private List<Long> beIds;
+        private List<String> beHashKeys;
         // backends is not part of key, just an attachment
         private List<Backend> bes;
 
         HashCacheKey(List<Backend> backends) {
             this.bes = backends;
-            this.beIds = backends.stream().map(b -> 
b.getId()).sorted().collect(Collectors.toList());
+            this.beHashKeys = backends.stream().map(b ->
+                            String.format("id: %d, host: %s, port: %d", 
b.getId(), b.getHost(), b.getHeartbeatPort()))
+                    .sorted()
+                    .collect(Collectors.toList());
         }
 
         @Override
@@ -98,20 +124,28 @@ public class FederationBackendPolicy {
             if (!(obj instanceof HashCacheKey)) {
                 return false;
             }
-            return Objects.equals(beIds, ((HashCacheKey) obj).beIds);
+            return Objects.equals(beHashKeys, ((HashCacheKey) obj).beHashKeys);
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(beIds);
+            return Objects.hash(beHashKeys);
         }
 
         @Override
         public String toString() {
-            return "HashCache{" + "beIds=" + beIds + '}';
+            return "HashCache{" + "beHashKeys=" + beHashKeys + '}';
         }
     }
 
+    public FederationBackendPolicy(NodeSelectionStrategy 
nodeSelectionStrategy) {
+        this.nodeSelectionStrategy = nodeSelectionStrategy;
+    }
+
+    public FederationBackendPolicy() {
+        this(NodeSelectionStrategy.ROUND_ROBIN);
+    }
+
     public void init() throws UserException {
         if (!initialized) {
             init(Collections.emptyList());
@@ -152,6 +186,10 @@ public class FederationBackendPolicy {
         if (backends.isEmpty()) {
             throw new UserException("No available backends");
         }
+        for (Backend backend : backends) {
+            assignedWeightPerBackend.put(backend, 0L);
+        }
+
         
backendMap.putAll(backends.stream().collect(Collectors.groupingBy(Backend::getHost)));
         try {
             consistentHash = consistentHashCache.get(new 
HashCacheKey(backends));
@@ -166,23 +204,280 @@ public class FederationBackendPolicy {
         return selectedBackend;
     }
 
-    public Backend getNextConsistentBe(TScanRangeLocations scanRangeLocations) 
{
-        return consistentHash.getNode(scanRangeLocations);
+    @VisibleForTesting
+    public void setEnableSplitsRedistribution(boolean 
enableSplitsRedistribution) {
+        this.enableSplitsRedistribution = enableSplitsRedistribution;
+    }
+
+    public Multimap<Backend, Split> computeScanRangeAssignment(List<Split> 
splits) throws UserException {
+        // Sort the splits so that the same query utilizes the OS page cache as much as possible.
+        splits.sort((split1, split2) -> {
+            int pathComparison = 
split1.getPathString().compareTo(split2.getPathString());
+            if (pathComparison != 0) {
+                return pathComparison;
+            }
+
+            int startComparison = Long.compare(split1.getStart(), 
split2.getStart());
+            if (startComparison != 0) {
+                return startComparison;
+            }
+            return Long.compare(split1.getLength(), split2.getLength());
+        });
+
+        ListMultimap<Backend, Split> assignment = ArrayListMultimap.create();
+
+        List<Split> remainingSplits = null;
+
+        List<Backend> backends = new ArrayList<>();
+        for (List<Backend> backendList : backendMap.values()) {
+            backends.addAll(backendList);
+        }
+        ResettableRandomizedIterator<Backend> randomCandidates = new 
ResettableRandomizedIterator<>(backends);
+
+        boolean splitsToBeRedistributed = false;
+
+        // optimizedLocalScheduling enables prioritized assignment of splits 
to local nodes when splits contain
+        // locality information
+        if (Config.split_assigner_optimized_local_scheduling) {
+            remainingSplits = new ArrayList<>(splits.size());
+            for (int i = 0; i < splits.size(); ++i) {
+                Split split = splits.get(i);
+                if (split.isRemotelyAccessible() && (split.getHosts() != null 
&& split.getHosts().length > 0)) {
+                    List<Backend> candidateNodes = 
selectExactNodes(backendMap, split.getHosts());
+
+                    Optional<Backend> chosenNode = candidateNodes.stream()
+                            .min(Comparator.comparingLong(ownerNode -> 
assignedWeightPerBackend.get(ownerNode)));
+
+                    if (chosenNode.isPresent()) {
+                        Backend selectedBackend = chosenNode.get();
+                        assignment.put(selectedBackend, split);
+                        assignedWeightPerBackend.put(selectedBackend,
+                                assignedWeightPerBackend.get(selectedBackend) 
+ split.getSplitWeight().getRawValue());
+                        splitsToBeRedistributed = true;
+                        continue;
+                    }
+                }
+                remainingSplits.add(split);
+            }
+        } else {
+            remainingSplits = splits;
+        }
+
+        for (Split split : remainingSplits) {
+            List<Backend> candidateNodes;
+            if (!split.isRemotelyAccessible()) {
+                candidateNodes = selectExactNodes(backendMap, 
split.getHosts());
+            } else {
+                switch (nodeSelectionStrategy) {
+                    case ROUND_ROBIN: {
+                        Backend selectedBackend = backends.get(nextBe++);
+                        nextBe = nextBe % backends.size();
+                        candidateNodes = ImmutableList.of(selectedBackend);
+                        break;
+                    }
+                    case RANDOM: {
+                        randomCandidates.reset();
+                        candidateNodes = 
selectNodes(Config.split_assigner_min_random_candidate_num, randomCandidates);
+                        break;
+                    }
+                    case CONSISTENT_HASHING: {
+                        candidateNodes = consistentHash.getNode(split,
+                                
Config.split_assigner_min_consistent_hash_candidate_num);
+                        splitsToBeRedistributed = true;
+                        break;
+                    }
+                    default: {
+                        throw new RuntimeException();
+                    }
+                }
+            }
+            if (candidateNodes.isEmpty()) {
+                LOG.debug("No nodes available to schedule {}. Available nodes 
{}", split,
+                        backends);
+                throw new 
UserException(SystemInfoService.NO_SCAN_NODE_BACKEND_AVAILABLE_MSG);
+            }
+
+            Backend selectedBackend = chooseNodeForSplit(candidateNodes);
+            List<Backend> alternativeBackends = new 
ArrayList<>(candidateNodes);
+            alternativeBackends.remove(selectedBackend);
+            split.setAlternativeHosts(
+                    alternativeBackends.stream().map(each -> 
each.getHost()).collect(Collectors.toList()));
+            assignment.put(selectedBackend, split);
+            assignedWeightPerBackend.put(selectedBackend,
+                    assignedWeightPerBackend.get(selectedBackend) + 
split.getSplitWeight().getRawValue());
+        }
+
+        if (enableSplitsRedistribution && splitsToBeRedistributed) {
+            equateDistribution(assignment);
+        }
+        return assignment;
+    }
+
+    /**
+     * The method tries to make the distribution of splits more uniform. All 
nodes are arranged into a maxHeap and
+     * a minHeap based on the number of splits that are assigned to them. 
Splits are redistributed, one at a time,
+     * from a maxNode to a minNode until we have as uniform a distribution as 
possible.
+     *
+     * @param assignment the node-splits multimap after the first and the 
second stage
+     */
+    private void equateDistribution(ListMultimap<Backend, Split> assignment) {
+        if (assignment.isEmpty()) {
+            return;
+        }
+
+        List<Backend> allNodes = new ArrayList<>();
+        for (List<Backend> backendList : backendMap.values()) {
+            allNodes.addAll(backendList);
+        }
+        Collections.sort(allNodes, Comparator.comparing(Backend::getId));
+
+        if (allNodes.size() < 2) {
+            return;
+        }
+
+        IndexedPriorityQueue<Backend> maxNodes = new IndexedPriorityQueue<>();
+        for (Backend node : allNodes) {
+            maxNodes.addOrUpdate(node, assignedWeightPerBackend.get(node));
+        }
+
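+        // The default IndexedPriorityQueue ordering polls the highest priority first, so storing
+        // (Long.MAX_VALUE - weight) makes the least-loaded backend come out of minNodes first.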
+        IndexedPriorityQueue<Backend> minNodes = new IndexedPriorityQueue<>();
+        for (Backend node : allNodes) {
+            minNodes.addOrUpdate(node, Long.MAX_VALUE - 
assignedWeightPerBackend.get(node));
+        }
+
+        while (true) {
+            if (maxNodes.isEmpty()) {
+                return;
+            }
+
+            // fetch min and max node
+            Backend maxNode = maxNodes.poll();
+            Backend minNode = minNodes.poll();
+
+            // Allow some degree of non-uniformity when assigning splits to nodes. Usually data distribution
+            // among nodes in a cluster won't be fully uniform (e.g. because a hash function with a non-uniform
+            // distribution, such as consistent hashing, is used). In such cases it makes sense to assign splits
+            // to the nodes that hold the data because of potential savings in network throughput and CPU time.
+            if (assignedWeightPerBackend.get(maxNode) - 
assignedWeightPerBackend.get(minNode)
+                    <= 
SplitWeight.rawValueForStandardSplitCount(Config.split_assigner_max_split_num_variance))
 {
+                return;
+            }
+
+            // move split from max to min
+            Split redistributedSplit = redistributeSplit(assignment, maxNode, 
minNode);
+
+            assignedWeightPerBackend.put(maxNode,
+                    assignedWeightPerBackend.get(maxNode) - 
redistributedSplit.getSplitWeight().getRawValue());
+            assignedWeightPerBackend.put(minNode, Math.addExact(
+                    assignedWeightPerBackend.get(minNode), 
redistributedSplit.getSplitWeight().getRawValue()));
+
+            // add max back into maxNodes only if it still has assignments
+            if (assignment.containsKey(maxNode)) {
+                maxNodes.addOrUpdate(maxNode, 
assignedWeightPerBackend.get(maxNode));
+            }
+
+            // Add or update both the Priority Queues with the updated node 
priorities
+            maxNodes.addOrUpdate(minNode, 
assignedWeightPerBackend.get(minNode));
+            minNodes.addOrUpdate(minNode, Long.MAX_VALUE - 
assignedWeightPerBackend.get(minNode));
+            minNodes.addOrUpdate(maxNode, Long.MAX_VALUE - 
assignedWeightPerBackend.get(maxNode));
+        }
+    }
+
+    /**
+     * The method selects and removes a split from the fromNode and assigns it to the toNode. It first tries
+     * to redistribute a non-local split if possible (this case can occur when multiple queries are running
+     * simultaneously). If no non-local split can be found in the maxNode, the next split is selected and
+     * reassigned.
+     */
+    @VisibleForTesting
+    public static Split redistributeSplit(Multimap<Backend, Split> assignment, 
Backend fromNode,
+            Backend toNode) {
+        Iterator<Split> splitIterator = assignment.get(fromNode).iterator();
+        Split splitToBeRedistributed = null;
+        while (splitIterator.hasNext()) {
+            Split split = splitIterator.next();
+            // Try to select non-local split for redistribution
+            if (split.getHosts() != null && !isSplitLocal(
+                    split.getHosts(), fromNode.getHost())) {
+                splitToBeRedistributed = split;
+                break;
+            }
+        }
+        // Select split if maxNode has no non-local splits in the current 
batch of assignment
+        if (splitToBeRedistributed == null) {
+            splitIterator = assignment.get(fromNode).iterator();
+            while (splitIterator.hasNext()) {
+                splitToBeRedistributed = splitIterator.next();
+                // if toNode has split replication, transfer this split firstly
+                if (splitToBeRedistributed.getHosts() != null && isSplitLocal(
+                        splitToBeRedistributed.getHosts(), toNode.getHost())) {
+                    break;
+                }
+                // if toNode is split alternative host, transfer this split 
firstly
+                if (splitToBeRedistributed.getAlternativeHosts() != null && 
isSplitLocal(
+                        splitToBeRedistributed.getAlternativeHosts(), 
toNode.getHost())) {
+                    break;
+                }
+            }
+        }
+        splitIterator.remove();
+        assignment.put(toNode, splitToBeRedistributed);
+        return splitToBeRedistributed;
     }
 
-    // Try to find a local BE, if not exists, use `getNextBe` instead
-    public Backend getNextLocalBe(List<String> hosts, TScanRangeLocations 
scanRangeLocations) {
-        List<Backend> candidateBackends = 
Lists.newArrayListWithCapacity(hosts.size());
+    private static boolean isSplitLocal(String[] splitHosts, String host) {
+        for (String splitHost : splitHosts) {
+            if (splitHost.equals(host)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private static boolean isSplitLocal(List<String> splitHosts, String host) {
+        for (String splitHost : splitHosts) {
+            if (splitHost.equals(host)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public static List<Backend> selectExactNodes(Map<String, List<Backend>> 
backendMap, String[] hosts) {
+        Set<Backend> chosen = new LinkedHashSet<>();
+
         for (String host : hosts) {
-            List<Backend> backends = backendMap.get(host);
-            if (CollectionUtils.isNotEmpty(backends)) {
-                
candidateBackends.add(backends.get(random.nextInt(backends.size())));
+            if (backendMap.containsKey(host)) {
+                backendMap.get(host).stream()
+                        .forEach(chosen::add);
             }
         }
+        return ImmutableList.copyOf(chosen);
+    }
 
-        return CollectionUtils.isEmpty(candidateBackends)
-                    ? getNextConsistentBe(scanRangeLocations)
-                    : 
candidateBackends.get(random.nextInt(candidateBackends.size()));
+    public static List<Backend> selectNodes(int limit, Iterator<Backend> 
candidates) {
+        Preconditions.checkArgument(limit > 0, "limit must be at least 1");
+
+        List<Backend> selected = new ArrayList<>(limit);
+        while (selected.size() < limit && candidates.hasNext()) {
+            selected.add(candidates.next());
+        }
+
+        return selected;
+    }
+
+    private Backend chooseNodeForSplit(List<Backend> candidateNodes) {
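+        // Among the candidate backends, pick the one with the smallest assigned weight so far
+        // (ties are resolved in favor of the later candidate because of the <= comparison).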
+        Backend chosenNode = null;
+        long minWeight = Long.MAX_VALUE;
+
+        for (Backend node : candidateNodes) {
+            long queuedWeight = assignedWeightPerBackend.get(node);
+            if (queuedWeight <= minWeight) {
+                chosenNode = node;
+                minWeight = queuedWeight;
+            }
+        }
+
+        return chosenNode;
     }
 
     public int numBackends() {
@@ -200,15 +495,13 @@ public class FederationBackendPolicy {
         }
     }
 
-    private static class ScanRangeHash implements Funnel<TScanRangeLocations> {
+    private static class SplitHash implements Funnel<Split> {
         @Override
-        public void funnel(TScanRangeLocations scanRange, PrimitiveSink 
primitiveSink) {
-            Preconditions.checkState(scanRange.scan_range.isSetExtScanRange());
-            for (TFileRangeDesc desc : 
scanRange.scan_range.ext_scan_range.file_scan_range.ranges) {
-                
primitiveSink.putBytes(desc.path.getBytes(StandardCharsets.UTF_8));
-                primitiveSink.putLong(desc.start_offset);
-                primitiveSink.putLong(desc.size);
-            }
+        public void funnel(Split split, PrimitiveSink primitiveSink) {
+            
primitiveSink.putBytes(split.getPathString().getBytes(StandardCharsets.UTF_8));
+            primitiveSink.putLong(split.getStart());
+            primitiveSink.putLong(split.getLength());
         }
     }
 }
+
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
index 4a3c3d2ff9a..a374df5e602 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
@@ -68,13 +68,14 @@ import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
 import lombok.Getter;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.net.URI;
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -314,76 +315,73 @@ public abstract class FileQueryScanNode extends 
FileScanNode {
             params.setProperties(locationProperties);
         }
 
-        boolean enableShortCircuitRead = 
HdfsResource.enableShortCircuitRead(locationProperties);
         List<String> pathPartitionKeys = getPathPartitionKeys();
-        for (Split split : inputSplits) {
-            FileSplit fileSplit = (FileSplit) split;
-            TFileType locationType;
-            if (fileSplit instanceof IcebergSplit
-                    && ((IcebergSplit) 
fileSplit).getConfig().containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) {
-                locationType = TFileType.FILE_BROKER;
-            } else {
-                locationType = getLocationType(fileSplit.getPath().toString());
-            }
 
-            TScanRangeLocations curLocations = newLocations();
-            // If fileSplit has partition values, use the values collected 
from hive partitions.
-            // Otherwise, use the values in file path.
-            boolean isACID = false;
-            if (fileSplit instanceof HiveSplit) {
-                HiveSplit hiveSplit = (HiveSplit) split;
-                isACID = hiveSplit.isACID();
-            }
-            List<String> partitionValuesFromPath = 
fileSplit.getPartitionValues() == null
-                    ? 
BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), 
pathPartitionKeys, false, isACID)
-                    : fileSplit.getPartitionValues();
-
-            TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, 
partitionValuesFromPath, pathPartitionKeys,
-                    locationType);
-            TFileCompressType fileCompressType = 
getFileCompressType(fileSplit);
-            rangeDesc.setCompressType(fileCompressType);
-            if (isACID) {
-                HiveSplit hiveSplit = (HiveSplit) split;
-                
hiveSplit.setTableFormatType(TableFormatType.TRANSACTIONAL_HIVE);
-                TTableFormatFileDesc tableFormatFileDesc = new 
TTableFormatFileDesc();
-                
tableFormatFileDesc.setTableFormatType(hiveSplit.getTableFormatType().value());
-                AcidInfo acidInfo = (AcidInfo) hiveSplit.getInfo();
-                TTransactionalHiveDesc transactionalHiveDesc = new 
TTransactionalHiveDesc();
-                
transactionalHiveDesc.setPartition(acidInfo.getPartitionLocation());
-                List<TTransactionalHiveDeleteDeltaDesc> deleteDeltaDescs = new 
ArrayList<>();
-                for (DeleteDeltaInfo deleteDeltaInfo : 
acidInfo.getDeleteDeltas()) {
-                    TTransactionalHiveDeleteDeltaDesc deleteDeltaDesc = new 
TTransactionalHiveDeleteDeltaDesc();
-                    
deleteDeltaDesc.setDirectoryLocation(deleteDeltaInfo.getDirectoryLocation());
-                    
deleteDeltaDesc.setFileNames(deleteDeltaInfo.getFileNames());
-                    deleteDeltaDescs.add(deleteDeltaDesc);
+        Multimap<Backend, Split> assignment =  
backendPolicy.computeScanRangeAssignment(inputSplits);
+        for (Backend backend : assignment.keySet()) {
+            Collection<Split> splits = assignment.get(backend);
+            for (Split split : splits) {
+                FileSplit fileSplit = (FileSplit) split;
+                TFileType locationType;
+                if (fileSplit instanceof IcebergSplit
+                        && ((IcebergSplit) 
fileSplit).getConfig().containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) {
+                    locationType = TFileType.FILE_BROKER;
+                } else {
+                    locationType = 
getLocationType(fileSplit.getPath().toString());
+                }
+
+                TScanRangeLocations curLocations = newLocations();
+                // If fileSplit has partition values, use the values collected 
from hive partitions.
+                // Otherwise, use the values in file path.
+                boolean isACID = false;
+                if (fileSplit instanceof HiveSplit) {
+                    HiveSplit hiveSplit = (HiveSplit) fileSplit;
+                    isACID = hiveSplit.isACID();
+                }
+                List<String> partitionValuesFromPath = 
fileSplit.getPartitionValues() == null
+                        ? 
BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), 
pathPartitionKeys,
+                        false, isACID) : fileSplit.getPartitionValues();
+
+                TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, 
partitionValuesFromPath, pathPartitionKeys,
+                        locationType);
+                TFileCompressType fileCompressType = 
getFileCompressType(fileSplit);
+                rangeDesc.setCompressType(fileCompressType);
+                if (isACID) {
+                    HiveSplit hiveSplit = (HiveSplit) fileSplit;
+                    
hiveSplit.setTableFormatType(TableFormatType.TRANSACTIONAL_HIVE);
+                    TTableFormatFileDesc tableFormatFileDesc = new 
TTableFormatFileDesc();
+                    
tableFormatFileDesc.setTableFormatType(hiveSplit.getTableFormatType().value());
+                    AcidInfo acidInfo = (AcidInfo) hiveSplit.getInfo();
+                    TTransactionalHiveDesc transactionalHiveDesc = new 
TTransactionalHiveDesc();
+                    
transactionalHiveDesc.setPartition(acidInfo.getPartitionLocation());
+                    List<TTransactionalHiveDeleteDeltaDesc> deleteDeltaDescs = 
new ArrayList<>();
+                    for (DeleteDeltaInfo deleteDeltaInfo : 
acidInfo.getDeleteDeltas()) {
+                        TTransactionalHiveDeleteDeltaDesc deleteDeltaDesc = 
new TTransactionalHiveDeleteDeltaDesc();
+                        
deleteDeltaDesc.setDirectoryLocation(deleteDeltaInfo.getDirectoryLocation());
+                        
deleteDeltaDesc.setFileNames(deleteDeltaInfo.getFileNames());
+                        deleteDeltaDescs.add(deleteDeltaDesc);
+                    }
+                    transactionalHiveDesc.setDeleteDeltas(deleteDeltaDescs);
+                    
tableFormatFileDesc.setTransactionalHiveParams(transactionalHiveDesc);
+                    rangeDesc.setTableFormatParams(tableFormatFileDesc);
                 }
-                transactionalHiveDesc.setDeleteDeltas(deleteDeltaDescs);
-                
tableFormatFileDesc.setTransactionalHiveParams(transactionalHiveDesc);
-                rangeDesc.setTableFormatParams(tableFormatFileDesc);
-            }
 
-            setScanParams(rangeDesc, fileSplit);
-
-            
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
-            TScanRangeLocation location = new TScanRangeLocation();
-            Backend selectedBackend;
-            if (enableShortCircuitRead) {
-                // Try to find a local BE if enable hdfs short circuit read
-                selectedBackend = 
backendPolicy.getNextLocalBe(Arrays.asList(fileSplit.getHosts()), curLocations);
-            } else {
-                // Use consistent hash to assign the same scan range into the 
same backend among different queries
-                selectedBackend = 
backendPolicy.getNextConsistentBe(curLocations);
+                setScanParams(rangeDesc, fileSplit);
+
+                
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
+                TScanRangeLocation location = new TScanRangeLocation();
+                setLocationPropertiesIfNecessary(backend, locationType, 
locationProperties);
+                location.setBackendId(backend.getId());
+                location.setServer(new TNetworkAddress(backend.getHost(), 
backend.getBePort()));
+                curLocations.addToLocations(location);
+                LOG.debug("assign to backend {} with table split: {} ({}, {}), 
location: {}",
+                        curLocations.getLocations().get(0).getBackendId(), 
fileSplit.getPath(), fileSplit.getStart(),
+                        fileSplit.getLength(), 
Joiner.on("|").join(fileSplit.getHosts()));
+                scanRangeLocations.add(curLocations);
+                this.totalFileSize += fileSplit.getLength();
             }
-            setLocationPropertiesIfNecessary(selectedBackend, locationType, 
locationProperties);
-            location.setBackendId(selectedBackend.getId());
-            location.setServer(new TNetworkAddress(selectedBackend.getHost(), 
selectedBackend.getBePort()));
-            curLocations.addToLocations(location);
-            LOG.debug("assign to backend {} with table split: {} ({}, {}), 
location: {}",
-                    curLocations.getLocations().get(0).getBackendId(), 
fileSplit.getPath(), fileSplit.getStart(),
-                    fileSplit.getLength(), 
Joiner.on("|").join(fileSplit.getHosts()));
-            scanRangeLocations.add(curLocations);
-            this.totalFileSize += fileSplit.getLength();
         }
+
         if (ConnectContext.get().getExecutor() != null) {
             
ConnectContext.get().getExecutor().getSummaryProfile().setCreateScanRangeFinishTime();
         }
@@ -518,3 +516,4 @@ public abstract class FileQueryScanNode extends 
FileScanNode {
 
     protected abstract Map<String, String> getLocationProperties() throws 
UserException;
 }
+
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java
index 4e9d0bae564..1221d3ad21b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java
@@ -42,6 +42,8 @@ public class FileSplit implements Split {
     // partitionValues would be ["part1", "part2"]
     protected List<String> partitionValues;
 
+    protected List<String> alternativeHosts;
+
     public FileSplit(Path path, long start, long length, long fileLength,
             long modificationTime, String[] hosts, List<String> 
partitionValues) {
         this.path = path;
@@ -67,6 +69,11 @@ public class FileSplit implements Split {
         return null;
     }
 
+    @Override
+    public String getPathString() {
+        return path.toString();
+    }
+
     public static class FileSplitCreator implements SplitCreator {
 
         public static final FileSplitCreator DEFAULT = new FileSplitCreator();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/NodeSelectionStrategy.java
similarity index 83%
copy from fe/fe-core/src/main/java/org/apache/doris/spi/Split.java
copy to 
fe/fe-core/src/main/java/org/apache/doris/planner/external/NodeSelectionStrategy.java
index 31b1e1515a5..4b4d10a4240 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/NodeSelectionStrategy.java
@@ -15,15 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.spi;
-
-/**
- * Split interface. e.g. Tablet for Olap Table.
- */
-public interface Split {
-
-    String[] getHosts();
-
-    Object getInfo();
+package org.apache.doris.planner.external;
 
+public enum NodeSelectionStrategy {
+    ROUND_ROBIN,
+    RANDOM,
+    CONSISTENT_HASHING
 }
+
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/SplitWeight.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/SplitWeight.java
new file mode 100644
index 00000000000..d8724017b67
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/SplitWeight.java
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// 
https://github.com/trinodb/trino/blob/master/core/trino-spi/src/main/java/io/trino/spi/SplitWeight.java
+// and modified by Doris
+
+package org.apache.doris.planner.external;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonValue;
+import com.google.errorprone.annotations.DoNotCall;
+
+import java.math.BigDecimal;
+import java.util.Collection;
+import java.util.function.Function;
+
+public final class SplitWeight {
+    private static final long UNIT_VALUE = 100;
+    private static final int UNIT_SCALE = 2; // Decimal scale such that (10 ^ 
UNIT_SCALE) == UNIT_VALUE
+    private static final SplitWeight STANDARD_WEIGHT = new 
SplitWeight(UNIT_VALUE);
+
+    private final long value;
+
+    private SplitWeight(long value) {
+        if (value <= 0) {
+            throw new IllegalArgumentException("value must be > 0, found: " + 
value);
+        }
+        this.value = value;
+    }
+
+    /**
+     * Produces a {@link SplitWeight} from the raw internal value 
representation. This method is intended
+     * primarily for JSON deserialization, and connectors should not call this factory method directly
+     * to construct {@link SplitWeight} instances. Instead, connectors should 
use
+     * {@link SplitWeight#fromProportion(double)}
+     * to avoid breakages that could arise if {@link SplitWeight#UNIT_VALUE} 
changes in the future.
+     */
+    @JsonCreator
+    @DoNotCall // For JSON serialization only
+    public static SplitWeight fromRawValue(long value) {
+        return fromRawValueInternal(value);
+    }
+
+    /**
+     * Produces a {@link SplitWeight} that corresponds to the {@link SplitWeight#standard()} weight
+     * proportionally, i.e., a parameter of <code>1.0</code> will be equivalent to the standard weight
+     * and a value of <code>0.5</code> will be 1/2 of the standard split weight. Valid arguments
+     * must be greater than zero and finite. Connectors should prefer constructing split weights
+     * using this factory method rather than passing a raw integer value in case the integer representation
+     * of a standard split needs to change in the future.
+     *
+     * @param weight the proportional weight relative to a standard split, expressed as a double
+     * @return a {@link SplitWeight} with a raw value corresponding to the requested proportion
+     */
+    public static SplitWeight fromProportion(double weight) {
+        if (weight <= 0 || !Double.isFinite(weight)) {
+            throw new IllegalArgumentException("Invalid weight: " + weight);
+        }
+        // Must round up to avoid small weights rounding to 0
+        return fromRawValueInternal((long) Math.ceil(weight * UNIT_VALUE));
+    }
+
+    private static SplitWeight fromRawValueInternal(long value) {
+        return value == UNIT_VALUE ? STANDARD_WEIGHT : new SplitWeight(value);
+    }
+
+    public static SplitWeight standard() {
+        return STANDARD_WEIGHT;
+    }
+
+    public static long rawValueForStandardSplitCount(int splitCount) {
+        if (splitCount < 0) {
+            throw new IllegalArgumentException("splitCount must be >= 0, 
found: " + splitCount);
+        }
+        return Math.multiplyExact(splitCount, UNIT_VALUE);
+    }
+
+    public static <T> long rawValueSum(Collection<T> collection, Function<T, SplitWeight> getter) {
+        long sum = 0;
+        for (T item : collection) {
+            long value = getter.apply(item).getRawValue();
+            sum = Math.addExact(sum, value);
+        }
+        return sum;
+    }
+
+    /**
+     * @return The internal integer representation for this weight value
+     */
+    @JsonValue
+    public long getRawValue() {
+        return value;
+    }
+
+    @Override
+    public int hashCode() {
+        return Long.hashCode(value);
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (!(other instanceof SplitWeight)) {
+            return false;
+        }
+        return this.value == ((SplitWeight) other).value;
+    }
+
+    @Override
+    public String toString() {
+        if (value == UNIT_VALUE) {
+            return "1";
+        }
+        return BigDecimal.valueOf(value, UNIT_SCALE).stripTrailingZeros().toPlainString();
+    }
+}
+
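A quick usage sketch for the class above (illustration only, not part of the diff; `splits` stands in for any List<Split>, and Split#getSplitWeight() is the default added later in this commit):

    SplitWeight half = SplitWeight.fromProportion(0.5);   // raw value 50, i.e. half of a standard split
    SplitWeight standard = SplitWeight.standard();        // raw value 100 (UNIT_VALUE)
    long totalWeight = SplitWeight.rawValueSum(splits, Split::getSplitWeight);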
diff --git a/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java b/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java
index 31b1e1515a5..d42defe5c38 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java
@@ -17,6 +17,10 @@
 
 package org.apache.doris.spi;
 
+import org.apache.doris.planner.external.SplitWeight;
+
+import java.util.List;
+
 /**
  * Split interface. e.g. Tablet for Olap Table.
  */
@@ -26,4 +30,23 @@ public interface Split {
 
     Object getInfo();
 
+    default SplitWeight getSplitWeight() {
+        return SplitWeight.standard();
+    }
+
+    default boolean isRemotelyAccessible() {
+        return true;
+    }
+
+    String getPathString();
+
+    long getStart();
+
+    long getLength();
+
+    List<String> getAlternativeHosts();
+
+    void setAlternativeHosts(List<String> alternativeHosts);
+
 }
+
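For reference, a hypothetical minimal implementor of the extended interface (illustration only, not this commit's FileSplit; imports of java.util.List and java.util.Collections are assumed). The defaults added above supply getSplitWeight() and isRemotelyAccessible().

    Split demo = new Split() {
        @Override public String[] getHosts() { return new String[0]; }
        @Override public Object getInfo() { return null; }
        @Override public String getPathString() { return "hdfs://nn/demo/file.orc"; }
        @Override public long getStart() { return 0L; }
        @Override public long getLength() { return 1024L; }
        @Override public List<String> getAlternativeHosts() { return Collections.emptyList(); }
        @Override public void setAlternativeHosts(List<String> hosts) { /* no-op in this sketch */ }
    };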
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
index fcb5e63e838..47d9619b079 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
@@ -622,7 +622,7 @@ public class Backend implements Writable {
 
     @Override
     public int hashCode() {
-        return Objects.hash(id, host, heartbeatPort, bePort, isAlive);
+        return Objects.hash(id, host, heartbeatPort, bePort, isAlive.get());
     }
 
     @Override
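The hashCode() change above matters because, judging from the .get() call, isAlive is held in a wrapper such as AtomicBoolean, which does not override hashCode(); hashing the unwrapped value keeps two backends with identical state hash-equal. Illustration (assumes java.util.concurrent.atomic.AtomicBoolean):

    AtomicBoolean a = new AtomicBoolean(true);
    AtomicBoolean b = new AtomicBoolean(true);
    // Identity hash: almost always differs even though both wrap `true`.
    boolean wrapperHashesMatch = a.hashCode() == b.hashCode();
    // Value hash: stable for equal flags.
    boolean valueHashesMatch = Boolean.hashCode(a.get()) == Boolean.hashCode(b.get()); // true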
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
index ef65d1b6165..1d213c6a09b 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
@@ -18,52 +18,55 @@
 package org.apache.doris.planner;
 
 import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.UserException;
 import org.apache.doris.planner.external.FederationBackendPolicy;
+import org.apache.doris.planner.external.FileSplit;
+import org.apache.doris.planner.external.NodeSelectionStrategy;
+import org.apache.doris.spi.Split;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
-import org.apache.doris.thrift.TExternalScanRange;
-import org.apache.doris.thrift.TFileRangeDesc;
-import org.apache.doris.thrift.TFileScanRange;
-import org.apache.doris.thrift.TScanRange;
-import org.apache.doris.thrift.TScanRangeLocations;
 
-import com.google.common.base.Stopwatch;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Multimap;
 import mockit.Mock;
 import mockit.MockUp;
 import mockit.Mocked;
-import org.junit.Before;
+import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
 import org.junit.Test;
 import org.junit.jupiter.api.Assertions;
 
-import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
 
 public class FederationBackendPolicyTest {
     @Mocked
     private Env env;
 
-    @Before
-    public void setUp() {
-
+    @Test
+    public void testRemoteSplits() throws UserException {
         SystemInfoService service = new SystemInfoService();
 
-        for (int i = 0; i < 190; i++) {
-            Backend backend = new Backend(Long.valueOf(i), "192.168.1." + i, 9050);
-            backend.setAlive(true);
-            service.addBackend(backend);
-        }
-        for (int i = 0; i < 10; i++) {
-            Backend backend = new Backend(Long.valueOf(190 + i), "192.168.1." + i, 9051);
-            backend.setAlive(true);
-            service.addBackend(backend);
-        }
-        for (int i = 0; i < 10; i++) {
-            Backend backend = new Backend(Long.valueOf(200 + i), "192.168.2." + i, 9050);
-            backend.setAlive(false);
-            service.addBackend(backend);
-        }
+        Backend backend1 = new Backend(10002L, "172.30.0.100", 9050);
+        backend1.setAlive(true);
+        service.addBackend(backend1);
+        Backend backend2 = new Backend(10003L, "172.30.0.106", 9050);
+        backend2.setAlive(true);
+        service.addBackend(backend2);
+        Backend backend3 = new Backend(10004L, "172.30.0.118", 9050);
+        backend3.setAlive(true);
+        service.addBackend(backend3);
 
         new MockUp<Env>() {
             @Mock
@@ -72,76 +75,653 @@ public class FederationBackendPolicyTest {
             }
         };
 
-    }
+        List<Split> splits = new ArrayList<>();
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00000-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 112140970, 112140970, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00001-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 120839661, 120839661, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00002-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 108897409, 108897409, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00003-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 95795997, 95795997, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00004-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 104600402, 104600402, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00005-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 104600402, 104600402, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00006-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 104600402, 104600402, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00007-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 105664025, 105664025, 0, null, Collections.emptyList()));
 
-    @Test
-    public void testGetNextBe() throws UserException {
         FederationBackendPolicy policy = new FederationBackendPolicy();
         policy.init();
-        int backendNum = 200;
-        int invokeTimes = 1000000;
+        int backendNum = 3;
         Assertions.assertEquals(policy.numBackends(), backendNum);
-        Stopwatch sw = Stopwatch.createStarted();
-        for (int i = 0; i < invokeTimes; i++) {
-            Assertions.assertFalse(policy.getNextBe().getHost().contains("192.168.2."));
+
+        Multimap<Backend, Split> assignment = policy.computeScanRangeAssignment(splits);
+
+        for (Backend backend : assignment.keySet()) {
+            Collection<Split> assignedSplits = assignment.get(backend);
+            long scanBytes = 0L;
+            for (Split split : assignedSplits) {
+                FileSplit fileSplit = (FileSplit) split;
+                scanBytes += fileSplit.getLength();
+            }
+            System.out.printf("%s -> %d splits, %d bytes\n", backend, 
assignedSplits.size(), scanBytes);
         }
-        sw.stop();
-        System.out.println("Invoke getNextBe() " + invokeTimes
-                    + " times cost [" + sw.elapsed(TimeUnit.MILLISECONDS) + "] 
ms");
     }
 
     @Test
-    public void testGetNextLocalBe() throws UserException {
+    public void testHasLocalSplits() throws UserException {
+        SystemInfoService service = new SystemInfoService();
+
+        Backend backend1 = new Backend(30002L, "172.30.0.100", 9050);
+        backend1.setAlive(true);
+        service.addBackend(backend1);
+        Backend backend2 = new Backend(30003L, "172.30.0.106", 9050);
+        backend2.setAlive(true);
+        service.addBackend(backend2);
+        Backend backend3 = new Backend(30004L, "172.30.0.118", 9050);
+        backend3.setAlive(true);
+        service.addBackend(backend3);
+
+        new MockUp<Env>() {
+            @Mock
+            public SystemInfoService getCurrentSystemInfo() {
+                return service;
+            }
+        };
+
+        List<Split> splits = new ArrayList<>();
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00000-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 112140970, 112140970, 0, new String[] {"172.30.0.100"}, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00001-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 120839661, 120839661, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00002-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 108897409, 108897409, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00003-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 95795997, 95795997, 0, new String[] {"172.30.0.106"}, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00004-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 104600402, 104600402, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00005-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 104600402, 104600402, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00006-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 104600402, 104600402, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00007-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 105664025, 105664025, 0, null, Collections.emptyList()));
+
         FederationBackendPolicy policy = new FederationBackendPolicy();
         policy.init();
-        int backendNum = 200;
-        int invokeTimes = 1000000;
+        int backendNum = 3;
         Assertions.assertEquals(policy.numBackends(), backendNum);
-        List<String> localHosts = Arrays.asList("192.168.1.0", "192.168.1.1", "192.168.1.2");
-        TScanRangeLocations scanRangeLocations = getScanRangeLocations("path1", 0, 100);
-        Stopwatch sw = Stopwatch.createStarted();
-        for (int i = 0; i < invokeTimes; i++) {
-            Assertions.assertTrue(localHosts.contains(policy.getNextLocalBe(localHosts, scanRangeLocations).getHost()));
-        }
-        sw.stop();
-        System.out.println("Invoke getNextLocalBe() " + invokeTimes
-                    + " times cost [" + sw.elapsed(TimeUnit.MILLISECONDS) + "] 
ms");
+        int totalSplitNum = 0;
+        List<Boolean> checkedLocalSplit = new ArrayList<>();
+        Multimap<Backend, Split> assignment = policy.computeScanRangeAssignment(splits);
+        for (Backend backend : assignment.keySet()) {
+            Collection<Split> assignedSplits = assignment.get(backend);
+            for (Split split : assignedSplits) {
+                FileSplit fileSplit = (FileSplit) split;
+                ++totalSplitNum;
+                if (fileSplit.getPath().equals(new Path(
+                        "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00000-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"))) {
+                    Assert.assertEquals("172.30.0.100", backend.getHost());
+                    checkedLocalSplit.add(true);
+                } else if (fileSplit.getPath().equals(new Path(
+                        "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00003-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"))) {
+                    Assert.assertEquals("172.30.0.106", backend.getHost());
+                    checkedLocalSplit.add(true);
+                }
+            }
+        }
+        Assert.assertEquals(2, checkedLocalSplit.size());
+        Assert.assertEquals(8, totalSplitNum);
+
+        int maxAssignedSplitNum = Integer.MIN_VALUE;
+        int minAssignedSplitNum = Integer.MAX_VALUE;
+        for (Backend backend : assignment.keySet()) {
+            Collection<Split> assignedSplits = assignment.get(backend);
+            long scanBytes = 0L;
+            for (Split split : assignedSplits) {
+                FileSplit fileSplit = (FileSplit) split;
+                scanBytes += fileSplit.getLength();
+            }
+            if (assignedSplits.size() <= minAssignedSplitNum) {
+                minAssignedSplitNum = assignedSplits.size();
+            }
+            if (assignedSplits.size() >= maxAssignedSplitNum) {
+                maxAssignedSplitNum = assignedSplits.size();
+            }
+            System.out.printf("%s -> %d splits, %d bytes\n", backend, 
assignedSplits.size(), scanBytes);
+        }
+        Assert.assertTrue(Math.abs(maxAssignedSplitNum - minAssignedSplitNum) <= Config.split_assigner_max_split_num_variance);
+
     }
 
     @Test
     public void testConsistentHash() throws UserException {
-        FederationBackendPolicy policy = new FederationBackendPolicy();
+        SystemInfoService service = new SystemInfoService();
+
+        Backend backend1 = new Backend(10002L, "172.30.0.100", 9050);
+        backend1.setAlive(true);
+        service.addBackend(backend1);
+        Backend backend2 = new Backend(10003L, "172.30.0.106", 9050);
+        backend2.setAlive(true);
+        service.addBackend(backend2);
+        Backend backend3 = new Backend(10004L, "172.30.0.118", 9050);
+        backend3.setAlive(true);
+        service.addBackend(backend3);
+
+        new MockUp<Env>() {
+            @Mock
+            public SystemInfoService getCurrentSystemInfo() {
+                return service;
+            }
+        };
+
+        List<Split> splits = new ArrayList<>();
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00000-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 112140970, 112140970, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00001-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 120839661, 120839661, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00002-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 108897409, 108897409, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00003-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 95795997, 95795997, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00004-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 104600402, 104600402, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00005-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 104600402, 104600402, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00006-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 104600402, 104600402, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00007-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 105664025, 105664025, 0, null, Collections.emptyList()));
+
+        FederationBackendPolicy policy = new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING);
         policy.init();
-        int backendNum = 200;
+        int backendNum = 3;
         Assertions.assertEquals(policy.numBackends(), backendNum);
 
-        TScanRangeLocations scanRangeLocations = getScanRangeLocations("path1", 0, 100);
-        Assertions.assertEquals(39, policy.getNextConsistentBe(scanRangeLocations).getId());
+        Multimap<Backend, Split> assignment = policy.computeScanRangeAssignment(splits);
+
+        int maxAssignedSplitNum = Integer.MIN_VALUE;
+        int minAssignedSplitNum = Integer.MAX_VALUE;
+        for (Backend backend : assignment.keySet()) {
+            Collection<Split> assignedSplits = assignment.get(backend);
+            long scanBytes = 0L;
+            for (Split split : assignedSplits) {
+                FileSplit fileSplit = (FileSplit) split;
+                scanBytes += fileSplit.getLength();
+            }
+            if (assignedSplits.size() <= minAssignedSplitNum) {
+                minAssignedSplitNum = assignedSplits.size();
+            }
+            if (assignedSplits.size() >= maxAssignedSplitNum) {
+                maxAssignedSplitNum = assignedSplits.size();
+            }
+            System.out.printf("%s -> %d splits, %d bytes\n", backend, 
assignedSplits.size(), scanBytes);
+        }
+        Assert.assertTrue(Math.abs(maxAssignedSplitNum - minAssignedSplitNum) <= Config.split_assigner_max_split_num_variance);
+
+    }
+
+    @Test
+    public void testGenerateRandomly() throws UserException {
+        SystemInfoService service = new SystemInfoService();
+        new MockUp<Env>() {
+            @Mock
+            public SystemInfoService getCurrentSystemInfo() {
+                return service;
+            }
+        };
+
+        Random random = new Random();
+        int backendNum = random.nextInt(100 - 1) + 1;
+
+        int minOctet3 = 0;
+        int maxOctet3 = 250;
+        int minOctet4 = 1;
+        int maxOctet4 = 250;
+        Set<Integer> backendIds = new HashSet<>();
+        Set<String> ipAddresses = new HashSet<>();
+        for (int i = 0; i < backendNum; i++) {
+            String ipAddress;
+            do {
+                int octet3 = random.nextInt((maxOctet3 - minOctet3) + 1) + minOctet3;
+                int octet4 = random.nextInt((maxOctet4 - minOctet4) + 1) + minOctet4;
+                ipAddress = 192 + "." + 168 + "." + octet3 + "." + octet4;
+            } while (!ipAddresses.add(ipAddress));
+
+            int backendId;
+            do {
+                backendId = random.nextInt(90000) + 10000;
+            } while (!backendIds.add(backendId));
+
+            Backend backend = new Backend(backendId, ipAddress, 9050);
+            backend.setAlive(true);
+            service.addBackend(backend);
+        }
+
+        List<Split> remoteSplits = new ArrayList<>();
+        int splitCount = random.nextInt(1000 - 100) + 100;
+        for (int i = 0; i < splitCount; ++i) {
+            long splitLength = random.nextInt(115343360 - 94371840) + 94371840;
+            FileSplit split = new FileSplit(new Path(
+                    "hdfs://HDFS00001/usr/hive/warehouse/test.db/test_table/" 
+ UUID.randomUUID()),
+                    0, splitLength, splitLength, 0, null, 
Collections.emptyList());
+            remoteSplits.add(split);
+        }
+
+        List<Split> localSplits = new ArrayList<>();
+        int localSplitCount = random.nextInt(1000 - 100) + 100;
+        Set<String> totalLocalHosts = new HashSet<>();
+        for (int i = 0; i < localSplitCount; ++i) {
+            int localHostNum = random.nextInt(3 - 1) + 1;
+            Set<String> localHosts = new HashSet<>();
+            String localHost;
+            for (int j = 0; j < localHostNum; ++j) {
+                do {
+                    localHost = service.getAllBackends().get(random.nextInt(service.getAllBackends().size())).getHost();
+                } while (!localHosts.add(localHost));
+                totalLocalHosts.add(localHost);
+            }
+            long localSplitLength = random.nextInt(115343360 - 94371840) + 94371840;
+            FileSplit split = new FileSplit(new Path(
+                    "hdfs://HDFS00001/usr/hive/warehouse/test.db/test_table/" + UUID.randomUUID()),
+                    0, localSplitLength, localSplitLength, 0, localHosts.toArray(new String[0]),
+                    Collections.emptyList());
+            localSplits.add(split);
+        }
+
+        ListMultimap<Backend, Split> result = null;
+        // Run 3 times to ensure the same results
+        for (int i = 0; i < 3; ++i) {
+            FederationBackendPolicy policy = new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING);
+            policy.init();
+            Assertions.assertEquals(policy.numBackends(), backendNum);
+            int totalSplitNum = 0;
+
+            List<Split> totalSplits = new ArrayList<>();
+            totalSplits.addAll(remoteSplits);
+            totalSplits.addAll(localSplits);
+            Collections.shuffle(totalSplits);
+            Multimap<Backend, Split> assignment = policy.computeScanRangeAssignment(totalSplits);
+            if (i == 0) {
+                result = ArrayListMultimap.create(assignment);
+            } else {
+                Assertions.assertTrue(areMultimapsEqualIgnoringOrder(result, assignment));
+
+            }
+            int maxAssignedSplitNum = Integer.MIN_VALUE;
+            int minAssignedSplitNum = Integer.MAX_VALUE;
+            for (Backend backend : assignment.keySet()) {
+                Collection<Split> assignedSplits = assignment.get(backend);
+                if (assignedSplits.size() <= minAssignedSplitNum) {
+                    minAssignedSplitNum = assignedSplits.size();
+                }
+                if (assignedSplits.size() >= maxAssignedSplitNum) {
+                    maxAssignedSplitNum = assignedSplits.size();
+                }
+
+                long scanBytes = 0L;
+                for (Split split : assignedSplits) {
+                    FileSplit fileSplit = (FileSplit) split;
+                    scanBytes += fileSplit.getLength();
+                    ++totalSplitNum;
+                    if (fileSplit.getHosts() != null && fileSplit.getHosts().length > 0) {
+                        for (String host : fileSplit.getHosts()) {
+                            Assert.assertTrue(totalLocalHosts.contains(host));
+                        }
+                    }
+                }
+                System.out.printf("%s -> %d splits, %d bytes\n", backend, 
assignedSplits.size(), scanBytes);
+            }
+            Assert.assertEquals(totalSplits.size(), totalSplitNum);
 
-        scanRangeLocations = getScanRangeLocations("path2", 0, 100);
-        Assertions.assertEquals(78, policy.getNextConsistentBe(scanRangeLocations).getId());
+            Assert.assertTrue(Math.abs(maxAssignedSplitNum - minAssignedSplitNum) <= Config.split_assigner_max_split_num_variance);
+        }
     }
 
-    private TScanRangeLocations getScanRangeLocations(String path, long startOffset, long size) {
-        // Generate on file scan range
-        TFileScanRange fileScanRange = new TFileScanRange();
-        // Scan range
-        TExternalScanRange externalScanRange = new TExternalScanRange();
-        externalScanRange.setFileScanRange(fileScanRange);
-        TScanRange scanRange = new TScanRange();
-        scanRange.setExtScanRange(externalScanRange);
-        scanRange.getExtScanRange().getFileScanRange().addToRanges(createRangeDesc(path, startOffset, size));
-        // Locations
-        TScanRangeLocations locations = new TScanRangeLocations();
-        locations.setScanRange(scanRange);
-        return locations;
+    @Test
+    public void testNonAliveNodes() throws UserException {
+        SystemInfoService service = new SystemInfoService();
+        new MockUp<Env>() {
+            @Mock
+            public SystemInfoService getCurrentSystemInfo() {
+                return service;
+            }
+        };
+
+        Random random = new Random();
+        int backendNum = random.nextInt(100 - 1) + 1;
+
+        int minOctet3 = 0;
+        int maxOctet3 = 250;
+        int minOctet4 = 1;
+        int maxOctet4 = 250;
+        Set<Integer> backendIds = new HashSet<>();
+        Set<String> ipAddresses = new HashSet<>();
+        int aliveBackendNum = 0;
+        for (int i = 0; i < backendNum; i++) {
+            String ipAddress;
+            do {
+                int octet3 = random.nextInt((maxOctet3 - minOctet3) + 1) + minOctet3;
+                int octet4 = random.nextInt((maxOctet4 - minOctet4) + 1) + minOctet4;
+                ipAddress = 192 + "." + 168 + "." + octet3 + "." + octet4;
+            } while (!ipAddresses.add(ipAddress));
+
+            int backendId;
+            do {
+                backendId = random.nextInt(90000) + 10000;
+            } while (!backendIds.add(backendId));
+
+            Backend backend = new Backend(backendId, ipAddress, 9050);
+            if (i % 2 == 0) {
+                ++aliveBackendNum;
+                backend.setAlive(true);
+            } else {
+                backend.setAlive(false);
+            }
+            service.addBackend(backend);
+        }
+
+        List<Split> remoteSplits = new ArrayList<>();
+        int splitCount = random.nextInt(1000 - 100) + 100;
+        for (int i = 0; i < splitCount; ++i) {
+            long splitLength = random.nextInt(115343360 - 94371840) + 94371840;
+            FileSplit split = new FileSplit(new Path(
+                    "hdfs://HDFS00001/usr/hive/warehouse/test.db/test_table/" 
+ UUID.randomUUID()),
+                    0, splitLength, splitLength, 0, null, 
Collections.emptyList());
+            remoteSplits.add(split);
+        }
+
+        List<Split> localSplits = new ArrayList<>();
+        int localSplitCount = random.nextInt(1000 - 100) + 100;
+        Set<String> totalLocalHosts = new HashSet<>();
+        for (int i = 0; i < localSplitCount; ++i) {
+            int localHostNum = random.nextInt(3 - 1) + 1;
+            Set<String> localHosts = new HashSet<>();
+            String localHost;
+            for (int j = 0; j < localHostNum; ++j) {
+                do {
+                    localHost = service.getAllBackends().get(random.nextInt(service.getAllBackends().size())).getHost();
+                } while (!localHosts.add(localHost));
+                totalLocalHosts.add(localHost);
+            }
+            long localSplitLength = random.nextInt(115343360 - 94371840) + 94371840;
+            FileSplit split = new FileSplit(new Path(
+                    "hdfs://HDFS00001/usr/hive/warehouse/test.db/test_table/" + UUID.randomUUID()),
+                    0, localSplitLength, localSplitLength, 0, localHosts.toArray(new String[0]),
+                    Collections.emptyList());
+            localSplits.add(split);
+        }
+
+        Multimap<Backend, Split> result = null;
+        // Run 3 times to ensure the same results
+        for (int i = 0; i < 3; ++i) {
+            FederationBackendPolicy policy = new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING);
+            policy.init();
+            Assertions.assertEquals(policy.numBackends(), aliveBackendNum);
+            int totalSplitNum = 0;
+            List<Split> totalSplits = new ArrayList<>();
+            totalSplits.addAll(remoteSplits);
+            totalSplits.addAll(localSplits);
+            Collections.shuffle(totalSplits);
+            Multimap<Backend, Split> assignment = policy.computeScanRangeAssignment(totalSplits);
+            if (i == 0) {
+                result = ArrayListMultimap.create(assignment);
+            } else {
+                Assertions.assertEquals(result, assignment);
+            }
+            int maxAssignedSplitNum = Integer.MIN_VALUE;
+            int minAssignedSplitNum = Integer.MAX_VALUE;
+            for (Backend backend : assignment.keySet()) {
+                Collection<Split> assignedSplits = assignment.get(backend);
+                if (assignedSplits.size() <= minAssignedSplitNum) {
+                    minAssignedSplitNum = assignedSplits.size();
+                }
+                if (assignedSplits.size() >= maxAssignedSplitNum) {
+                    maxAssignedSplitNum = assignedSplits.size();
+                }
+
+                long scanBytes = 0L;
+                for (Split split : assignedSplits) {
+                    FileSplit fileSplit = (FileSplit) split;
+                    scanBytes += fileSplit.getLength();
+                    ++totalSplitNum;
+                    if (fileSplit.getHosts() != null && fileSplit.getHosts().length > 0) {
+                        for (String host : fileSplit.getHosts()) {
+                            Assert.assertTrue(totalLocalHosts.contains(host));
+                        }
+                    }
+                }
+                System.out.printf("%s -> %d splits, %d bytes\n", backend, 
assignedSplits.size(), scanBytes);
+            }
+            Assert.assertEquals(totalSplits.size(), totalSplitNum);
+
+            Assert.assertTrue(Math.abs(maxAssignedSplitNum - minAssignedSplitNum) <= Config.split_assigner_max_split_num_variance);
+        }
     }
 
-    private TFileRangeDesc createRangeDesc(String path, long startOffset, long size) {
-        TFileRangeDesc rangeDesc = new TFileRangeDesc();
-        rangeDesc.setPath(path);
-        rangeDesc.setStartOffset(startOffset);
-        rangeDesc.setSize(size);
-        return rangeDesc;
+    private static class TestSplitHashKey {
+        private String path;
+        private long start;
+        private long length;
+
+        public TestSplitHashKey(String path, long start, long length) {
+            this.path = path;
+            this.start = start;
+            this.length = length;
+        }
+
+        public String getPath() {
+            return path;
+        }
+
+        public long getStart() {
+            return start;
+        }
+
+        public long getLength() {
+            return length;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            TestSplitHashKey that = (TestSplitHashKey) o;
+            return start == that.start && length == that.length && 
Objects.equals(path, that.path);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(path, start, length);
+        }
+    }
+
+    @Test
+    public void testConsistentHashWhenNodeChanged() throws UserException {
+        SystemInfoService service = new SystemInfoService();
+
+        Backend backend1 = new Backend(10002L, "172.30.0.100", 9050);
+        backend1.setAlive(true);
+        service.addBackend(backend1);
+        Backend backend2 = new Backend(10003L, "172.30.0.106", 9050);
+        backend2.setAlive(true);
+        service.addBackend(backend2);
+        Backend backend3 = new Backend(10004L, "172.30.0.118", 9050);
+        backend3.setAlive(true);
+        service.addBackend(backend3);
+
+        new MockUp<Env>() {
+            @Mock
+            public SystemInfoService getCurrentSystemInfo() {
+                return service;
+            }
+        };
+
+        List<Split> splits = new ArrayList<>();
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00000-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 112140970, 112140970, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00001-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 120839661, 120839661, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00002-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 108897409, 108897409, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00003-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 95795997, 95795997, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00004-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 104600402, 104600402, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00005-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 104600402, 104600402, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00006-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 104600402, 104600402, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00007-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 105664025, 105664025, 0, null, Collections.emptyList()));
+
+        Map<TestSplitHashKey, Backend> originSplitAssignedBackends = new HashMap<>();
+            {
+            FederationBackendPolicy policy = new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING);
+            policy.init();
+            // Set these options to ensure that the consistent hash algorithm is consistent.
+            policy.setEnableSplitsRedistribution(false);
+            Config.split_assigner_min_consistent_hash_candidate_num = 1;
+            int backendNum = 3;
+            Assertions.assertEquals(policy.numBackends(), backendNum);
+            Multimap<Backend, Split> assignment = policy.computeScanRangeAssignment(splits);
+
+            for (Backend backend : assignment.keySet()) {
+                Collection<Split> assignedSplits = assignment.get(backend);
+                long scanBytes = 0L;
+                for (Split split : assignedSplits) {
+                    FileSplit fileSplit = (FileSplit) split;
+                    scanBytes += fileSplit.getLength();
+                    originSplitAssignedBackends.put(
+                            new TestSplitHashKey(split.getPathString(), split.getStart(), split.getLength()), backend);
+                }
+                System.out.printf("%s -> %d splits, %d bytes\n", backend, 
assignedSplits.size(), scanBytes);
+            }
+            Map<Backend, Long> stats = policy.getAssignedWeightPerBackend();
+            for (Map.Entry<Backend, Long> entry : stats.entrySet()) {
+                System.out.printf("weight: %s -> %d\n", entry.getKey(), 
entry.getValue());
+            }
+            }
+
+            // remove a node
+            {
+            service.dropBackend(backend3.getId());
+            int changed = 0;
+
+            FederationBackendPolicy policy = new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING);
+            policy.init();
+            int backendNum = 2;
+            Assertions.assertEquals(policy.numBackends(), backendNum);
+            Multimap<Backend, Split> assignment = policy.computeScanRangeAssignment(splits);
+
+            for (Backend backend : assignment.keySet()) {
+                Collection<Split> assignedSplits = assignment.get(backend);
+                long scanBytes = 0L;
+                for (Split split : assignedSplits) {
+                    FileSplit fileSplit = (FileSplit) split;
+                    scanBytes += fileSplit.getLength();
+                    Backend origin = originSplitAssignedBackends.get(
+                            new TestSplitHashKey(split.getPathString(), split.getStart(), split.getLength()));
+                    if (!backend.equals(origin)) {
+                        changed += 1;
+                    }
+                }
+                System.out.printf("%s -> %d splits, %d bytes\n", backend, 
assignedSplits.size(), scanBytes);
+            }
+
+            Map<Backend, Long> stats = policy.getAssignedWeightPerBackend();
+            for (Map.Entry<Backend, Long> entry : stats.entrySet()) {
+                System.out.printf("weight: %s -> %d\n", entry.getKey(), 
entry.getValue());
+            }
+
+            float moveRatio = changed * 1.0f / assignment.values().size();
+            System.out.printf("Remove a node: move ratio = %.2f\n", moveRatio);
+            Assertions.assertEquals(0.375, moveRatio);
+            }
+
+            // add a node
+            {
+            Backend backend4 = new Backend(10004L, "172.30.0.128", 9050);
+            backend4.setAlive(true);
+            service.addBackend(backend4);
+            int changed = 0;
+
+            FederationBackendPolicy policy = new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING);
+            policy.init();
+            int backendNum = 3;
+            Assertions.assertEquals(policy.numBackends(), backendNum);
+            Multimap<Backend, Split> assignment = policy.computeScanRangeAssignment(splits);
+
+            for (Backend backend : assignment.keySet()) {
+                Collection<Split> assignedSplits = assignment.get(backend);
+                long scanBytes = 0L;
+                for (Split split : assignedSplits) {
+                    FileSplit fileSplit = (FileSplit) split;
+                    scanBytes += fileSplit.getLength();
+                    Backend origin = originSplitAssignedBackends.get(
+                            new TestSplitHashKey(split.getPathString(), split.getStart(), split.getLength()));
+                    if (!backend.equals(origin)) {
+                        changed += 1;
+                    }
+                }
+                System.out.printf("%s -> %d splits, %d bytes\n", backend, 
assignedSplits.size(), scanBytes);
+            }
+            Map<Backend, Long> stats = policy.getAssignedWeightPerBackend();
+            for (Map.Entry<Backend, Long> entry : stats.entrySet()) {
+                System.out.printf("weight: %s -> %d\n", entry.getKey(), 
entry.getValue());
+            }
+
+            float moveRatio = changed * 1.0f / assignment.values().size();
+            System.out.printf("Add a node, move ratio = %.2f\n", moveRatio);
+            Assertions.assertEquals(0.25, moveRatio);
+            }
+    }
+
+    private static <K, V> boolean areMultimapsEqualIgnoringOrder(
+            Multimap<K, V> multimap1, Multimap<K, V> multimap2) {
+        Collection<Map.Entry<K, V>> entries1 = multimap1.entries();
+        Collection<Map.Entry<K, V>> entries2 = multimap2.entries();
+
+        return entries1.containsAll(entries2) && entries2.containsAll(entries1);
     }
 }
+
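The hard-coded move ratios asserted in testConsistentHashWhenNodeChanged line up with the eight splits used and with the roughly 1/N reshuffle that consistent hashing targets (illustrative arithmetic only, not part of the diff):

    int totalSplits = 8;
    double removeRatio = 3.0 / totalSplits; // 3 of 8 splits reassigned when going from 3 to 2 backends -> 0.375
    double addRatio = 2.0 / totalSplits;    // 2 of 8 splits reassigned when going from 2 to 3 backends -> 0.25
    double idealFraction = 1.0 / 3;         // about 1/N of the keys should move on a single-node change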

