This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 383850ef12b07b9323276314b26ec6da7e792e86 Author: Qi Chen <[email protected]> AuthorDate: Sun Feb 4 11:13:29 2024 +0800 [Opt](multi-catalog) Opt split assignment to resolve uneven distribution. (#30390) [Opt] (multi-catalog) Opt split assignment to resolve uneven distribution. Currently only for `FileQueryScanNode`. Referring to the implementation of Trino, - Local node soft affinity optimization. Prefer local replication node. - Remote splits use the consistent hash algorithm when the file cache is turned on; because consistent hashing can be uneven, the splits are then re-adjusted so that the maximum and minimum split numbers of hosts differ by at most `split_assigner_max_split_num_variance` splits. - Remote splits use the round-robin algorithm when the file cache is turned off. --- .../main/java/org/apache/doris/common/Config.java | 23 +- .../apache/doris/common/IndexedPriorityQueue.java | 228 +++++++ .../doris/{spi/Split.java => common/Queue.java} | 21 +- .../doris/common/ResettableRandomizedIterator.java | 63 ++ .../UpdateablePriorityQueue.java} | 18 +- .../apache/doris/common/util/ConsistentHash.java | 39 +- .../doris/planner/external/ExternalScanNode.java | 6 +- .../planner/external/FederationBackendPolicy.java | 361 +++++++++- .../doris/planner/external/FileQueryScanNode.java | 129 ++-- .../apache/doris/planner/external/FileSplit.java | 7 + .../external/NodeSelectionStrategy.java} | 16 +- .../apache/doris/planner/external/SplitWeight.java | 130 ++++ .../src/main/java/org/apache/doris/spi/Split.java | 23 + .../main/java/org/apache/doris/system/Backend.java | 2 +- .../doris/planner/FederationBackendPolicyTest.java | 730 ++++++++++++++++++--- 15 files changed, 1587 insertions(+), 209 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 4fcd815072e..be58c139852 100644 --- 
a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2239,7 +2239,28 @@ public class Config extends ConfigBase { "When file cache is enabled, the number of virtual nodes of each node in the consistent hash algorithm. " + "The larger the value, the more uniform the distribution of the hash algorithm, " + "but it will increase the memory overhead."}) - public static int virtual_node_number = 2048; + public static int split_assigner_virtual_node_number = 256; + + @ConfField(mutable = true, description = { + "本地节点软亲缘性优化。尽可能地优先选取本地副本节点。", + "Local node soft affinity optimization. Prefer local replication node."}) + public static boolean split_assigner_optimized_local_scheduling = true; + + @ConfField(mutable = true, description = { + "随机算法最小的候选数目,会选取相对最空闲的节点。", + "The random algorithm has the smallest number of candidates and will select the most idle node."}) + public static int split_assigner_min_random_candidate_num = 2; + + @ConfField(mutable = true, description = { + "一致性哈希算法最小的候选数目,会选取相对最空闲的节点。", + "The consistent hash algorithm has the smallest number of candidates and will select the most idle node."}) + public static int split_assigner_min_consistent_hash_candidate_num = 2; + + @ConfField(mutable = true, description = { + "各节点之间最大的 split 数目差异,如果超过这个数目就会重新分布 split。", + "The maximum difference in the number of splits between nodes. 
" + + "If this number is exceeded, the splits will be redistributed."}) + public static int split_assigner_max_split_num_variance = 1; @ConfField(description = { "控制统计信息的自动触发作业执行记录的持久化行数", diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/IndexedPriorityQueue.java b/fe/fe-core/src/main/java/org/apache/doris/common/IndexedPriorityQueue.java new file mode 100644 index 00000000000..f93db510e75 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/IndexedPriorityQueue.java @@ -0,0 +1,228 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+// This file is copied from +// https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/resourcegroups/IndexedPriorityQueue.java +// and modified by Doris + +package org.apache.doris.common; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterators; + +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.TreeSet; + +/** + * A priority queue with constant time contains(E) and log time remove(E) + * Ties are broken by insertion order + */ +public final class IndexedPriorityQueue<E> + implements UpdateablePriorityQueue<E> { + private final Map<E, Entry<E>> index = new HashMap<>(); + private final Set<Entry<E>> queue; + private long generation; + + public IndexedPriorityQueue() { + this(PriorityOrdering.HIGH_TO_LOW); + } + + public IndexedPriorityQueue(PriorityOrdering priorityOrdering) { + switch (priorityOrdering) { + case LOW_TO_HIGH: + queue = new TreeSet<>( + Comparator.comparingLong((Entry<E> entry) -> entry.getPriority()) + .thenComparingLong(Entry::getGeneration)); + break; + case HIGH_TO_LOW: + queue = new TreeSet<>((entry1, entry2) -> { + int priorityComparison = Long.compare(entry2.getPriority(), entry1.getPriority()); + if (priorityComparison != 0) { + return priorityComparison; + } + return Long.compare(entry1.getGeneration(), entry2.getGeneration()); + }); + break; + default: + throw new IllegalArgumentException(); + } + } + + @Override + public boolean addOrUpdate(E element, long priority) { + Entry<E> entry = index.get(element); + if (entry != null) { + if (entry.getPriority() == priority) { + return false; + } + queue.remove(entry); + Entry<E> newEntry = new Entry<>(element, priority, entry.getGeneration()); + queue.add(newEntry); + index.put(element, newEntry); + return false; + } + Entry<E> newEntry = new Entry<>(element, priority, generation); + 
generation++; + queue.add(newEntry); + index.put(element, newEntry); + return true; + } + + @Override + public boolean contains(E element) { + return index.containsKey(element); + } + + @Override + public boolean remove(E element) { + Entry<E> entry = index.remove(element); + if (entry != null) { + queue.remove(entry); + return true; + } + return false; + } + + @Override + public E poll() { + Entry<E> entry = pollEntry(); + if (entry == null) { + return null; + } + return entry.getValue(); + } + + @Override + public E peek() { + Entry<E> entry = peekEntry(); + if (entry == null) { + return null; + } + return entry.getValue(); + } + + @Override + public int size() { + return queue.size(); + } + + @Override + public boolean isEmpty() { + return queue.isEmpty(); + } + + public Prioritized<E> getPrioritized(E element) { + Entry<E> entry = index.get(element); + if (entry == null) { + return null; + } + + return new Prioritized<>(entry.getValue(), entry.getPriority()); + } + + public Prioritized<E> pollPrioritized() { + Entry<E> entry = pollEntry(); + if (entry == null) { + return null; + } + return new Prioritized<>(entry.getValue(), entry.getPriority()); + } + + private Entry<E> pollEntry() { + Iterator<Entry<E>> iterator = queue.iterator(); + if (!iterator.hasNext()) { + return null; + } + Entry<E> entry = iterator.next(); + iterator.remove(); + Preconditions.checkState(index.remove(entry.getValue()) != null, "Failed to remove entry from index"); + return entry; + } + + public Prioritized<E> peekPrioritized() { + Entry<E> entry = peekEntry(); + if (entry == null) { + return null; + } + return new Prioritized<>(entry.getValue(), entry.getPriority()); + } + + public Entry<E> peekEntry() { + Iterator<Entry<E>> iterator = queue.iterator(); + if (!iterator.hasNext()) { + return null; + } + return iterator.next(); + } + + @Override + public Iterator<E> iterator() { + return Iterators.transform(queue.iterator(), Entry::getValue); + } + + public enum PriorityOrdering { + 
LOW_TO_HIGH, + HIGH_TO_LOW + } + + private static final class Entry<E> { + private final E value; + private final long priority; + private final long generation; + + private Entry(E value, long priority, long generation) { + this.value = Objects.requireNonNull(value, "value is null"); + this.priority = priority; + this.generation = generation; + } + + public E getValue() { + return value; + } + + public long getPriority() { + return priority; + } + + public long getGeneration() { + return generation; + } + } + + public static class Prioritized<V> { + private final V value; + private final long priority; + + public Prioritized(V value, long priority) { + this.value = Objects.requireNonNull(value, "value is null"); + this.priority = priority; + } + + public V getValue() { + return value; + } + + public long getPriority() { + return priority; + } + } +} + diff --git a/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java b/fe/fe-core/src/main/java/org/apache/doris/common/Queue.java similarity index 68% copy from fe/fe-core/src/main/java/org/apache/doris/spi/Split.java copy to fe/fe-core/src/main/java/org/apache/doris/common/Queue.java index 31b1e1515a5..6dc58a35231 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Queue.java @@ -14,16 +14,23 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. +// This file is copied from +// https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/resourcegroups/Queue.java +// and modified by Doris -package org.apache.doris.spi; +package org.apache.doris.common; -/** - * Split interface. e.g. Tablet for Olap Table. 
- */ -public interface Split { +interface Queue<E> { + boolean contains(E element); - String[] getHosts(); + boolean remove(E element); - Object getInfo(); + E poll(); + E peek(); + + int size(); + + boolean isEmpty(); } + diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ResettableRandomizedIterator.java b/fe/fe-core/src/main/java/org/apache/doris/common/ResettableRandomizedIterator.java new file mode 100644 index 00000000000..11d18604356 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/ResettableRandomizedIterator.java @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+// This file is copied from +// https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/scheduler/ResettableRandomizedIterator.java +// and modified by Doris + +package org.apache.doris.common; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.concurrent.ThreadLocalRandom; + +public class ResettableRandomizedIterator<T> + implements Iterator<T> { + private final List<T> list; + private int position; + + public ResettableRandomizedIterator(Collection<T> elements) { + this.list = new ArrayList<>(elements); + } + + public void reset() { + position = 0; + } + + @Override + public boolean hasNext() { + return position < list.size(); + } + + @Override + public T next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + int position = ThreadLocalRandom.current().nextInt(this.position, list.size()); + + T result = list.set(position, list.get(this.position)); + list.set(this.position, result); + this.position++; + + return result; + } +} + diff --git a/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java b/fe/fe-core/src/main/java/org/apache/doris/common/UpdateablePriorityQueue.java similarity index 69% copy from fe/fe-core/src/main/java/org/apache/doris/spi/Split.java copy to fe/fe-core/src/main/java/org/apache/doris/common/UpdateablePriorityQueue.java index 31b1e1515a5..c9c0a2b3deb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/UpdateablePriorityQueue.java @@ -14,16 +14,14 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. 
+// This file is copied from +// https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/resourcegroups/UpdateablePriorityQueue.java +// and modified by Doris -package org.apache.doris.spi; - -/** - * Split interface. e.g. Tablet for Olap Table. - */ -public interface Split { - - String[] getHosts(); - - Object getInfo(); +package org.apache.doris.common; +interface UpdateablePriorityQueue<E> + extends Queue<E>, Iterable<E> { + boolean addOrUpdate(E element, long priority); } + diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/ConsistentHash.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/ConsistentHash.java index 4ef6e4168f1..85199ad32fe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/ConsistentHash.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/ConsistentHash.java @@ -17,11 +17,16 @@ package org.apache.doris.common.util; +import com.google.common.collect.ImmutableList; import com.google.common.hash.Funnel; import com.google.common.hash.HashFunction; import com.google.common.hash.Hasher; import java.util.Collection; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; @@ -89,14 +94,38 @@ public class ConsistentHash<K, N> { } } - public N getNode(K key) { - if (ring.isEmpty()) { - return null; + public List<N> getNode(K key, int count) { + int nodeCount = ring.values().size(); + if (count > nodeCount) { + count = nodeCount; } + + Set<N> uniqueNodes = new LinkedHashSet<>(); + Hasher hasher = hashFunction.newHasher(); Long hashKey = hasher.putObject(key, keyFunnel).hash().asLong(); + SortedMap<Long, VirtualNode> tailMap = ring.tailMap(hashKey); - hashKey = !tailMap.isEmpty() ? 
tailMap.firstKey() : ring.firstKey(); - return ring.get(hashKey).getNode(); + // Start reading from tail + for (Map.Entry<Long, VirtualNode> entry : tailMap.entrySet()) { + uniqueNodes.add(entry.getValue().node); + if (uniqueNodes.size() == count) { + break; + } + } + + if (uniqueNodes.size() < count) { + // Start reading from the head as we have exhausted tail + SortedMap<Long, VirtualNode> headMap = ring.headMap(hashKey); + for (Map.Entry<Long, VirtualNode> entry : headMap.entrySet()) { + uniqueNodes.add(entry.getValue().node); + if (uniqueNodes.size() == count) { + break; + } + } + } + + return ImmutableList.copyOf(uniqueNodes); } } + diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalScanNode.java index a4751b5205f..cbfe3188817 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalScanNode.java @@ -22,6 +22,7 @@ import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.common.UserException; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.planner.ScanNode; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.thrift.TScanRangeLocations; @@ -43,7 +44,9 @@ public abstract class ExternalScanNode extends ScanNode { // set to false means this scan node does not need to check column priv. protected boolean needCheckColumnPriv; - protected final FederationBackendPolicy backendPolicy = new FederationBackendPolicy(); + protected final FederationBackendPolicy backendPolicy = (ConnectContext.get() != null + && ConnectContext.get().getSessionVariable().enableFileCache) + ? 
new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING) : new FederationBackendPolicy(); public ExternalScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, StatisticalType statisticalType, boolean needCheckColumnPriv) { @@ -91,3 +94,4 @@ public abstract class ExternalScanNode extends ScanNode { return scanRangeLocations.size(); } } + diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java index ade03291c30..df0e955ea8e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java @@ -14,28 +14,38 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. +// This file is referenced from +// https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/scheduler/UniformNodeSelector.java +// and modified by Doris package org.apache.doris.planner.external; import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; +import org.apache.doris.common.IndexedPriorityQueue; +import org.apache.doris.common.ResettableRandomizedIterator; import org.apache.doris.common.UserException; import org.apache.doris.common.util.ConsistentHash; import org.apache.doris.mysql.privilege.UserProperty; import org.apache.doris.qe.ConnectContext; import org.apache.doris.resource.Tag; +import org.apache.doris.spi.Split; import org.apache.doris.system.Backend; import org.apache.doris.system.BeSelectionPolicy; -import org.apache.doris.thrift.TFileRangeDesc; -import org.apache.doris.thrift.TScanRangeLocations; +import org.apache.doris.system.SystemInfoService; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import 
com.google.common.base.Strings; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ListMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; import com.google.common.collect.Sets; import com.google.common.hash.Funnel; import com.google.common.hash.Hashing; @@ -45,12 +55,16 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.nio.charset.StandardCharsets; -import java.security.SecureRandom; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; @@ -59,35 +73,47 @@ public class FederationBackendPolicy { private static final Logger LOG = LogManager.getLogger(FederationBackendPolicy.class); private final List<Backend> backends = Lists.newArrayList(); private final Map<String, List<Backend>> backendMap = Maps.newHashMap(); - private final SecureRandom random = new SecureRandom(); - private ConsistentHash<TScanRangeLocations, Backend> consistentHash; + + public Map<Backend, Long> getAssignedWeightPerBackend() { + return assignedWeightPerBackend; + } + + private Map<Backend, Long> assignedWeightPerBackend = Maps.newHashMap(); + + private ConsistentHash<Split, Backend> consistentHash; private int nextBe = 0; private boolean initialized = false; + private NodeSelectionStrategy nodeSelectionStrategy; + private boolean enableSplitsRedistribution = true; + // Create a ConsistentHash ring may be a time-consuming 
operation, so we cache it. - private static LoadingCache<HashCacheKey, ConsistentHash<TScanRangeLocations, Backend>> consistentHashCache; + private static LoadingCache<HashCacheKey, ConsistentHash<Split, Backend>> consistentHashCache; static { consistentHashCache = CacheBuilder.newBuilder().maximumSize(5) - .build(new CacheLoader<HashCacheKey, ConsistentHash<TScanRangeLocations, Backend>>() { + .build(new CacheLoader<HashCacheKey, ConsistentHash<Split, Backend>>() { @Override - public ConsistentHash<TScanRangeLocations, Backend> load(HashCacheKey key) { - return new ConsistentHash<>(Hashing.murmur3_128(), new ScanRangeHash(), - new BackendHash(), key.bes, Config.virtual_node_number); + public ConsistentHash<Split, Backend> load(HashCacheKey key) { + return new ConsistentHash<>(Hashing.murmur3_128(), new SplitHash(), + new BackendHash(), key.bes, Config.split_assigner_virtual_node_number); } }); } private static class HashCacheKey { // sorted backend ids as key - private List<Long> beIds; + private List<String> beHashKeys; // backends is not part of key, just an attachment private List<Backend> bes; HashCacheKey(List<Backend> backends) { this.bes = backends; - this.beIds = backends.stream().map(b -> b.getId()).sorted().collect(Collectors.toList()); + this.beHashKeys = backends.stream().map(b -> + String.format("id: %d, host: %s, port: %d", b.getId(), b.getHost(), b.getHeartbeatPort())) + .sorted() + .collect(Collectors.toList()); } @Override @@ -98,20 +124,28 @@ public class FederationBackendPolicy { if (!(obj instanceof HashCacheKey)) { return false; } - return Objects.equals(beIds, ((HashCacheKey) obj).beIds); + return Objects.equals(beHashKeys, ((HashCacheKey) obj).beHashKeys); } @Override public int hashCode() { - return Objects.hash(beIds); + return Objects.hash(beHashKeys); } @Override public String toString() { - return "HashCache{" + "beIds=" + beIds + '}'; + return "HashCache{" + "beHashKeys=" + beHashKeys + '}'; } } + public 
FederationBackendPolicy(NodeSelectionStrategy nodeSelectionStrategy) { + this.nodeSelectionStrategy = nodeSelectionStrategy; + } + + public FederationBackendPolicy() { + this(NodeSelectionStrategy.ROUND_ROBIN); + } + public void init() throws UserException { if (!initialized) { init(Collections.emptyList()); @@ -152,6 +186,10 @@ public class FederationBackendPolicy { if (backends.isEmpty()) { throw new UserException("No available backends"); } + for (Backend backend : backends) { + assignedWeightPerBackend.put(backend, 0L); + } + backendMap.putAll(backends.stream().collect(Collectors.groupingBy(Backend::getHost))); try { consistentHash = consistentHashCache.get(new HashCacheKey(backends)); @@ -166,23 +204,280 @@ public class FederationBackendPolicy { return selectedBackend; } - public Backend getNextConsistentBe(TScanRangeLocations scanRangeLocations) { - return consistentHash.getNode(scanRangeLocations); + @VisibleForTesting + public void setEnableSplitsRedistribution(boolean enableSplitsRedistribution) { + this.enableSplitsRedistribution = enableSplitsRedistribution; + } + + public Multimap<Backend, Split> computeScanRangeAssignment(List<Split> splits) throws UserException { + // Sorting splits is to ensure that the same query utilizes the os page cache as much as possible. 
+ splits.sort((split1, split2) -> { + int pathComparison = split1.getPathString().compareTo(split2.getPathString()); + if (pathComparison != 0) { + return pathComparison; + } + + int startComparison = Long.compare(split1.getStart(), split2.getStart()); + if (startComparison != 0) { + return startComparison; + } + return Long.compare(split1.getLength(), split2.getLength()); + }); + + ListMultimap<Backend, Split> assignment = ArrayListMultimap.create(); + + List<Split> remainingSplits = null; + + List<Backend> backends = new ArrayList<>(); + for (List<Backend> backendList : backendMap.values()) { + backends.addAll(backendList); + } + ResettableRandomizedIterator<Backend> randomCandidates = new ResettableRandomizedIterator<>(backends); + + boolean splitsToBeRedistributed = false; + + // optimizedLocalScheduling enables prioritized assignment of splits to local nodes when splits contain + // locality information + if (Config.split_assigner_optimized_local_scheduling) { + remainingSplits = new ArrayList<>(splits.size()); + for (int i = 0; i < splits.size(); ++i) { + Split split = splits.get(i); + if (split.isRemotelyAccessible() && (split.getHosts() != null && split.getHosts().length > 0)) { + List<Backend> candidateNodes = selectExactNodes(backendMap, split.getHosts()); + + Optional<Backend> chosenNode = candidateNodes.stream() + .min(Comparator.comparingLong(ownerNode -> assignedWeightPerBackend.get(ownerNode))); + + if (chosenNode.isPresent()) { + Backend selectedBackend = chosenNode.get(); + assignment.put(selectedBackend, split); + assignedWeightPerBackend.put(selectedBackend, + assignedWeightPerBackend.get(selectedBackend) + split.getSplitWeight().getRawValue()); + splitsToBeRedistributed = true; + continue; + } + } + remainingSplits.add(split); + } + } else { + remainingSplits = splits; + } + + for (Split split : remainingSplits) { + List<Backend> candidateNodes; + if (!split.isRemotelyAccessible()) { + candidateNodes = selectExactNodes(backendMap, 
split.getHosts()); + } else { + switch (nodeSelectionStrategy) { + case ROUND_ROBIN: { + Backend selectedBackend = backends.get(nextBe++); + nextBe = nextBe % backends.size(); + candidateNodes = ImmutableList.of(selectedBackend); + break; + } + case RANDOM: { + randomCandidates.reset(); + candidateNodes = selectNodes(Config.split_assigner_min_random_candidate_num, randomCandidates); + break; + } + case CONSISTENT_HASHING: { + candidateNodes = consistentHash.getNode(split, + Config.split_assigner_min_consistent_hash_candidate_num); + splitsToBeRedistributed = true; + break; + } + default: { + throw new RuntimeException(); + } + } + } + if (candidateNodes.isEmpty()) { + LOG.debug("No nodes available to schedule {}. Available nodes {}", split, + backends); + throw new UserException(SystemInfoService.NO_SCAN_NODE_BACKEND_AVAILABLE_MSG); + } + + Backend selectedBackend = chooseNodeForSplit(candidateNodes); + List<Backend> alternativeBackends = new ArrayList<>(candidateNodes); + alternativeBackends.remove(selectedBackend); + split.setAlternativeHosts( + alternativeBackends.stream().map(each -> each.getHost()).collect(Collectors.toList())); + assignment.put(selectedBackend, split); + assignedWeightPerBackend.put(selectedBackend, + assignedWeightPerBackend.get(selectedBackend) + split.getSplitWeight().getRawValue()); + } + + if (enableSplitsRedistribution && splitsToBeRedistributed) { + equateDistribution(assignment); + } + return assignment; + } + + /** + * The method tries to make the distribution of splits more uniform. All nodes are arranged into a maxHeap and + * a minHeap based on the number of splits that are assigned to them. Splits are redistributed, one at a time, + * from a maxNode to a minNode until we have as uniform a distribution as possible. 
+ * + * @param assignment the node-splits multimap after the first and the second stage + */ + private void equateDistribution(ListMultimap<Backend, Split> assignment) { + if (assignment.isEmpty()) { + return; + } + + List<Backend> allNodes = new ArrayList<>(); + for (List<Backend> backendList : backendMap.values()) { + allNodes.addAll(backendList); + } + Collections.sort(allNodes, Comparator.comparing(Backend::getId)); + + if (allNodes.size() < 2) { + return; + } + + IndexedPriorityQueue<Backend> maxNodes = new IndexedPriorityQueue<>(); + for (Backend node : allNodes) { + maxNodes.addOrUpdate(node, assignedWeightPerBackend.get(node)); + } + + IndexedPriorityQueue<Backend> minNodes = new IndexedPriorityQueue<>(); + for (Backend node : allNodes) { + minNodes.addOrUpdate(node, Long.MAX_VALUE - assignedWeightPerBackend.get(node)); + } + + while (true) { + if (maxNodes.isEmpty()) { + return; + } + + // fetch min and max node + Backend maxNode = maxNodes.poll(); + Backend minNode = minNodes.poll(); + + // Allow some degree of non uniformity when assigning splits to nodes. Usually data distribution + // among nodes in a cluster won't be fully uniform (e.g. because hash function with non-uniform + // distribution is used like consistent hashing). In such case it makes sense to assign splits to nodes + // with data because of potential savings in network throughput and CPU time. 
+ if (assignedWeightPerBackend.get(maxNode) - assignedWeightPerBackend.get(minNode) + <= SplitWeight.rawValueForStandardSplitCount(Config.split_assigner_max_split_num_variance)) { + return; + } + + // move split from max to min + Split redistributedSplit = redistributeSplit(assignment, maxNode, minNode); + + assignedWeightPerBackend.put(maxNode, + assignedWeightPerBackend.get(maxNode) - redistributedSplit.getSplitWeight().getRawValue()); + assignedWeightPerBackend.put(minNode, Math.addExact( + assignedWeightPerBackend.get(minNode), redistributedSplit.getSplitWeight().getRawValue())); + + // add max back into maxNodes only if it still has assignments + if (assignment.containsKey(maxNode)) { + maxNodes.addOrUpdate(maxNode, assignedWeightPerBackend.get(maxNode)); + } + + // Add or update both the Priority Queues with the updated node priorities + maxNodes.addOrUpdate(minNode, assignedWeightPerBackend.get(minNode)); + minNodes.addOrUpdate(minNode, Long.MAX_VALUE - assignedWeightPerBackend.get(minNode)); + minNodes.addOrUpdate(maxNode, Long.MAX_VALUE - assignedWeightPerBackend.get(maxNode)); + } + } + + /** + * The method selects and removes a split from the fromNode and assigns it to the toNode. There is an attempt to + * redistribute a Non-local split if possible. This case is possible when there are multiple queries running + * simultaneously. If a Non-local split cannot be found in the maxNode, next split is selected and reassigned. 
+ */ + @VisibleForTesting + public static Split redistributeSplit(Multimap<Backend, Split> assignment, Backend fromNode, + Backend toNode) { + Iterator<Split> splitIterator = assignment.get(fromNode).iterator(); + Split splitToBeRedistributed = null; + while (splitIterator.hasNext()) { + Split split = splitIterator.next(); + // Try to select non-local split for redistribution + if (split.getHosts() != null && !isSplitLocal( + split.getHosts(), fromNode.getHost())) { + splitToBeRedistributed = split; + break; + } + } + // Select split if maxNode has no non-local splits in the current batch of assignment + if (splitToBeRedistributed == null) { + splitIterator = assignment.get(fromNode).iterator(); + while (splitIterator.hasNext()) { + splitToBeRedistributed = splitIterator.next(); + // if toNode has split replication, transfer this split firstly + if (splitToBeRedistributed.getHosts() != null && isSplitLocal( + splitToBeRedistributed.getHosts(), toNode.getHost())) { + break; + } + // if toNode is split alternative host, transfer this split firstly + if (splitToBeRedistributed.getAlternativeHosts() != null && isSplitLocal( + splitToBeRedistributed.getAlternativeHosts(), toNode.getHost())) { + break; + } + } + } + splitIterator.remove(); + assignment.put(toNode, splitToBeRedistributed); + return splitToBeRedistributed; } - // Try to find a local BE, if not exists, use `getNextBe` instead - public Backend getNextLocalBe(List<String> hosts, TScanRangeLocations scanRangeLocations) { - List<Backend> candidateBackends = Lists.newArrayListWithCapacity(hosts.size()); + private static boolean isSplitLocal(String[] splitHosts, String host) { + for (String splitHost : splitHosts) { + if (splitHost.equals(host)) { + return true; + } + } + return false; + } + + private static boolean isSplitLocal(List<String> splitHosts, String host) { + for (String splitHost : splitHosts) { + if (splitHost.equals(host)) { + return true; + } + } + return false; + } + + public static List<Backend> 
selectExactNodes(Map<String, List<Backend>> backendMap, String[] hosts) { + Set<Backend> chosen = new LinkedHashSet<>(); + for (String host : hosts) { - List<Backend> backends = backendMap.get(host); - if (CollectionUtils.isNotEmpty(backends)) { - candidateBackends.add(backends.get(random.nextInt(backends.size()))); + if (backendMap.containsKey(host)) { + backendMap.get(host).stream() + .forEach(chosen::add); } } + return ImmutableList.copyOf(chosen); + } - return CollectionUtils.isEmpty(candidateBackends) - ? getNextConsistentBe(scanRangeLocations) - : candidateBackends.get(random.nextInt(candidateBackends.size())); + public static List<Backend> selectNodes(int limit, Iterator<Backend> candidates) { + Preconditions.checkArgument(limit > 0, "limit must be at least 1"); + + List<Backend> selected = new ArrayList<>(limit); + while (selected.size() < limit && candidates.hasNext()) { + selected.add(candidates.next()); + } + + return selected; + } + + private Backend chooseNodeForSplit(List<Backend> candidateNodes) { + Backend chosenNode = null; + long minWeight = Long.MAX_VALUE; + + for (Backend node : candidateNodes) { + long queuedWeight = assignedWeightPerBackend.get(node); + if (queuedWeight <= minWeight) { + chosenNode = node; + minWeight = queuedWeight; + } + } + + return chosenNode; } public int numBackends() { @@ -200,15 +495,13 @@ public class FederationBackendPolicy { } } - private static class ScanRangeHash implements Funnel<TScanRangeLocations> { + private static class SplitHash implements Funnel<Split> { @Override - public void funnel(TScanRangeLocations scanRange, PrimitiveSink primitiveSink) { - Preconditions.checkState(scanRange.scan_range.isSetExtScanRange()); - for (TFileRangeDesc desc : scanRange.scan_range.ext_scan_range.file_scan_range.ranges) { - primitiveSink.putBytes(desc.path.getBytes(StandardCharsets.UTF_8)); - primitiveSink.putLong(desc.start_offset); - primitiveSink.putLong(desc.size); - } + public void funnel(Split split, PrimitiveSink 
primitiveSink) { + primitiveSink.putBytes(split.getPathString().getBytes(StandardCharsets.UTF_8)); + primitiveSink.putLong(split.getStart()); + primitiveSink.putLong(split.getLength()); } } } + diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java index 4a3c3d2ff9a..a374df5e602 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java @@ -68,13 +68,14 @@ import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; import lombok.Getter; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.net.URI; import java.util.ArrayList; -import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; @@ -314,76 +315,73 @@ public abstract class FileQueryScanNode extends FileScanNode { params.setProperties(locationProperties); } - boolean enableShortCircuitRead = HdfsResource.enableShortCircuitRead(locationProperties); List<String> pathPartitionKeys = getPathPartitionKeys(); - for (Split split : inputSplits) { - FileSplit fileSplit = (FileSplit) split; - TFileType locationType; - if (fileSplit instanceof IcebergSplit - && ((IcebergSplit) fileSplit).getConfig().containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) { - locationType = TFileType.FILE_BROKER; - } else { - locationType = getLocationType(fileSplit.getPath().toString()); - } - TScanRangeLocations curLocations = newLocations(); - // If fileSplit has partition values, use the values collected from hive partitions. - // Otherwise, use the values in file path. 
- boolean isACID = false; - if (fileSplit instanceof HiveSplit) { - HiveSplit hiveSplit = (HiveSplit) split; - isACID = hiveSplit.isACID(); - } - List<String> partitionValuesFromPath = fileSplit.getPartitionValues() == null - ? BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), pathPartitionKeys, false, isACID) - : fileSplit.getPartitionValues(); - - TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys, - locationType); - TFileCompressType fileCompressType = getFileCompressType(fileSplit); - rangeDesc.setCompressType(fileCompressType); - if (isACID) { - HiveSplit hiveSplit = (HiveSplit) split; - hiveSplit.setTableFormatType(TableFormatType.TRANSACTIONAL_HIVE); - TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc(); - tableFormatFileDesc.setTableFormatType(hiveSplit.getTableFormatType().value()); - AcidInfo acidInfo = (AcidInfo) hiveSplit.getInfo(); - TTransactionalHiveDesc transactionalHiveDesc = new TTransactionalHiveDesc(); - transactionalHiveDesc.setPartition(acidInfo.getPartitionLocation()); - List<TTransactionalHiveDeleteDeltaDesc> deleteDeltaDescs = new ArrayList<>(); - for (DeleteDeltaInfo deleteDeltaInfo : acidInfo.getDeleteDeltas()) { - TTransactionalHiveDeleteDeltaDesc deleteDeltaDesc = new TTransactionalHiveDeleteDeltaDesc(); - deleteDeltaDesc.setDirectoryLocation(deleteDeltaInfo.getDirectoryLocation()); - deleteDeltaDesc.setFileNames(deleteDeltaInfo.getFileNames()); - deleteDeltaDescs.add(deleteDeltaDesc); + Multimap<Backend, Split> assignment = backendPolicy.computeScanRangeAssignment(inputSplits); + for (Backend backend : assignment.keySet()) { + Collection<Split> splits = assignment.get(backend); + for (Split split : splits) { + FileSplit fileSplit = (FileSplit) split; + TFileType locationType; + if (fileSplit instanceof IcebergSplit + && ((IcebergSplit) fileSplit).getConfig().containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) { + locationType = TFileType.FILE_BROKER; + } 
else { + locationType = getLocationType(fileSplit.getPath().toString()); + } + + TScanRangeLocations curLocations = newLocations(); + // If fileSplit has partition values, use the values collected from hive partitions. + // Otherwise, use the values in file path. + boolean isACID = false; + if (fileSplit instanceof HiveSplit) { + HiveSplit hiveSplit = (HiveSplit) fileSplit; + isACID = hiveSplit.isACID(); + } + List<String> partitionValuesFromPath = fileSplit.getPartitionValues() == null + ? BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), pathPartitionKeys, + false, isACID) : fileSplit.getPartitionValues(); + + TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys, + locationType); + TFileCompressType fileCompressType = getFileCompressType(fileSplit); + rangeDesc.setCompressType(fileCompressType); + if (isACID) { + HiveSplit hiveSplit = (HiveSplit) fileSplit; + hiveSplit.setTableFormatType(TableFormatType.TRANSACTIONAL_HIVE); + TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc(); + tableFormatFileDesc.setTableFormatType(hiveSplit.getTableFormatType().value()); + AcidInfo acidInfo = (AcidInfo) hiveSplit.getInfo(); + TTransactionalHiveDesc transactionalHiveDesc = new TTransactionalHiveDesc(); + transactionalHiveDesc.setPartition(acidInfo.getPartitionLocation()); + List<TTransactionalHiveDeleteDeltaDesc> deleteDeltaDescs = new ArrayList<>(); + for (DeleteDeltaInfo deleteDeltaInfo : acidInfo.getDeleteDeltas()) { + TTransactionalHiveDeleteDeltaDesc deleteDeltaDesc = new TTransactionalHiveDeleteDeltaDesc(); + deleteDeltaDesc.setDirectoryLocation(deleteDeltaInfo.getDirectoryLocation()); + deleteDeltaDesc.setFileNames(deleteDeltaInfo.getFileNames()); + deleteDeltaDescs.add(deleteDeltaDesc); + } + transactionalHiveDesc.setDeleteDeltas(deleteDeltaDescs); + tableFormatFileDesc.setTransactionalHiveParams(transactionalHiveDesc); + rangeDesc.setTableFormatParams(tableFormatFileDesc); } - 
transactionalHiveDesc.setDeleteDeltas(deleteDeltaDescs); - tableFormatFileDesc.setTransactionalHiveParams(transactionalHiveDesc); - rangeDesc.setTableFormatParams(tableFormatFileDesc); - } - setScanParams(rangeDesc, fileSplit); - - curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc); - TScanRangeLocation location = new TScanRangeLocation(); - Backend selectedBackend; - if (enableShortCircuitRead) { - // Try to find a local BE if enable hdfs short circuit read - selectedBackend = backendPolicy.getNextLocalBe(Arrays.asList(fileSplit.getHosts()), curLocations); - } else { - // Use consistent hash to assign the same scan range into the same backend among different queries - selectedBackend = backendPolicy.getNextConsistentBe(curLocations); + setScanParams(rangeDesc, fileSplit); + + curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc); + TScanRangeLocation location = new TScanRangeLocation(); + setLocationPropertiesIfNecessary(backend, locationType, locationProperties); + location.setBackendId(backend.getId()); + location.setServer(new TNetworkAddress(backend.getHost(), backend.getBePort())); + curLocations.addToLocations(location); + LOG.debug("assign to backend {} with table split: {} ({}, {}), location: {}", + curLocations.getLocations().get(0).getBackendId(), fileSplit.getPath(), fileSplit.getStart(), + fileSplit.getLength(), Joiner.on("|").join(fileSplit.getHosts())); + scanRangeLocations.add(curLocations); + this.totalFileSize += fileSplit.getLength(); } - setLocationPropertiesIfNecessary(selectedBackend, locationType, locationProperties); - location.setBackendId(selectedBackend.getId()); - location.setServer(new TNetworkAddress(selectedBackend.getHost(), selectedBackend.getBePort())); - curLocations.addToLocations(location); - LOG.debug("assign to backend {} with table split: {} ({}, {}), location: {}", - curLocations.getLocations().get(0).getBackendId(), fileSplit.getPath(), 
fileSplit.getStart(), - fileSplit.getLength(), Joiner.on("|").join(fileSplit.getHosts())); - scanRangeLocations.add(curLocations); - this.totalFileSize += fileSplit.getLength(); } + if (ConnectContext.get().getExecutor() != null) { ConnectContext.get().getExecutor().getSummaryProfile().setCreateScanRangeFinishTime(); } @@ -518,3 +516,4 @@ public abstract class FileQueryScanNode extends FileScanNode { protected abstract Map<String, String> getLocationProperties() throws UserException; } + diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java index 4e9d0bae564..1221d3ad21b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java @@ -42,6 +42,8 @@ public class FileSplit implements Split { // partitionValues would be ["part1", "part2"] protected List<String> partitionValues; + protected List<String> alternativeHosts; + public FileSplit(Path path, long start, long length, long fileLength, long modificationTime, String[] hosts, List<String> partitionValues) { this.path = path; @@ -67,6 +69,11 @@ public class FileSplit implements Split { return null; } + @Override + public String getPathString() { + return path.toString(); + } + public static class FileSplitCreator implements SplitCreator { public static final FileSplitCreator DEFAULT = new FileSplitCreator(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/NodeSelectionStrategy.java similarity index 83% copy from fe/fe-core/src/main/java/org/apache/doris/spi/Split.java copy to fe/fe-core/src/main/java/org/apache/doris/planner/external/NodeSelectionStrategy.java index 31b1e1515a5..4b4d10a4240 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/NodeSelectionStrategy.java @@ -15,15 +15,11 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.spi; - -/** - * Split interface. e.g. Tablet for Olap Table. - */ -public interface Split { - - String[] getHosts(); - - Object getInfo(); +package org.apache.doris.planner.external; +public enum NodeSelectionStrategy { + ROUND_ROBIN, + RANDOM, + CONSISTENT_HASHING } + diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/SplitWeight.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/SplitWeight.java new file mode 100644 index 00000000000..d8724017b67 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/SplitWeight.java @@ -0,0 +1,130 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+// This file is copied from +// https://github.com/trinodb/trino/blob/master/core/trino-spi/src/main/java/io/trino/spi/SplitWeight.java +// and modified by Doris + +package org.apache.doris.planner.external; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonValue; +import com.google.errorprone.annotations.DoNotCall; + +import java.math.BigDecimal; +import java.util.Collection; +import java.util.function.Function; + +public final class SplitWeight { + private static final long UNIT_VALUE = 100; + private static final int UNIT_SCALE = 2; // Decimal scale such that (10 ^ UNIT_SCALE) == UNIT_VALUE + private static final SplitWeight STANDARD_WEIGHT = new SplitWeight(UNIT_VALUE); + + private final long value; + + private SplitWeight(long value) { + if (value <= 0) { + throw new IllegalArgumentException("value must be > 0, found: " + value); + } + this.value = value; + } + + /** + * Produces a {@link SplitWeight} from the raw internal value representation. This method is intended + * primarily for JSON deserialization, and connectors should not call this factory method directly + * to construct {@link SplitWeight} instances. Instead, connectors should use + * {@link SplitWeight#fromProportion(double)} + * to avoid breakages that could arise if {@link SplitWeight#UNIT_VALUE} changes in the future. + */ + @JsonCreator + @DoNotCall // For JSON serialization only + public static SplitWeight fromRawValue(long value) { + return fromRawValueInternal(value); + } + + /** + * Produces a {@link SplitWeight} that corresponds to the {@link SplitWeight#standard()} weight + * proportionally, i.e., a parameter of <code>1.0</code> will be equivalent to the standard weight + * and a value of <code>0.5</code> will be 1/2 of the standard split weight. Valid arguments + * must be greater than zero and finite. 
Connectors should prefer constructing split weights + * using this factory method rather than passing a raw integer value in case the integer representation + * of a standard split needs to change in the future. + * + * @param weight the proportional weight relative to a standard split, expressed as a double + * @return a {@link SplitWeight} with a raw value corresponding to the requested proportion + */ + public static SplitWeight fromProportion(double weight) { + if (weight <= 0 || !Double.isFinite(weight)) { + throw new IllegalArgumentException("Invalid weight: " + weight); + } + // Must round up to avoid small weights rounding to 0 + return fromRawValueInternal((long) Math.ceil(weight * UNIT_VALUE)); + } + + private static SplitWeight fromRawValueInternal(long value) { + return value == UNIT_VALUE ? STANDARD_WEIGHT : new SplitWeight(value); + } + + public static SplitWeight standard() { + return STANDARD_WEIGHT; + } + + public static long rawValueForStandardSplitCount(int splitCount) { + if (splitCount < 0) { + throw new IllegalArgumentException("splitCount must be >= 0, found: " + splitCount); + } + return Math.multiplyExact(splitCount, UNIT_VALUE); + } + + public static <T> long rawValueSum(Collection<T> collection, Function<T, SplitWeight> getter) { + long sum = 0; + for (T item : collection) { + long value = getter.apply(item).getRawValue(); + sum = Math.addExact(sum, value); + } + return sum; + } + + /** + * @return The internal integer representation for this weight value + */ + @JsonValue + public long getRawValue() { + return value; + } + + @Override + public int hashCode() { + return Long.hashCode(value); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof SplitWeight)) { + return false; + } + return this.value == ((SplitWeight) other).value; + } + + @Override + public String toString() { + if (value == UNIT_VALUE) { + return "1"; + } + return BigDecimal.valueOf(value, -UNIT_SCALE).stripTrailingZeros().toPlainString(); + 
} +} + diff --git a/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java b/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java index 31b1e1515a5..d42defe5c38 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java +++ b/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java @@ -17,6 +17,10 @@ package org.apache.doris.spi; +import org.apache.doris.planner.external.SplitWeight; + +import java.util.List; + /** * Split interface. e.g. Tablet for Olap Table. */ @@ -26,4 +30,23 @@ public interface Split { Object getInfo(); + default SplitWeight getSplitWeight() { + return SplitWeight.standard(); + } + + default boolean isRemotelyAccessible() { + return true; + } + + String getPathString(); + + long getStart(); + + long getLength(); + + List<String> getAlternativeHosts(); + + void setAlternativeHosts(List<String> alternativeHosts); + } + diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java index fcb5e63e838..47d9619b079 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java @@ -622,7 +622,7 @@ public class Backend implements Writable { @Override public int hashCode() { - return Objects.hash(id, host, heartbeatPort, bePort, isAlive); + return Objects.hash(id, host, heartbeatPort, bePort, isAlive.get()); } @Override diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java index ef65d1b6165..1d213c6a09b 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java @@ -18,52 +18,55 @@ package org.apache.doris.planner; import org.apache.doris.catalog.Env; +import org.apache.doris.common.Config; import org.apache.doris.common.UserException; 
import org.apache.doris.planner.external.FederationBackendPolicy; +import org.apache.doris.planner.external.FileSplit; +import org.apache.doris.planner.external.NodeSelectionStrategy; +import org.apache.doris.spi.Split; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; -import org.apache.doris.thrift.TExternalScanRange; -import org.apache.doris.thrift.TFileRangeDesc; -import org.apache.doris.thrift.TFileScanRange; -import org.apache.doris.thrift.TScanRange; -import org.apache.doris.thrift.TScanRangeLocations; -import com.google.common.base.Stopwatch; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ListMultimap; +import com.google.common.collect.Multimap; import mockit.Mock; import mockit.MockUp; import mockit.Mocked; -import org.junit.Before; +import org.apache.hadoop.fs.Path; +import org.junit.Assert; import org.junit.Test; import org.junit.jupiter.api.Assertions; -import java.util.Arrays; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; -import java.util.concurrent.TimeUnit; +import java.util.Map; +import java.util.Objects; +import java.util.Random; +import java.util.Set; +import java.util.UUID; public class FederationBackendPolicyTest { @Mocked private Env env; - @Before - public void setUp() { - + @Test + public void testRemoteSplits() throws UserException { SystemInfoService service = new SystemInfoService(); - for (int i = 0; i < 190; i++) { - Backend backend = new Backend(Long.valueOf(i), "192.168.1." + i, 9050); - backend.setAlive(true); - service.addBackend(backend); - } - for (int i = 0; i < 10; i++) { - Backend backend = new Backend(Long.valueOf(190 + i), "192.168.1." + i, 9051); - backend.setAlive(true); - service.addBackend(backend); - } - for (int i = 0; i < 10; i++) { - Backend backend = new Backend(Long.valueOf(200 + i), "192.168.2." 
+ i, 9050); - backend.setAlive(false); - service.addBackend(backend); - } + Backend backend1 = new Backend(10002L, "172.30.0.100", 9050); + backend1.setAlive(true); + service.addBackend(backend1); + Backend backend2 = new Backend(10003L, "172.30.0.106", 9050); + backend2.setAlive(true); + service.addBackend(backend2); + Backend backend3 = new Backend(10004L, "172.30.0.118", 9050); + backend3.setAlive(true); + service.addBackend(backend3); new MockUp<Env>() { @Mock @@ -72,76 +75,653 @@ public class FederationBackendPolicyTest { } }; - } + List<Split> splits = new ArrayList<>(); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00000-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 112140970, 112140970, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00001-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 120839661, 120839661, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00002-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 108897409, 108897409, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00003-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 95795997, 95795997, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00004-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 104600402, 104600402, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00005-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 104600402, 104600402, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new 
Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00006-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 104600402, 104600402, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00007-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 105664025, 105664025, 0, null, Collections.emptyList())); - @Test - public void testGetNextBe() throws UserException { FederationBackendPolicy policy = new FederationBackendPolicy(); policy.init(); - int backendNum = 200; - int invokeTimes = 1000000; + int backendNum = 3; Assertions.assertEquals(policy.numBackends(), backendNum); - Stopwatch sw = Stopwatch.createStarted(); - for (int i = 0; i < invokeTimes; i++) { - Assertions.assertFalse(policy.getNextBe().getHost().contains("192.168.2.")); + + Multimap<Backend, Split> assignment = policy.computeScanRangeAssignment(splits); + + for (Backend backend : assignment.keySet()) { + Collection<Split> assignedSplits = assignment.get(backend); + long scanBytes = 0L; + for (Split split : assignedSplits) { + FileSplit fileSplit = (FileSplit) split; + scanBytes += fileSplit.getLength(); + } + System.out.printf("%s -> %d splits, %d bytes\n", backend, assignedSplits.size(), scanBytes); } - sw.stop(); - System.out.println("Invoke getNextBe() " + invokeTimes - + " times cost [" + sw.elapsed(TimeUnit.MILLISECONDS) + "] ms"); } @Test - public void testGetNextLocalBe() throws UserException { + public void testHasLocalSplits() throws UserException { + SystemInfoService service = new SystemInfoService(); + + Backend backend1 = new Backend(30002L, "172.30.0.100", 9050); + backend1.setAlive(true); + service.addBackend(backend1); + Backend backend2 = new Backend(30003L, "172.30.0.106", 9050); + backend2.setAlive(true); + service.addBackend(backend2); + Backend backend3 = new Backend(30004L, "172.30.0.118", 9050); + backend3.setAlive(true); + service.addBackend(backend3); + 
+ new MockUp<Env>() { + @Mock + public SystemInfoService getCurrentSystemInfo() { + return service; + } + }; + + List<Split> splits = new ArrayList<>(); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00000-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 112140970, 112140970, 0, new String[] {"172.30.0.100"}, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00001-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 120839661, 120839661, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00002-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 108897409, 108897409, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00003-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 95795997, 95795997, 0, new String[] {"172.30.0.106"}, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00004-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 104600402, 104600402, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00005-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 104600402, 104600402, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00006-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 104600402, 104600402, 0, null, Collections.emptyList())); + splits.add(new FileSplit(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00007-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"), + 0, 
105664025, 105664025, 0, null, Collections.emptyList())); + FederationBackendPolicy policy = new FederationBackendPolicy(); policy.init(); - int backendNum = 200; - int invokeTimes = 1000000; + int backendNum = 3; Assertions.assertEquals(policy.numBackends(), backendNum); - List<String> localHosts = Arrays.asList("192.168.1.0", "192.168.1.1", "192.168.1.2"); - TScanRangeLocations scanRangeLocations = getScanRangeLocations("path1", 0, 100); - Stopwatch sw = Stopwatch.createStarted(); - for (int i = 0; i < invokeTimes; i++) { - Assertions.assertTrue(localHosts.contains(policy.getNextLocalBe(localHosts, scanRangeLocations).getHost())); - } - sw.stop(); - System.out.println("Invoke getNextLocalBe() " + invokeTimes - + " times cost [" + sw.elapsed(TimeUnit.MILLISECONDS) + "] ms"); + int totalSplitNum = 0; + List<Boolean> checkedLocalSplit = new ArrayList<>(); + Multimap<Backend, Split> assignment = policy.computeScanRangeAssignment(splits); + for (Backend backend : assignment.keySet()) { + Collection<Split> assignedSplits = assignment.get(backend); + for (Split split : assignedSplits) { + FileSplit fileSplit = (FileSplit) split; + ++totalSplitNum; + if (fileSplit.getPath().equals(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00000-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"))) { + Assert.assertEquals("172.30.0.100", backend.getHost()); + checkedLocalSplit.add(true); + } else if (fileSplit.getPath().equals(new Path( + "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00003-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"))) { + Assert.assertEquals("172.30.0.106", backend.getHost()); + checkedLocalSplit.add(true); + } + } + } + Assert.assertEquals(2, checkedLocalSplit.size()); + Assert.assertEquals(8, totalSplitNum); + + int maxAssignedSplitNum = Integer.MIN_VALUE; + int minAssignedSplitNum = Integer.MAX_VALUE; + for (Backend backend : assignment.keySet()) { + Collection<Split> assignedSplits = 
assignment.get(backend);
+            long scanBytes = 0L;
+            for (Split split : assignedSplits) {
+                FileSplit fileSplit = (FileSplit) split;
+                scanBytes += fileSplit.getLength();
+            }
+            if (assignedSplits.size() <= minAssignedSplitNum) {
+                minAssignedSplitNum = assignedSplits.size();
+            }
+            if (assignedSplits.size() >= maxAssignedSplitNum) {
+                maxAssignedSplitNum = assignedSplits.size();
+            }
+            System.out.printf("%s -> %d splits, %d bytes\n", backend, assignedSplits.size(), scanBytes);
+        }
+        Assert.assertTrue(Math.abs(maxAssignedSplitNum - minAssignedSplitNum) <= Config.split_assigner_max_split_num_variance);
+    }

+    /**
+     * Assigns eight splits that carry no host information to three alive backends using
+     * {@code NodeSelectionStrategy.CONSISTENT_HASHING}, then checks that the largest and smallest
+     * per-backend split counts differ by at most {@code Config.split_assigner_max_split_num_variance}.
+     *
+     * <p>Only backends that actually received splits are part of the min/max computation, because
+     * the loop iterates over {@code assignment.keySet()}.
+     */
     @Test
     public void testConsistentHash() throws UserException {
-        FederationBackendPolicy policy = new FederationBackendPolicy();
+        SystemInfoService service = new SystemInfoService();
+
+        Backend backend1 = new Backend(10002L, "172.30.0.100", 9050);
+        backend1.setAlive(true);
+        service.addBackend(backend1);
+        Backend backend2 = new Backend(10003L, "172.30.0.106", 9050);
+        backend2.setAlive(true);
+        service.addBackend(backend2);
+        Backend backend3 = new Backend(10004L, "172.30.0.118", 9050);
+        backend3.setAlive(true);
+        service.addBackend(backend3);
+
+        // Make Env hand out the SystemInfoService populated above.
+        new MockUp<Env>() {
+            @Mock
+            public SystemInfoService getCurrentSystemInfo() {
+                return service;
+            }
+        };
+
+        List<Split> splits = new ArrayList<>();
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00000-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 112140970, 112140970, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00001-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 120839661, 120839661, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00002-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 108897409, 108897409, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00003-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 95795997, 95795997, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00004-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 104600402, 104600402, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00005-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 104600402, 104600402, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00006-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 104600402, 104600402, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00007-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 105664025, 105664025, 0, null, Collections.emptyList()));
+
+        FederationBackendPolicy policy = new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING);
         policy.init();
-        int backendNum = 200;
+        int backendNum = 3;
         Assertions.assertEquals(policy.numBackends(), backendNum);
-        TScanRangeLocations scanRangeLocations = getScanRangeLocations("path1", 0, 100);
-        Assertions.assertEquals(39, policy.getNextConsistentBe(scanRangeLocations).getId());
+        Multimap<Backend, Split> assignment = policy.computeScanRangeAssignment(splits);
+
+        int maxAssignedSplitNum = Integer.MIN_VALUE;
+        int minAssignedSplitNum = Integer.MAX_VALUE;
+        for (Backend backend : assignment.keySet()) {
+            Collection<Split> assignedSplits = assignment.get(backend);
+            long scanBytes = 0L;
+            for (Split split : assignedSplits) {
+                FileSplit fileSplit = (FileSplit) split;
+                scanBytes += fileSplit.getLength();
+            }
+            if (assignedSplits.size() <= minAssignedSplitNum) {
+                minAssignedSplitNum = assignedSplits.size();
+            }
+            if (assignedSplits.size() >= maxAssignedSplitNum) {
+                maxAssignedSplitNum = assignedSplits.size();
+            }
+            System.out.printf("%s -> %d splits, %d bytes\n", backend, assignedSplits.size(), scanBytes);
+        }
+        Assert.assertTrue(Math.abs(maxAssignedSplitNum - minAssignedSplitNum) <= Config.split_assigner_max_split_num_variance);
+
+    }
+
+    /**
+     * Randomised test: registers 1..99 alive backends with unique 192.168.x.y addresses, builds
+     * 100..999 splits without hosts and 100..999 splits that name one or two backend hosts, then
+     * computes the assignment three times over a freshly shuffled split list.
+     *
+     * <p>Each run checks that every split was assigned, that every host named by a split is one of
+     * the generated local hosts, and that the per-backend split-count spread stays within
+     * {@code Config.split_assigner_max_split_num_variance}. Runs two and three must produce the
+     * same backend-to-split entries as run one, ignoring order.
+     */
+    @Test
+    public void testGenerateRandomly() throws UserException {
+        SystemInfoService service = new SystemInfoService();
+        new MockUp<Env>() {
+            @Mock
+            public SystemInfoService getCurrentSystemInfo() {
+                return service;
+            }
+        };
+
+        Random random = new Random();
+        int backendNum = random.nextInt(100 - 1) + 1;
+
+        int minOctet3 = 0;
+        int maxOctet3 = 250;
+        int minOctet4 = 1;
+        int maxOctet4 = 250;
+        Set<Integer> backendIds = new HashSet<>();
+        Set<String> ipAddresses = new HashSet<>();
+        for (int i = 0; i < backendNum; i++) {
+            // Retry until the address / id has not been used by an earlier backend.
+            String ipAddress;
+            do {
+                int octet3 = random.nextInt((maxOctet3 - minOctet3) + 1) + minOctet3;
+                int octet4 = random.nextInt((maxOctet4 - minOctet4) + 1) + minOctet4;
+                ipAddress = 192 + "." + 168 + "." + octet3 + "." + octet4;
+            } while (!ipAddresses.add(ipAddress));
+
+            int backendId;
+            do {
+                backendId = random.nextInt(90000) + 10000;
+            } while (!backendIds.add(backendId));
+
+            Backend backend = new Backend(backendId, ipAddress, 9050);
+            backend.setAlive(true);
+            service.addBackend(backend);
+        }
+
+        List<Split> remoteSplits = new ArrayList<>();
+        int splitCount = random.nextInt(1000 - 100) + 100;
+        for (int i = 0; i < splitCount; ++i) {
+            long splitLength = random.nextInt(115343360 - 94371840) + 94371840;
+            FileSplit split = new FileSplit(new Path(
+                    "hdfs://HDFS00001/usr/hive/warehouse/test.db/test_table/" + UUID.randomUUID()),
+                    0, splitLength, splitLength, 0, null, Collections.emptyList());
+            remoteSplits.add(split);
+        }
+
+        List<Split> localSplits = new ArrayList<>();
+        int localSplitCount = random.nextInt(1000 - 100) + 100;
+        Set<String> totalLocalHosts = new HashSet<>();
+        for (int i = 0; i < localSplitCount; ++i) {
+            // Never ask for more distinct hosts than there are backends: backendNum may be 1, and
+            // the uniqueness loop below could then never find a second host and would spin forever.
+            int localHostNum = Math.min(random.nextInt(3 - 1) + 1, backendNum);
+            Set<String> localHosts = new HashSet<>();
+            String localHost;
+            for (int j = 0; j < localHostNum; ++j) {
+                do {
+                    localHost = service.getAllBackends().get(random.nextInt(service.getAllBackends().size())).getHost();
+                } while (!localHosts.add(localHost));
+                totalLocalHosts.add(localHost);
+            }
+            long localSplitLength = random.nextInt(115343360 - 94371840) + 94371840;
+            FileSplit split = new FileSplit(new Path(
+                    "hdfs://HDFS00001/usr/hive/warehouse/test.db/test_table/" + UUID.randomUUID()),
+                    0, localSplitLength, localSplitLength, 0, localHosts.toArray(new String[0]),
+                    Collections.emptyList());
+            localSplits.add(split);
+        }
+
+        ListMultimap<Backend, Split> result = null;
+        // Run 3 times to ensure the same results
+        for (int i = 0; i < 3; ++i) {
+            FederationBackendPolicy policy = new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING);
+            policy.init();
+            Assertions.assertEquals(policy.numBackends(), backendNum);
+            int totalSplitNum = 0;
+
+            List<Split> totalSplits = new ArrayList<>();
+            totalSplits.addAll(remoteSplits);
+            totalSplits.addAll(localSplits);
+            Collections.shuffle(totalSplits);
+            Multimap<Backend, Split> assignment = policy.computeScanRangeAssignment(totalSplits);
+            if (i == 0) {
+                result = ArrayListMultimap.create(assignment);
+            } else {
+                Assertions.assertTrue(areMultimapsEqualIgnoringOrder(result, assignment));
+
+            }
+            int maxAssignedSplitNum = Integer.MIN_VALUE;
+            int minAssignedSplitNum = Integer.MAX_VALUE;
+            for (Backend backend : assignment.keySet()) {
+                Collection<Split> assignedSplits = assignment.get(backend);
+                if (assignedSplits.size() <= minAssignedSplitNum) {
+                    minAssignedSplitNum = assignedSplits.size();
+                }
+                if (assignedSplits.size() >= maxAssignedSplitNum) {
+                    maxAssignedSplitNum = assignedSplits.size();
+                }
+
+                long scanBytes = 0L;
+                for (Split split : assignedSplits) {
+                    FileSplit fileSplit = (FileSplit) split;
+                    scanBytes += fileSplit.getLength();
+                    ++totalSplitNum;
+                    if (fileSplit.getHosts() != null && fileSplit.getHosts().length > 0) {
+                        for (String host : fileSplit.getHosts()) {
+                            Assert.assertTrue(totalLocalHosts.contains(host));
+                        }
+                    }
+                }
+                System.out.printf("%s -> %d splits, %d bytes\n", backend, assignedSplits.size(), scanBytes);
+            }
+            Assert.assertEquals(totalSplits.size(), totalSplitNum);
-        scanRangeLocations = getScanRangeLocations("path2", 0, 100);
-        Assertions.assertEquals(78, policy.getNextConsistentBe(scanRangeLocations).getId());
+            Assert.assertTrue(Math.abs(maxAssignedSplitNum - minAssignedSplitNum) <= Config.split_assigner_max_split_num_variance);
+        }
     }

-    private TScanRangeLocations getScanRangeLocations(String path, long startOffset, long size) {
-        // Generate on file scan range
-        TFileScanRange fileScanRange = new TFileScanRange();
-        // Scan range
-        TExternalScanRange externalScanRange = new TExternalScanRange();
-        externalScanRange.setFileScanRange(fileScanRange);
-        TScanRange scanRange = new TScanRange();
-        scanRange.setExtScanRange(externalScanRange);
-        scanRange.getExtScanRange().getFileScanRange().addToRanges(createRangeDesc(path, startOffset, size));
-        // Locations
-        TScanRangeLocations locations = new TScanRangeLocations();
-        locations.setScanRange(scanRange);
-        return locations;
+    /**
+     * Same randomised scenario as {@code testGenerateRandomly}, except that every second backend
+     * (odd loop index) is registered as not alive. The policy must report only the alive backends
+     * via {@code numBackends()}.
+     *
+     * <p>Hosts for the host-carrying splits are drawn from all registered backends, alive or not.
+     */
+    @Test
+    public void testNonAliveNodes() throws UserException {
+        SystemInfoService service = new SystemInfoService();
+        new MockUp<Env>() {
+            @Mock
+            public SystemInfoService getCurrentSystemInfo() {
+                return service;
+            }
+        };
+
+        Random random = new Random();
+        int backendNum = random.nextInt(100 - 1) + 1;
+
+        int minOctet3 = 0;
+        int maxOctet3 = 250;
+        int minOctet4 = 1;
+        int maxOctet4 = 250;
+        Set<Integer> backendIds = new HashSet<>();
+        Set<String> ipAddresses = new HashSet<>();
+        int aliveBackendNum = 0;
+        for (int i = 0; i < backendNum; i++) {
+            String ipAddress;
+            do {
+                int octet3 = random.nextInt((maxOctet3 - minOctet3) + 1) + minOctet3;
+                int octet4 = random.nextInt((maxOctet4 - minOctet4) + 1) + minOctet4;
+                ipAddress = 192 + "." + 168 + "." + octet3 + "." + octet4;
+            } while (!ipAddresses.add(ipAddress));
+
+            int backendId;
+            do {
+                backendId = random.nextInt(90000) + 10000;
+            } while (!backendIds.add(backendId));
+
+            Backend backend = new Backend(backendId, ipAddress, 9050);
+            // Even indexes are alive, odd indexes are not.
+            if (i % 2 == 0) {
+                ++aliveBackendNum;
+                backend.setAlive(true);
+            } else {
+                backend.setAlive(false);
+            }
+            service.addBackend(backend);
+        }
+
+        List<Split> remoteSplits = new ArrayList<>();
+        int splitCount = random.nextInt(1000 - 100) + 100;
+        for (int i = 0; i < splitCount; ++i) {
+            long splitLength = random.nextInt(115343360 - 94371840) + 94371840;
+            FileSplit split = new FileSplit(new Path(
+                    "hdfs://HDFS00001/usr/hive/warehouse/test.db/test_table/" + UUID.randomUUID()),
+                    0, splitLength, splitLength, 0, null, Collections.emptyList());
+            remoteSplits.add(split);
+        }
+
+        List<Split> localSplits = new ArrayList<>();
+        int localSplitCount = random.nextInt(1000 - 100) + 100;
+        Set<String> totalLocalHosts = new HashSet<>();
+        for (int i = 0; i < localSplitCount; ++i) {
+            // Never ask for more distinct hosts than there are backends: backendNum may be 1, and
+            // the uniqueness loop below could then never find a second host and would spin forever.
+            int localHostNum = Math.min(random.nextInt(3 - 1) + 1, backendNum);
+            Set<String> localHosts = new HashSet<>();
+            String localHost;
+            for (int j = 0; j < localHostNum; ++j) {
+                do {
+                    localHost = service.getAllBackends().get(random.nextInt(service.getAllBackends().size())).getHost();
+                } while (!localHosts.add(localHost));
+                totalLocalHosts.add(localHost);
+            }
+            long localSplitLength = random.nextInt(115343360 - 94371840) + 94371840;
+            FileSplit split = new FileSplit(new Path(
+                    "hdfs://HDFS00001/usr/hive/warehouse/test.db/test_table/" + UUID.randomUUID()),
+                    0, localSplitLength, localSplitLength, 0, localHosts.toArray(new String[0]),
+                    Collections.emptyList());
+            localSplits.add(split);
+        }
+
+        Multimap<Backend, Split> result = null;
+        // Run 3 times to ensure the same results
+        for (int i = 0; i < 3; ++i) {
+            FederationBackendPolicy policy = new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING);
+            policy.init();
+            Assertions.assertEquals(policy.numBackends(), aliveBackendNum);
+            int totalSplitNum = 0;
+            List<Split> totalSplits = new ArrayList<>();
+            totalSplits.addAll(remoteSplits);
+            totalSplits.addAll(localSplits);
+            Collections.shuffle(totalSplits);
+            Multimap<Backend, Split> assignment = policy.computeScanRangeAssignment(totalSplits);
+            if (i == 0) {
+                result = ArrayListMultimap.create(assignment);
+            } else {
+                // NOTE(review): this comparison is order-sensitive for list multimaps, while
+                // testGenerateRandomly compares the same kind of data with
+                // areMultimapsEqualIgnoringOrder after the same shuffle - confirm which is intended.
+                Assertions.assertEquals(result, assignment);
+            }
+            int maxAssignedSplitNum = Integer.MIN_VALUE;
+            int minAssignedSplitNum = Integer.MAX_VALUE;
+            for (Backend backend : assignment.keySet()) {
+                Collection<Split> assignedSplits = assignment.get(backend);
+                if (assignedSplits.size() <= minAssignedSplitNum) {
+                    minAssignedSplitNum = assignedSplits.size();
+                }
+                if (assignedSplits.size() >= maxAssignedSplitNum) {
+                    maxAssignedSplitNum = assignedSplits.size();
+                }
+
+                long scanBytes = 0L;
+                for (Split split : assignedSplits) {
+                    FileSplit fileSplit = (FileSplit) split;
+                    scanBytes += fileSplit.getLength();
+                    ++totalSplitNum;
+                    if (fileSplit.getHosts() != null && fileSplit.getHosts().length > 0) {
+                        for (String host : fileSplit.getHosts()) {
+                            Assert.assertTrue(totalLocalHosts.contains(host));
+                        }
+                    }
+                }
+                System.out.printf("%s -> %d splits, %d bytes\n", backend, assignedSplits.size(), scanBytes);
+            }
+            Assert.assertEquals(totalSplits.size(), totalSplitNum);
+
+            Assert.assertTrue(Math.abs(maxAssignedSplitNum - minAssignedSplitNum) <= Config.split_assigner_max_split_num_variance);
+        }
     }

-    private TFileRangeDesc createRangeDesc(String path, long startOffset, long size) {
-        TFileRangeDesc rangeDesc = new TFileRangeDesc();
-        rangeDesc.setPath(path);
-        rangeDesc.setStartOffset(startOffset);
-        rangeDesc.setSize(size);
-        return rangeDesc;
+    /**
+     * Value key identifying a split by path, start offset and length. Used as a {@code HashMap}
+     * key to remember which backend a split was assigned to; the fields are final so the hash
+     * cannot change after insertion.
+     */
+    private static class TestSplitHashKey {
+        private final String path;
+        private final long start;
+        private final long length;
+
+        public TestSplitHashKey(String path, long start, long length) {
+            this.path = path;
+            this.start = start;
+            this.length = length;
+        }
+
+        public String getPath() {
+            return path;
+        }
+
+        public long getStart() {
+            return start;
+        }
+
+        public long getLength() {
+            return length;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            TestSplitHashKey that = (TestSplitHashKey) o;
+            return start == that.start && length == that.length && Objects.equals(path, that.path);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(path, start, length);
+        }
+    }
+
+    /**
+     * Records the backend each of eight splits is assigned to with three backends, then drops one
+     * backend and later adds a different one, counting how many splits land on a backend other
+     * than the one recorded at the start. The expected move ratios are pinned to 0.375 after the
+     * removal and 0.25 after the addition.
+     *
+     * <p>The test lowers the static
+     * {@code Config.split_assigner_min_consistent_hash_candidate_num} to 1; the previous value is
+     * restored in a {@code finally} block so the change does not leak into other tests.
+     */
+    @Test
+    public void testConsistentHashWhenNodeChanged() throws UserException {
+        SystemInfoService service = new SystemInfoService();
+
+        Backend backend1 = new Backend(10002L, "172.30.0.100", 9050);
+        backend1.setAlive(true);
+        service.addBackend(backend1);
+        Backend backend2 = new Backend(10003L, "172.30.0.106", 9050);
+        backend2.setAlive(true);
+        service.addBackend(backend2);
+        Backend backend3 = new Backend(10004L, "172.30.0.118", 9050);
+        backend3.setAlive(true);
+        service.addBackend(backend3);
+
+        new MockUp<Env>() {
+            @Mock
+            public SystemInfoService getCurrentSystemInfo() {
+                return service;
+            }
+        };
+
+        List<Split> splits = new ArrayList<>();
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00000-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 112140970, 112140970, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00001-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 120839661, 120839661, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00002-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 108897409, 108897409, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00003-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 95795997, 95795997, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00004-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 104600402, 104600402, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00005-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 104600402, 104600402, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00006-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 104600402, 104600402, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00007-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 105664025, 105664025, 0, null, Collections.emptyList()));
+
+        Map<TestSplitHashKey, Backend> originSplitAssignedBackends = new HashMap<>();
+        // Config is process-wide static state: remember the current value so it can be restored
+        // once this test is done, whatever its outcome.
+        // NOTE(review): assumes the Config field is declared as int - confirm against Config.java.
+        int savedMinCandidateNum = Config.split_assigner_min_consistent_hash_candidate_num;
+        try {
+            {
+                FederationBackendPolicy policy = new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING);
+                policy.init();
+                // Set these options to ensure that the consistent hash algorithm is consistent.
+                policy.setEnableSplitsRedistribution(false);
+                Config.split_assigner_min_consistent_hash_candidate_num = 1;
+                int backendNum = 3;
+                Assertions.assertEquals(policy.numBackends(), backendNum);
+                Multimap<Backend, Split> assignment = policy.computeScanRangeAssignment(splits);
+
+                for (Backend backend : assignment.keySet()) {
+                    Collection<Split> assignedSplits = assignment.get(backend);
+                    long scanBytes = 0L;
+                    for (Split split : assignedSplits) {
+                        FileSplit fileSplit = (FileSplit) split;
+                        scanBytes += fileSplit.getLength();
+                        originSplitAssignedBackends.put(
+                                new TestSplitHashKey(split.getPathString(), split.getStart(), split.getLength()),
+                                backend);
+                    }
+                    System.out.printf("%s -> %d splits, %d bytes\n", backend, assignedSplits.size(), scanBytes);
+                }
+                Map<Backend, Long> stats = policy.getAssignedWeightPerBackend();
+                for (Map.Entry<Backend, Long> entry : stats.entrySet()) {
+                    System.out.printf("weight: %s -> %d\n", entry.getKey(), entry.getValue());
+                }
+            }
+
+            // remove a node
+            {
+                service.dropBackend(backend3.getId());
+                int changed = 0;
+
+                FederationBackendPolicy policy = new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING);
+                policy.init();
+                int backendNum = 2;
+                Assertions.assertEquals(policy.numBackends(), backendNum);
+                Multimap<Backend, Split> assignment = policy.computeScanRangeAssignment(splits);
+
+                for (Backend backend : assignment.keySet()) {
+                    Collection<Split> assignedSplits = assignment.get(backend);
+                    long scanBytes = 0L;
+                    for (Split split : assignedSplits) {
+                        FileSplit fileSplit = (FileSplit) split;
+                        scanBytes += fileSplit.getLength();
+                        Backend origin = originSplitAssignedBackends.get(
+                                new TestSplitHashKey(split.getPathString(), split.getStart(), split.getLength()));
+                        if (!backend.equals(origin)) {
+                            changed += 1;
+                        }
+                    }
+                    System.out.printf("%s -> %d splits, %d bytes\n", backend, assignedSplits.size(), scanBytes);
+                }
+
+                Map<Backend, Long> stats = policy.getAssignedWeightPerBackend();
+                for (Map.Entry<Backend, Long> entry : stats.entrySet()) {
+                    System.out.printf("weight: %s -> %d\n", entry.getKey(), entry.getValue());
+                }
+
+                float moveRatio = changed * 1.0f / assignment.values().size();
+                System.out.printf("Remove a node: move ratio = %.2f\n", moveRatio);
+                Assertions.assertEquals(0.375, moveRatio);
+            }
+
+            // add a node
+            {
+                // Reuses id 10004, which is free again because backend3 was dropped above.
+                Backend backend4 = new Backend(10004L, "172.30.0.128", 9050);
+                backend4.setAlive(true);
+                service.addBackend(backend4);
+                int changed = 0;
+
+                FederationBackendPolicy policy = new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING);
+                policy.init();
+                int backendNum = 3;
+                Assertions.assertEquals(policy.numBackends(), backendNum);
+                Multimap<Backend, Split> assignment = policy.computeScanRangeAssignment(splits);
+
+                for (Backend backend : assignment.keySet()) {
+                    Collection<Split> assignedSplits = assignment.get(backend);
+                    long scanBytes = 0L;
+                    for (Split split : assignedSplits) {
+                        FileSplit fileSplit = (FileSplit) split;
+                        scanBytes += fileSplit.getLength();
+                        Backend origin = originSplitAssignedBackends.get(
+                                new TestSplitHashKey(split.getPathString(), split.getStart(), split.getLength()));
+                        if (!backend.equals(origin)) {
+                            changed += 1;
+                        }
+                    }
+                    System.out.printf("%s -> %d splits, %d bytes\n", backend, assignedSplits.size(), scanBytes);
+                }
+                Map<Backend, Long> stats = policy.getAssignedWeightPerBackend();
+                for (Map.Entry<Backend, Long> entry : stats.entrySet()) {
+                    System.out.printf("weight: %s -> %d\n", entry.getKey(), entry.getValue());
+                }
+
+                float moveRatio = changed * 1.0f / assignment.values().size();
+                System.out.printf("Add a node, move ratio = %.2f\n", moveRatio);
+                Assertions.assertEquals(0.25, moveRatio);
+            }
+        } finally {
+            Config.split_assigner_min_consistent_hash_candidate_num = savedMinCandidateNum;
+        }
+    }
+
+    /**
+     * Returns whether two multimaps hold the same key/value entries regardless of iteration order.
+     *
+     * <p>The entry counts are compared first because the two-way {@code containsAll} check alone
+     * ignores multiplicity and would treat {@code {k:[v, v]}} and {@code {k:[v]}} as equal.
+     *
+     * @param multimap1 first multimap
+     * @param multimap2 second multimap
+     * @return {@code true} if both have the same number of entries and each contains all entries
+     *         of the other
+     */
+    private static <K, V> boolean areMultimapsEqualIgnoringOrder(
+            Multimap<K, V> multimap1, Multimap<K, V> multimap2) {
+        Collection<Map.Entry<K, V>> entries1 = multimap1.entries();
+        Collection<Map.Entry<K, V>> entries2 = multimap2.entries();
+
+        return multimap1.size() == multimap2.size()
+                && entries1.containsAll(entries2) && entries2.containsAll(entries1);
     }
 }
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
