This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch tpcds
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/tpcds by this push:
new fea246a7225 [Opt] Opt split assignment for external table. (#30246)
fea246a7225 is described below
commit fea246a7225e6ccb486ae6e93a5fd92b903b8575
Author: Qi Chen <[email protected]>
AuthorDate: Tue Jan 23 12:15:37 2024 +0800
[Opt] Opt split assignment for external table. (#30246)
---
.../main/java/org/apache/doris/common/Config.java | 23 +-
.../apache/doris/common/util/ConsistentHash.java | 77 ++-
.../doris/planner/external/ExternalScanNode.java | 6 +-
.../planner/external/FederationBackendPolicy.java | 333 +++++++++-
.../doris/planner/external/FileQueryScanNode.java | 133 ++--
.../apache/doris/planner/external/FileSplit.java | 7 +
.../planner/external/IndexedPriorityQueue.java | 228 +++++++
.../external/NodeSelectionStrategy.java} | 15 +-
.../Split.java => planner/external/Queue.java} | 21 +-
.../external/ResettableRandomizedIterator.java | 63 ++
.../apache/doris/planner/external/SplitWeight.java | 130 ++++
.../external/UpdateablePriorityQueue.java} | 18 +-
.../src/main/java/org/apache/doris/spi/Split.java | 23 +
.../doris/planner/FederationBackendPolicyTest.java | 706 +++++++++++++++++++--
14 files changed, 1598 insertions(+), 185 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 2494465e05e..6216b859c08 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2241,7 +2241,28 @@ public class Config extends ConfigBase {
            "When file cache is enabled, the number of virtual nodes of each node in the consistent hash algorithm. "
                    + "The larger the value, the more uniform the distribution of the hash algorithm, "
                    + "but it will increase the memory overhead."})
- public static int virtual_node_number = 2048;
+ public static int virtual_node_number = 256;
+
+    @ConfField(mutable = true, description = {
+            "本地节点软亲缘性优化。尽可能地优先选取本地副本节点。",
+            "Local node soft-affinity optimization. Prefer nodes that hold a local replica whenever possible."})
+    public static boolean optimized_local_scheduling = true;
+
+    @ConfField(mutable = true, description = {
+            "随机算法最小的候选数目,会选取相对最空闲的节点。",
+            "The minimum number of candidates for the random algorithm; the least loaded of them is selected."})
+    public static int min_random_candidate_num = 2;
+
+    @ConfField(mutable = true, description = {
+            "一致性哈希算法最小的候选数目,会选取相对最空闲的节点。",
+            "The minimum number of candidates for the consistent hash algorithm; the least loaded of them is selected."})
+    public static int min_consistent_hash_candidate_num = 2;
+
+    @ConfField(mutable = true, description = {
+            "各节点之间最大的 split 数目差异,如果超过这个数目就会重新分布 split。",
+            "The maximum allowed difference in the number of splits between nodes. "
+                    + "If this difference is exceeded, splits will be redistributed."})
+    public static int max_split_num_variance = 1;
@ConfField(description = {
"控制统计信息的自动触发作业执行记录的持久化行数",
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/ConsistentHash.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/ConsistentHash.java
index 4ef6e4168f1..b627c6d0a4f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/ConsistentHash.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/ConsistentHash.java
@@ -17,11 +17,16 @@
package org.apache.doris.common.util;
+import com.google.common.collect.ImmutableList;
import com.google.common.hash.Funnel;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import java.util.Collection;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
@@ -89,14 +94,76 @@ public class ConsistentHash<K, N> {
}
}
- public N getNode(K key) {
- if (ring.isEmpty()) {
- return null;
+ public List<N> getNode(K key, int count) {
+ int nodeCount = ring.values().size();
+ if (count > nodeCount) {
+ count = nodeCount;
}
+
+ Set<N> uniqueNodes = new LinkedHashSet<>();
+
Hasher hasher = hashFunction.newHasher();
Long hashKey = hasher.putObject(key, keyFunnel).hash().asLong();
+
SortedMap<Long, VirtualNode> tailMap = ring.tailMap(hashKey);
- hashKey = !tailMap.isEmpty() ? tailMap.firstKey() : ring.firstKey();
- return ring.get(hashKey).getNode();
+ // Start reading from tail
+ for (Map.Entry<Long, VirtualNode> entry : tailMap.entrySet()) {
+ uniqueNodes.add(entry.getValue().node);
+ if (uniqueNodes.size() == count) {
+ break;
+ }
+ }
+
+ if (uniqueNodes.size() < count) {
+ // Start reading from the head as we have exhausted tail
+ SortedMap<Long, VirtualNode> headMap = ring.headMap(hashKey);
+ for (Map.Entry<Long, VirtualNode> entry : headMap.entrySet()) {
+ uniqueNodes.add(entry.getValue().node);
+ if (uniqueNodes.size() == count) {
+ break;
+ }
+ }
+ }
+
+ return ImmutableList.copyOf(uniqueNodes);
}
+
+ // public List<N> getNode(K key, int distinctNumber) {
+ // if (ring.isEmpty()) {
+ // return null;
+ // }
+ // List<N> result = new ArrayList<>();
+ //
+ // Hasher hasher = hashFunction.newHasher();
+ // Long hashKey = hasher.putObject(key, keyFunnel).hash().asLong();
+ // // SortedMap<Long, VirtualNode> tailMap = ring.tailMap(hashKey);
+    //     // hashKey = !tailMap.isEmpty() ? tailMap.firstKey() : ring.firstKey();
+ // // return ring.get(hashKey).getNode();
+ //
+ //
+ // // search from `hash` to end.
+ // collectNodes(ring.tailMap(hashKey), result, distinctNumber);
+ // if (result.size() < distinctNumber) {
+ // // search from begin to end.
+ // // so we walk this ring twice at most.
+ // collectNodes(ring, result, distinctNumber);
+ // }
+ // return result;
+ // }
+ //
+    // private void collectNodes(SortedMap<Long, VirtualNode> map, List<N> result, int distinctNumber) {
+ // Set<N> visited = new HashSet<>(result);
+ // for (Map.Entry<Long, VirtualNode> entry : map.entrySet()) {
+ // VirtualNode vnode = entry.getValue();
+ // if (visited.contains(vnode.node)) {
+ // continue;
+ // }
+ // visited.add(vnode.node);
+ // result.add(vnode.node);
+ // if (result.size() == distinctNumber) {
+ // break;
+ // }
+ // }
+ // }
}
+
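The rewritten getNode(key, count) hashes the key onto the ring, walks the tail map clockwise collecting distinct physical nodes, and wraps around to the head map if the tail is exhausted; note the returned list can hold fewer than count nodes when the ring has fewer distinct backends. A minimal usage sketch, assuming the constructor argument order used by the cache loader in FederationBackendPolicy (hash function, key funnel, node funnel, nodes, virtual node count); the key string is hypothetical:

    import org.apache.doris.common.util.ConsistentHash;

    import com.google.common.hash.Funnel;
    import com.google.common.hash.Hashing;
    import com.google.common.hash.PrimitiveSink;

    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;
    import java.util.List;

    public class ConsistentHashSketch {
        public static void main(String[] args) {
            Funnel<String> funnel = (String from, PrimitiveSink into) ->
                    into.putString(from, StandardCharsets.UTF_8);
            ConsistentHash<String, String> ring = new ConsistentHash<>(
                    Hashing.murmur3_128(), funnel, funnel,
                    Arrays.asList("be1", "be2", "be3"), 128 /* virtual nodes per node */);
            // Two distinct candidate nodes for the same key; the list is stable
            // across queries as long as the node set does not change.
            List<String> candidates = ring.getNode("hdfs://nn/warehouse/t/file.orc:0", 2);
            System.out.println(candidates);
        }
    }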
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalScanNode.java
index a4751b5205f..65681804875 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalScanNode.java
@@ -22,6 +22,7 @@ import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.common.UserException;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanNode;
+import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TScanRangeLocations;
@@ -43,7 +44,9 @@ public abstract class ExternalScanNode extends ScanNode {
// set to false means this scan node does not need to check column priv.
protected boolean needCheckColumnPriv;
-    protected final FederationBackendPolicy backendPolicy = new FederationBackendPolicy();
+    protected final FederationBackendPolicy backendPolicy = new FederationBackendPolicy(
+            ConnectContext.get().getSessionVariable().enableFileCache
+                    ? NodeSelectionStrategy.CONSISTENT_HASHING : NodeSelectionStrategy.RANDOM);
     public ExternalScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, StatisticalType statisticalType,
             boolean needCheckColumnPriv) {
@@ -91,3 +94,4 @@ public abstract class ExternalScanNode extends ScanNode {
return scanRangeLocations.size();
}
}
+
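One caveat with the field initializer above: ConnectContext.get() is thread-local and can be null for plans built outside a user session. A null-safe variant (an editorial sketch, not what the patch does) would be:

    // Sketch: fall back to RANDOM when no session is attached to the current thread.
    private static NodeSelectionStrategy chooseStrategy() {
        ConnectContext ctx = ConnectContext.get();
        boolean fileCacheEnabled = ctx != null && ctx.getSessionVariable().enableFileCache;
        return fileCacheEnabled ? NodeSelectionStrategy.CONSISTENT_HASHING : NodeSelectionStrategy.RANDOM;
    }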
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java
index ade03291c30..19321a57cbc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java
@@ -24,18 +24,23 @@ import org.apache.doris.common.util.ConsistentHash;
import org.apache.doris.mysql.privilege.UserProperty;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.resource.Tag;
+import org.apache.doris.spi.Split;
import org.apache.doris.system.Backend;
import org.apache.doris.system.BeSelectionPolicy;
-import org.apache.doris.thrift.TFileRangeDesc;
-import org.apache.doris.thrift.TScanRangeLocations;
+import org.apache.doris.system.SystemInfoService;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.common.hash.Funnel;
import com.google.common.hash.Hashing;
@@ -45,12 +50,16 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.nio.charset.StandardCharsets;
-import java.security.SecureRandom;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
@@ -59,21 +68,29 @@ public class FederationBackendPolicy {
     private static final Logger LOG = LogManager.getLogger(FederationBackendPolicy.class);
private final List<Backend> backends = Lists.newArrayList();
private final Map<String, List<Backend>> backendMap = Maps.newHashMap();
- private final SecureRandom random = new SecureRandom();
- private ConsistentHash<TScanRangeLocations, Backend> consistentHash;
+
+ public Map<Backend, Long> getAssignedWeightPerBackend() {
+ return assignedWeightPerBackend;
+ }
+
+ private Map<Backend, Long> assignedWeightPerBackend = Maps.newHashMap();
+
+ private ConsistentHash<Split, Backend> consistentHash;
private int nextBe = 0;
private boolean initialized = false;
+ private NodeSelectionStrategy nodeSelectionStrategy;
+
     // Creating a ConsistentHash ring may be a time-consuming operation, so we cache it.
-    private static LoadingCache<HashCacheKey, ConsistentHash<TScanRangeLocations, Backend>> consistentHashCache;
+    private static LoadingCache<HashCacheKey, ConsistentHash<Split, Backend>> consistentHashCache;
static {
consistentHashCache = CacheBuilder.newBuilder().maximumSize(5)
-                .build(new CacheLoader<HashCacheKey, ConsistentHash<TScanRangeLocations, Backend>>() {
+                .build(new CacheLoader<HashCacheKey, ConsistentHash<Split, Backend>>() {
                     @Override
-                    public ConsistentHash<TScanRangeLocations, Backend> load(HashCacheKey key) {
-                        return new ConsistentHash<>(Hashing.murmur3_128(), new ScanRangeHash(),
+                    public ConsistentHash<Split, Backend> load(HashCacheKey key) {
+                        return new ConsistentHash<>(Hashing.murmur3_128(), new SplitHash(),
                                 new BackendHash(), key.bes, Config.virtual_node_number);
                     }
                 });
@@ -81,13 +98,16 @@ public class FederationBackendPolicy {
private static class HashCacheKey {
// sorted backend ids as key
- private List<Long> beIds;
+ private List<String> beHashKeys;
// backends is not part of key, just an attachment
private List<Backend> bes;
HashCacheKey(List<Backend> backends) {
this.bes = backends;
-            this.beIds = backends.stream().map(b -> b.getId()).sorted().collect(Collectors.toList());
+            this.beHashKeys = backends.stream().map(b ->
+                            String.format("id: %d, host: %s, port: %d", b.getId(), b.getHost(), b.getHeartbeatPort()))
+                    .sorted()
+                    .collect(Collectors.toList());
}
@Override
@@ -98,20 +118,28 @@ public class FederationBackendPolicy {
if (!(obj instanceof HashCacheKey)) {
return false;
}
- return Objects.equals(beIds, ((HashCacheKey) obj).beIds);
+ return Objects.equals(beHashKeys, ((HashCacheKey) obj).beHashKeys);
}
@Override
public int hashCode() {
- return Objects.hash(beIds);
+ return Objects.hash(beHashKeys);
}
@Override
public String toString() {
- return "HashCache{" + "beIds=" + beIds + '}';
+ return "HashCache{" + "beHashKeys=" + beHashKeys + '}';
}
}
+    public FederationBackendPolicy(NodeSelectionStrategy nodeSelectionStrategy) {
+ this.nodeSelectionStrategy = nodeSelectionStrategy;
+ }
+
+ public FederationBackendPolicy() {
+ this(NodeSelectionStrategy.RANDOM);
+ }
+
public void init() throws UserException {
if (!initialized) {
init(Collections.emptyList());
@@ -152,6 +180,10 @@ public class FederationBackendPolicy {
if (backends.isEmpty()) {
throw new UserException("No available backends");
}
+ for (Backend backend : backends) {
+ assignedWeightPerBackend.put(backend, 0L);
+ }
+
backendMap.putAll(backends.stream().collect(Collectors.groupingBy(Backend::getHost)));
try {
            consistentHash = consistentHashCache.get(new HashCacheKey(backends));
@@ -166,23 +198,260 @@ public class FederationBackendPolicy {
return selectedBackend;
}
-    public Backend getNextConsistentBe(TScanRangeLocations scanRangeLocations) {
-        return consistentHash.getNode(scanRangeLocations);
+    public Multimap<Backend, Split> computeScanRangeAssignment(List<Split> splits) throws UserException {
+        ListMultimap<Backend, Split> assignment = ArrayListMultimap.create();
+
+        List<Split> remainingSplits = null;
+
+        List<Backend> filteredNodes = new ArrayList<>();
+        for (List<Backend> backendList : backendMap.values()) {
+            filteredNodes.addAll(backendList);
+        }
+        ResettableRandomizedIterator<Backend> randomCandidates = new ResettableRandomizedIterator<>(filteredNodes);
+
+        boolean splitsToBeRedistributed = false;
+
+        // optimizedLocalScheduling enables prioritized assignment of splits to local nodes when splits contain
+        // locality information
+        if (Config.optimized_local_scheduling) {
+            remainingSplits = new ArrayList<>(splits.size());
+            for (int i = 0; i < splits.size(); ++i) {
+                Split split = splits.get(i);
+                if (split.isRemotelyAccessible() && (split.getHosts() != null && split.getHosts().length > 0)) {
+                    List<Backend> candidateNodes = selectExactNodes(backendMap, split.getHosts());
+
+                    Optional<Backend> chosenNode = candidateNodes.stream()
+                            .min(Comparator.comparingLong(ownerNode -> assignedWeightPerBackend.get(ownerNode)));
+
+                    if (chosenNode.isPresent()) {
+                        Backend selectedBackend = chosenNode.get();
+                        assignment.put(selectedBackend, split);
+                        assignedWeightPerBackend.put(selectedBackend,
+                                assignedWeightPerBackend.get(selectedBackend) + split.getSplitWeight().getRawValue());
+                        splitsToBeRedistributed = true;
+                        continue;
+                    }
+                }
+                remainingSplits.add(split);
+            }
+        } else {
+            remainingSplits = splits;
+        }
+
+        for (Split split : remainingSplits) {
+            List<Backend> candidateNodes;
+            if (!split.isRemotelyAccessible()) {
+                candidateNodes = selectExactNodes(backendMap, split.getHosts());
+            } else {
+                switch (nodeSelectionStrategy) {
+                    case RANDOM: {
+                        randomCandidates.reset();
+                        candidateNodes = selectNodes(Config.min_random_candidate_num, randomCandidates);
+                        break;
+                    }
+                    case CONSISTENT_HASHING: {
+                        candidateNodes = consistentHash.getNode(split, Config.min_consistent_hash_candidate_num);
+                        splitsToBeRedistributed = true;
+                        break;
+                    }
+                    default: {
+                        throw new RuntimeException("Unknown node selection strategy: " + nodeSelectionStrategy);
+                    }
+                }
+            }
+            if (candidateNodes.isEmpty()) {
+                LOG.debug("No nodes available to schedule {}. Available nodes {}", split, filteredNodes);
+                throw new UserException(SystemInfoService.NO_SCAN_NODE_BACKEND_AVAILABLE_MSG);
+            }
+
+            Backend selectedBackend = chooseNodeForSplit(candidateNodes);
+            List<Backend> alternativeBackends = new ArrayList<>(candidateNodes);
+            alternativeBackends.remove(selectedBackend);
+            split.setAlternativeHosts(
+                    alternativeBackends.stream().map(each -> each.getHost()).collect(Collectors.toList()));
+            assignment.put(selectedBackend, split);
+            assignedWeightPerBackend.put(selectedBackend,
+                    assignedWeightPerBackend.get(selectedBackend) + split.getSplitWeight().getRawValue());
+        }
+
+        if (splitsToBeRedistributed) {
+            equateDistribution(assignment);
+        }
+        return assignment;
     }
-    // Try to find a local BE, if not exists, use `getNextBe` instead
-    public Backend getNextLocalBe(List<String> hosts, TScanRangeLocations scanRangeLocations) {
-        List<Backend> candidateBackends = Lists.newArrayListWithCapacity(hosts.size());
+    /**
+     * This method tries to make the distribution of splits more uniform. All nodes are arranged into a maxHeap and
+     * a minHeap based on the weight of the splits assigned to them. Splits are redistributed, one at a time, from a
+     * maxNode to a minNode until the distribution is as uniform as possible.
+     *
+     * @param assignment the node-splits multimap after the first and the second stage
+     */
+    private void equateDistribution(ListMultimap<Backend, Split> assignment) {
+ if (assignment.isEmpty()) {
+ return;
+ }
+
+ List<Backend> allNodes = new ArrayList<>();
+ for (List<Backend> backendList : backendMap.values()) {
+ allNodes.addAll(backendList);
+ }
+ Collections.sort(allNodes, Comparator.comparing(Backend::getId));
+
+ if (allNodes.size() < 2) {
+ return;
+ }
+
+ IndexedPriorityQueue<Backend> maxNodes = new IndexedPriorityQueue<>();
+ for (Backend node : allNodes) {
+ maxNodes.addOrUpdate(node, assignedWeightPerBackend.get(node));
+ }
+
+ IndexedPriorityQueue<Backend> minNodes = new IndexedPriorityQueue<>();
+ for (Backend node : allNodes) {
+            minNodes.addOrUpdate(node, Long.MAX_VALUE - assignedWeightPerBackend.get(node));
+ }
+
+ while (true) {
+ if (maxNodes.isEmpty()) {
+ return;
+ }
+
+ // fetch min and max node
+ Backend maxNode = maxNodes.poll();
+ Backend minNode = minNodes.poll();
+
+            // Allow some degree of non-uniformity when assigning splits to nodes. Usually data distribution
+            // among nodes in a cluster won't be fully uniform (e.g. because a hash function with non-uniform
+            // distribution, such as consistent hashing, is used). In such cases it makes sense to assign splits
+            // to nodes with data because of potential savings in network throughput and CPU time.
+            // Allowing a difference of max_split_num_variance standard splits between the nodes with maximum and
+            // minimum load is a tradeoff between the ratio of misassigned splits and assignment uniformity. Using
+            // larger values doesn't reduce the number of misassigned splits greatly (in absolute values).
+            if (assignedWeightPerBackend.get(maxNode) - assignedWeightPerBackend.get(minNode)
+                    <= SplitWeight.rawValueForStandardSplitCount(Config.max_split_num_variance)) {
+                return;
+            }
+
+            // move split from max to min
+            Split redistributedSplit = redistributeSplit(assignment, maxNode, minNode);
+
+            assignedWeightPerBackend.put(maxNode,
+                    assignedWeightPerBackend.get(maxNode) - redistributedSplit.getSplitWeight().getRawValue());
+            assignedWeightPerBackend.put(minNode, Math.addExact(
+                    assignedWeightPerBackend.get(minNode), redistributedSplit.getSplitWeight().getRawValue()));
+
+            // add max back into maxNodes only if it still has assignments
+            if (assignment.containsKey(maxNode)) {
+                maxNodes.addOrUpdate(maxNode, assignedWeightPerBackend.get(maxNode));
+            }
+
+            // Add or update both the Priority Queues with the updated node priorities
+            maxNodes.addOrUpdate(minNode, assignedWeightPerBackend.get(minNode));
+            minNodes.addOrUpdate(minNode, Long.MAX_VALUE - assignedWeightPerBackend.get(minNode));
+            minNodes.addOrUpdate(maxNode, Long.MAX_VALUE - assignedWeightPerBackend.get(maxNode));
+        }
+    }
+
+    /**
+     * This method selects and removes a split from fromNode and assigns it to toNode. It first attempts to
+     * redistribute a non-local split, a situation that can arise when multiple queries run simultaneously.
+     * If no non-local split can be found on the maxNode, the next split is selected and reassigned.
+     */
+    @VisibleForTesting
+    public static Split redistributeSplit(Multimap<Backend, Split> assignment, Backend fromNode, Backend toNode) {
+ Iterator<Split> splitIterator = assignment.get(fromNode).iterator();
+ Split splitToBeRedistributed = null;
+ while (splitIterator.hasNext()) {
+ Split split = splitIterator.next();
+            // Try to select a non-local split for redistribution
+            if (split.getHosts() != null && !isSplitLocal(split.getHosts(), fromNode.getHost())) {
+                splitToBeRedistributed = split;
+                break;
+            }
+        }
+        // Select the first split if fromNode has no non-local splits in the current batch of assignment
+        if (splitToBeRedistributed == null) {
+            splitIterator = assignment.get(fromNode).iterator();
+            while (splitIterator.hasNext()) {
+                splitToBeRedistributed = splitIterator.next();
+                // TODO: prefer a split that is local to toNode (or to one of its alternative hosts), if any
+                break;
+            }
+        }
+        splitIterator.remove();
+        assignment.put(toNode, splitToBeRedistributed);
+        return splitToBeRedistributed;
+    }
+
+    private static boolean isSplitLocal(String[] splitHosts, String host) {
+        for (String splitHost : splitHosts) {
+            if (splitHost.equals(host)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private static boolean isSplitLocal(List<String> splitHosts, String host) {
+        for (String splitHost : splitHosts) {
+            if (splitHost.equals(host)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public static List<Backend> selectExactNodes(Map<String, List<Backend>> backendMap, String[] hosts) {
+        Set<Backend> chosen = new LinkedHashSet<>();
+
         for (String host : hosts) {
-            List<Backend> backends = backendMap.get(host);
-            if (CollectionUtils.isNotEmpty(backends)) {
-                candidateBackends.add(backends.get(random.nextInt(backends.size())));
+            if (backendMap.containsKey(host)) {
+                backendMap.get(host).forEach(chosen::add);
             }
         }
+        return ImmutableList.copyOf(chosen);
+    }
+
+    public static List<Backend> selectNodes(int limit, Iterator<Backend> candidates) {
+        Preconditions.checkArgument(limit > 0, "limit must be at least 1");
-        return CollectionUtils.isEmpty(candidateBackends)
-                ? getNextConsistentBe(scanRangeLocations)
-                : candidateBackends.get(random.nextInt(candidateBackends.size()));
+        List<Backend> selected = new ArrayList<>(limit);
+        while (selected.size() < limit && candidates.hasNext()) {
+            selected.add(candidates.next());
+        }
+
+        return selected;
+    }
+
+ private Backend chooseNodeForSplit(List<Backend> candidateNodes) {
+ Backend chosenNode = null;
+ long minWeight = Long.MAX_VALUE;
+
+ for (Backend node : candidateNodes) {
+ long queuedWeight = assignedWeightPerBackend.get(node);
+ if (queuedWeight <= minWeight) {
+ chosenNode = node;
+ minWeight = queuedWeight;
+ }
+ }
+
+ return chosenNode;
}
public int numBackends() {
@@ -200,15 +469,13 @@ public class FederationBackendPolicy {
}
}
-    private static class ScanRangeHash implements Funnel<TScanRangeLocations> {
+    private static class SplitHash implements Funnel<Split> {
         @Override
-        public void funnel(TScanRangeLocations scanRange, PrimitiveSink primitiveSink) {
-            Preconditions.checkState(scanRange.scan_range.isSetExtScanRange());
-            for (TFileRangeDesc desc : scanRange.scan_range.ext_scan_range.file_scan_range.ranges) {
-                primitiveSink.putBytes(desc.path.getBytes(StandardCharsets.UTF_8));
-                primitiveSink.putLong(desc.start_offset);
-                primitiveSink.putLong(desc.size);
-            }
+        public void funnel(Split split, PrimitiveSink primitiveSink) {
+            primitiveSink.putBytes(split.getPathString().getBytes(StandardCharsets.UTF_8));
+            primitiveSink.putLong(split.getStart());
+            primitiveSink.putLong(split.getLength());
         }
}
}
+
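To make the balancing loop in equateDistribution() concrete, here is a self-contained sketch of one run using the IndexedPriorityQueue added by this commit. Max ordering uses the raw weight and min ordering uses Long.MAX_VALUE minus the weight, as in the patch; the starting weights are hypothetical and each move transfers one standard-weight split:

    import org.apache.doris.planner.external.IndexedPriorityQueue;

    public class BalanceSketch {
        public static void main(String[] args) {
            long[] weights = {500, 100, 100}; // raw weights: be0 is overloaded (5 splits vs 1 and 1)
            IndexedPriorityQueue<Integer> maxNodes = new IndexedPriorityQueue<>();
            IndexedPriorityQueue<Integer> minNodes = new IndexedPriorityQueue<>();
            for (int be = 0; be < weights.length; be++) {
                maxNodes.addOrUpdate(be, weights[be]);
                minNodes.addOrUpdate(be, Long.MAX_VALUE - weights[be]);
            }
            long tolerance = 100; // one standard split, i.e. max_split_num_variance = 1
            while (true) {
                int max = maxNodes.poll();
                int min = minNodes.poll();
                if (weights[max] - weights[min] <= tolerance) {
                    break;
                }
                weights[max] -= 100; // move one standard-weight split from max to min
                weights[min] += 100;
                maxNodes.addOrUpdate(max, weights[max]);
                maxNodes.addOrUpdate(min, weights[min]);
                minNodes.addOrUpdate(max, Long.MAX_VALUE - weights[max]);
                minNodes.addOrUpdate(min, Long.MAX_VALUE - weights[min]);
            }
            System.out.println(java.util.Arrays.toString(weights)); // [300, 200, 200]
        }
    }

With a tolerance of one standard split, the example converges from [500, 100, 100] to [300, 200, 200] in two moves.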
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
index 71c731498a6..09a8a116f5a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
@@ -68,13 +68,14 @@ import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
import lombok.Getter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.net.URI;
import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -314,76 +315,77 @@ public abstract class FileQueryScanNode extends FileScanNode {
params.setProperties(locationProperties);
}
-        boolean enableShortCircuitRead = HdfsResource.enableShortCircuitRead(locationProperties);
List<String> pathPartitionKeys = getPathPartitionKeys();
- for (Split split : inputSplits) {
- FileSplit fileSplit = (FileSplit) split;
- TFileType locationType;
- if (fileSplit instanceof IcebergSplit
- && ((IcebergSplit)
fileSplit).getConfig().containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) {
- locationType = TFileType.FILE_BROKER;
- } else {
- locationType = getLocationType(fileSplit.getPath().toString());
- }
- TScanRangeLocations curLocations = newLocations();
- // If fileSplit has partition values, use the values collected
from hive partitions.
- // Otherwise, use the values in file path.
- boolean isACID = false;
- if (fileSplit instanceof HiveSplit) {
- HiveSplit hiveSplit = (HiveSplit) split;
- isACID = hiveSplit.isACID();
- }
- List<String> partitionValuesFromPath =
fileSplit.getPartitionValues() == null
- ?
BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(),
pathPartitionKeys, false, isACID)
- : fileSplit.getPartitionValues();
-
- TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit,
partitionValuesFromPath, pathPartitionKeys,
- locationType);
- TFileCompressType fileCompressType =
getFileCompressType(fileSplit);
- rangeDesc.setCompressType(fileCompressType);
- if (isACID) {
- HiveSplit hiveSplit = (HiveSplit) split;
-
hiveSplit.setTableFormatType(TableFormatType.TRANSACTIONAL_HIVE);
- TTableFormatFileDesc tableFormatFileDesc = new
TTableFormatFileDesc();
-
tableFormatFileDesc.setTableFormatType(hiveSplit.getTableFormatType().value());
- AcidInfo acidInfo = (AcidInfo) hiveSplit.getInfo();
- TTransactionalHiveDesc transactionalHiveDesc = new
TTransactionalHiveDesc();
-
transactionalHiveDesc.setPartition(acidInfo.getPartitionLocation());
- List<TTransactionalHiveDeleteDeltaDesc> deleteDeltaDescs = new
ArrayList<>();
- for (DeleteDeltaInfo deleteDeltaInfo :
acidInfo.getDeleteDeltas()) {
- TTransactionalHiveDeleteDeltaDesc deleteDeltaDesc = new
TTransactionalHiveDeleteDeltaDesc();
-
deleteDeltaDesc.setDirectoryLocation(deleteDeltaInfo.getDirectoryLocation());
-
deleteDeltaDesc.setFileNames(deleteDeltaInfo.getFileNames());
- deleteDeltaDescs.add(deleteDeltaDesc);
+        Multimap<Backend, Split> assignment = backendPolicy.computeScanRangeAssignment(inputSplits);
+        // if (ConnectContext.get().getExecutor() != null) {
+        //     ConnectContext.get().getExecutor().getSummaryProfile().setComputeAssignmentTime();
+        //     ConnectContext.get().getExecutor().getSummaryProfile().setComputeAssignment(assignment);
+        // }
+ for (Backend backend : assignment.keySet()) {
+ Collection<Split> splits = assignment.get(backend);
+ for (Split split : splits) {
+ FileSplit fileSplit = (FileSplit) split;
+ TFileType locationType;
+ if (fileSplit instanceof IcebergSplit
+ && ((IcebergSplit)
fileSplit).getConfig().containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) {
+ locationType = TFileType.FILE_BROKER;
+ } else {
+ locationType =
getLocationType(fileSplit.getPath().toString());
+ }
+
+ TScanRangeLocations curLocations = newLocations();
+ // If fileSplit has partition values, use the values collected
from hive partitions.
+ // Otherwise, use the values in file path.
+ boolean isACID = false;
+ if (fileSplit instanceof HiveSplit) {
+ HiveSplit hiveSplit = (HiveSplit) fileSplit;
+ isACID = hiveSplit.isACID();
+ }
+ List<String> partitionValuesFromPath =
fileSplit.getPartitionValues() == null
+ ?
BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(),
pathPartitionKeys,
+ false, isACID) : fileSplit.getPartitionValues();
+
+ TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit,
partitionValuesFromPath, pathPartitionKeys,
+ locationType);
+ TFileCompressType fileCompressType =
getFileCompressType(fileSplit);
+ rangeDesc.setCompressType(fileCompressType);
+ if (isACID) {
+ HiveSplit hiveSplit = (HiveSplit) fileSplit;
+
hiveSplit.setTableFormatType(TableFormatType.TRANSACTIONAL_HIVE);
+ TTableFormatFileDesc tableFormatFileDesc = new
TTableFormatFileDesc();
+
tableFormatFileDesc.setTableFormatType(hiveSplit.getTableFormatType().value());
+ AcidInfo acidInfo = (AcidInfo) hiveSplit.getInfo();
+ TTransactionalHiveDesc transactionalHiveDesc = new
TTransactionalHiveDesc();
+
transactionalHiveDesc.setPartition(acidInfo.getPartitionLocation());
+ List<TTransactionalHiveDeleteDeltaDesc> deleteDeltaDescs =
new ArrayList<>();
+ for (DeleteDeltaInfo deleteDeltaInfo :
acidInfo.getDeleteDeltas()) {
+ TTransactionalHiveDeleteDeltaDesc deleteDeltaDesc =
new TTransactionalHiveDeleteDeltaDesc();
+
deleteDeltaDesc.setDirectoryLocation(deleteDeltaInfo.getDirectoryLocation());
+
deleteDeltaDesc.setFileNames(deleteDeltaInfo.getFileNames());
+ deleteDeltaDescs.add(deleteDeltaDesc);
+ }
+ transactionalHiveDesc.setDeleteDeltas(deleteDeltaDescs);
+
tableFormatFileDesc.setTransactionalHiveParams(transactionalHiveDesc);
+ rangeDesc.setTableFormatParams(tableFormatFileDesc);
}
- transactionalHiveDesc.setDeleteDeltas(deleteDeltaDescs);
-
tableFormatFileDesc.setTransactionalHiveParams(transactionalHiveDesc);
- rangeDesc.setTableFormatParams(tableFormatFileDesc);
- }
- setScanParams(rangeDesc, fileSplit);
-
-
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
- TScanRangeLocation location = new TScanRangeLocation();
- Backend selectedBackend;
- if (enableShortCircuitRead) {
- // Try to find a local BE if enable hdfs short circuit read
- selectedBackend =
backendPolicy.getNextLocalBe(Arrays.asList(fileSplit.getHosts()), curLocations);
- } else {
- // Use consistent hash to assign the same scan range into the
same backend among different queries
- selectedBackend =
backendPolicy.getNextConsistentBe(curLocations);
+ setScanParams(rangeDesc, fileSplit);
+
+
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
+ TScanRangeLocation location = new TScanRangeLocation();
+ setLocationPropertiesIfNecessary(backend, locationType,
locationProperties);
+ location.setBackendId(backend.getId());
+ location.setServer(new TNetworkAddress(backend.getHost(),
backend.getBePort()));
+ curLocations.addToLocations(location);
                LOG.debug("assign to backend {} with table split: {} ({}, {}), location: {}",
                        curLocations.getLocations().get(0).getBackendId(), fileSplit.getPath(), fileSplit.getStart(),
                        fileSplit.getLength(), Joiner.on("|").join(fileSplit.getHosts()));
+ scanRangeLocations.add(curLocations);
+ this.totalFileSize += fileSplit.getLength();
}
- setLocationPropertiesIfNecessary(selectedBackend, locationType,
locationProperties);
- location.setBackendId(selectedBackend.getId());
- location.setServer(new TNetworkAddress(selectedBackend.getHost(),
selectedBackend.getBePort()));
- curLocations.addToLocations(location);
- LOG.debug("assign to backend {} with table split: {} ({}, {}),
location: {}",
- curLocations.getLocations().get(0).getBackendId(),
fileSplit.getPath(), fileSplit.getStart(),
- fileSplit.getLength(),
Joiner.on("|").join(fileSplit.getHosts()));
- scanRangeLocations.add(curLocations);
- this.totalFileSize += fileSplit.getLength();
}
+
if (ConnectContext.get().getExecutor() != null) {
ConnectContext.get().getExecutor().getSummaryProfile().setCreateScanRangeFinishTime();
}
@@ -506,3 +508,4 @@ public abstract class FileQueryScanNode extends FileScanNode {
     protected abstract Map<String, String> getLocationProperties() throws UserException;
}
+
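The net effect of the restructuring: assignment is computed once for the whole split batch, and scan ranges are then emitted per backend. A condensed sketch of the consuming loop (types from this commit; the range-building details stay as in the method above):

    Multimap<Backend, Split> assignment = backendPolicy.computeScanRangeAssignment(inputSplits);
    for (Map.Entry<Backend, Collection<Split>> entry : assignment.asMap().entrySet()) {
        Backend backend = entry.getKey();       // chosen once per split by the policy
        for (Split split : entry.getValue()) {
            // build a TFileRangeDesc for the split, wrap it in a TScanRangeLocations
            // bound to `backend`, and append it to scanRangeLocations
        }
    }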
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java
index 4e9d0bae564..1221d3ad21b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java
@@ -42,6 +42,8 @@ public class FileSplit implements Split {
// partitionValues would be ["part1", "part2"]
protected List<String> partitionValues;
+ protected List<String> alternativeHosts;
+
     public FileSplit(Path path, long start, long length, long fileLength,
             long modificationTime, String[] hosts, List<String> partitionValues) {
this.path = path;
@@ -67,6 +69,11 @@ public class FileSplit implements Split {
return null;
}
+ @Override
+ public String getPathString() {
+ return path.toString();
+ }
+
public static class FileSplitCreator implements SplitCreator {
public static final FileSplitCreator DEFAULT = new FileSplitCreator();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IndexedPriorityQueue.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IndexedPriorityQueue.java
new file mode 100644
index 00000000000..451279f1a99
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IndexedPriorityQueue.java
@@ -0,0 +1,228 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/resourcegroups/IndexedPriorityQueue.java
+// and modified by Doris
+
+package org.apache.doris.planner.external;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * A priority queue with constant time contains(E) and log time remove(E)
+ * Ties are broken by insertion order
+ */
+public final class IndexedPriorityQueue<E>
+ implements UpdateablePriorityQueue<E> {
+ private final Map<E, Entry<E>> index = new HashMap<>();
+ private final Set<Entry<E>> queue;
+ private long generation;
+
+ public IndexedPriorityQueue() {
+ this(PriorityOrdering.HIGH_TO_LOW);
+ }
+
+ public IndexedPriorityQueue(PriorityOrdering priorityOrdering) {
+ switch (priorityOrdering) {
+ case LOW_TO_HIGH:
+ queue = new TreeSet<>(
+                        Comparator.comparingLong((Entry<E> entry) -> entry.getPriority())
+                                .thenComparingLong(Entry::getGeneration));
+ break;
+ case HIGH_TO_LOW:
+                queue = new TreeSet<>((entry1, entry2) -> {
+                    int priorityComparison = Long.compare(entry2.getPriority(), entry1.getPriority());
+                    if (priorityComparison != 0) {
+                        return priorityComparison;
+                    }
+                    return Long.compare(entry1.getGeneration(), entry2.getGeneration());
+                });
+ break;
+ default:
+ throw new IllegalArgumentException();
+ }
+ }
+
+ @Override
+ public boolean addOrUpdate(E element, long priority) {
+ Entry<E> entry = index.get(element);
+ if (entry != null) {
+ if (entry.getPriority() == priority) {
+ return false;
+ }
+ queue.remove(entry);
+            Entry<E> newEntry = new Entry<>(element, priority, entry.getGeneration());
+ queue.add(newEntry);
+ index.put(element, newEntry);
+ return false;
+ }
+ Entry<E> newEntry = new Entry<>(element, priority, generation);
+ generation++;
+ queue.add(newEntry);
+ index.put(element, newEntry);
+ return true;
+ }
+
+ @Override
+ public boolean contains(E element) {
+ return index.containsKey(element);
+ }
+
+ @Override
+ public boolean remove(E element) {
+ Entry<E> entry = index.remove(element);
+ if (entry != null) {
+ queue.remove(entry);
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public E poll() {
+ Entry<E> entry = pollEntry();
+ if (entry == null) {
+ return null;
+ }
+ return entry.getValue();
+ }
+
+ @Override
+ public E peek() {
+ Entry<E> entry = peekEntry();
+ if (entry == null) {
+ return null;
+ }
+ return entry.getValue();
+ }
+
+ @Override
+ public int size() {
+ return queue.size();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return queue.isEmpty();
+ }
+
+ public Prioritized<E> getPrioritized(E element) {
+ Entry<E> entry = index.get(element);
+ if (entry == null) {
+ return null;
+ }
+
+ return new Prioritized<>(entry.getValue(), entry.getPriority());
+ }
+
+ public Prioritized<E> pollPrioritized() {
+ Entry<E> entry = pollEntry();
+ if (entry == null) {
+ return null;
+ }
+ return new Prioritized<>(entry.getValue(), entry.getPriority());
+ }
+
+ private Entry<E> pollEntry() {
+ Iterator<Entry<E>> iterator = queue.iterator();
+ if (!iterator.hasNext()) {
+ return null;
+ }
+ Entry<E> entry = iterator.next();
+ iterator.remove();
+        Preconditions.checkState(index.remove(entry.getValue()) != null, "Failed to remove entry from index");
+ return entry;
+ }
+
+ public Prioritized<E> peekPrioritized() {
+ Entry<E> entry = peekEntry();
+ if (entry == null) {
+ return null;
+ }
+ return new Prioritized<>(entry.getValue(), entry.getPriority());
+ }
+
+ public Entry<E> peekEntry() {
+ Iterator<Entry<E>> iterator = queue.iterator();
+ if (!iterator.hasNext()) {
+ return null;
+ }
+ return iterator.next();
+ }
+
+ @Override
+ public Iterator<E> iterator() {
+ return Iterators.transform(queue.iterator(), Entry::getValue);
+ }
+
+ public enum PriorityOrdering {
+ LOW_TO_HIGH,
+ HIGH_TO_LOW
+ }
+
+ private static final class Entry<E> {
+ private final E value;
+ private final long priority;
+ private final long generation;
+
+ private Entry(E value, long priority, long generation) {
+ this.value = Objects.requireNonNull(value, "value is null");
+ this.priority = priority;
+ this.generation = generation;
+ }
+
+ public E getValue() {
+ return value;
+ }
+
+ public long getPriority() {
+ return priority;
+ }
+
+ public long getGeneration() {
+ return generation;
+ }
+ }
+
+ public static class Prioritized<V> {
+ private final V value;
+ private final long priority;
+
+ public Prioritized(V value, long priority) {
+ this.value = Objects.requireNonNull(value, "value is null");
+ this.priority = priority;
+ }
+
+ public V getValue() {
+ return value;
+ }
+
+ public long getPriority() {
+ return priority;
+ }
+ }
+}
+
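A short usage sketch of the queue's contract: updating an element's priority keeps its original insertion generation, so ties are still broken by first insertion:

    import org.apache.doris.planner.external.IndexedPriorityQueue;

    public class QueueSketch {
        public static void main(String[] args) {
            IndexedPriorityQueue<String> q = new IndexedPriorityQueue<>(); // HIGH_TO_LOW by default
            q.addOrUpdate("a", 10);
            q.addOrUpdate("b", 20);
            q.addOrUpdate("c", 20);              // tie with b; b was inserted first
            q.addOrUpdate("a", 30);              // re-prioritizes a, keeps its generation
            System.out.println(q.poll());        // a (priority 30)
            System.out.println(q.poll());        // b (20, inserted before c)
            System.out.println(q.contains("c")); // true, constant time via the index map
        }
    }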
diff --git a/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/NodeSelectionStrategy.java
similarity index 83%
copy from fe/fe-core/src/main/java/org/apache/doris/spi/Split.java
copy to fe/fe-core/src/main/java/org/apache/doris/planner/external/NodeSelectionStrategy.java
index 31b1e1515a5..616dd5a9be0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/NodeSelectionStrategy.java
@@ -15,15 +15,10 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.spi;
-
-/**
- * Split interface. e.g. Tablet for Olap Table.
- */
-public interface Split {
-
- String[] getHosts();
-
- Object getInfo();
+package org.apache.doris.planner.external;
+public enum NodeSelectionStrategy {
+ RANDOM,
+ CONSISTENT_HASHING
}
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/Queue.java
similarity index 68%
copy from fe/fe-core/src/main/java/org/apache/doris/spi/Split.java
copy to fe/fe-core/src/main/java/org/apache/doris/planner/external/Queue.java
index 31b1e1515a5..6e0597c821f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/Queue.java
@@ -14,16 +14,23 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
+// This file is copied from
+// https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/resourcegroups/Queue.java
+// and modified by Doris
-package org.apache.doris.spi;
+package org.apache.doris.planner.external;
-/**
- * Split interface. e.g. Tablet for Olap Table.
- */
-public interface Split {
+interface Queue<E> {
+ boolean contains(E element);
- String[] getHosts();
+ boolean remove(E element);
- Object getInfo();
+ E poll();
+ E peek();
+
+ int size();
+
+ boolean isEmpty();
}
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ResettableRandomizedIterator.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ResettableRandomizedIterator.java
new file mode 100644
index 00000000000..feb8b6ae836
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ResettableRandomizedIterator.java
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/scheduler/ResettableRandomizedIterator.java
+// and modified by Doris
+
+package org.apache.doris.planner.external;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ThreadLocalRandom;
+
+public class ResettableRandomizedIterator<T>
+ implements Iterator<T> {
+ private final List<T> list;
+ private int position;
+
+ public ResettableRandomizedIterator(Collection<T> elements) {
+ this.list = new ArrayList<>(elements);
+ }
+
+ public void reset() {
+ position = 0;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return position < list.size();
+ }
+
+ @Override
+ public T next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+
+        int position = ThreadLocalRandom.current().nextInt(this.position, list.size());
+
+ T result = list.set(position, list.get(this.position));
+ list.set(this.position, result);
+ this.position++;
+
+ return result;
+ }
+}
+
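The iterator performs an incremental Fisher-Yates shuffle: each next() swaps a uniformly chosen remaining element into the current position, so the first k draws form a uniform random sample without replacement, and reset() restarts the shuffle over the same list. A minimal sketch:

    import org.apache.doris.planner.external.ResettableRandomizedIterator;

    import java.util.Arrays;

    public class RandomIterSketch {
        public static void main(String[] args) {
            ResettableRandomizedIterator<String> it =
                    new ResettableRandomizedIterator<>(Arrays.asList("be1", "be2", "be3"));
            // Draw two random candidates without replacement.
            System.out.println(it.next());
            System.out.println(it.next());
            it.reset(); // start a fresh shuffle over the same backing list
            System.out.println(it.next());
        }
    }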
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/SplitWeight.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/SplitWeight.java
new file mode 100644
index 00000000000..d8724017b67
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/SplitWeight.java
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/trinodb/trino/blob/master/core/trino-spi/src/main/java/io/trino/spi/SplitWeight.java
+// and modified by Doris
+
+package org.apache.doris.planner.external;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonValue;
+import com.google.errorprone.annotations.DoNotCall;
+
+import java.math.BigDecimal;
+import java.util.Collection;
+import java.util.function.Function;
+
+public final class SplitWeight {
+ private static final long UNIT_VALUE = 100;
+    private static final int UNIT_SCALE = 2; // Decimal scale such that (10 ^ UNIT_SCALE) == UNIT_VALUE
+    private static final SplitWeight STANDARD_WEIGHT = new SplitWeight(UNIT_VALUE);
+
+ private final long value;
+
+ private SplitWeight(long value) {
+ if (value <= 0) {
+ throw new IllegalArgumentException("value must be > 0, found: " +
value);
+ }
+ this.value = value;
+ }
+
+    /**
+     * Produces a {@link SplitWeight} from the raw internal value representation. This method is intended
+     * primarily for JSON deserialization; connectors should not call this factory method directly
+     * to construct {@link SplitWeight} instances. Instead, connectors should use
+     * {@link SplitWeight#fromProportion(double)}
+     * to avoid breakages that could arise if {@link SplitWeight#UNIT_VALUE} changes in the future.
+     */
+ @JsonCreator
+ @DoNotCall // For JSON serialization only
+ public static SplitWeight fromRawValue(long value) {
+ return fromRawValueInternal(value);
+ }
+
+    /**
+     * Produces a {@link SplitWeight} that corresponds to the {@link SplitWeight#standard()} weight
+     * proportionally, i.e., a parameter of <code>1.0</code> will be equivalent to the standard weight
+     * and a value of <code>0.5</code> will be 1/2 of the standard split weight. Valid arguments
+     * must be greater than zero and finite. Connectors should prefer constructing split weights
+     * using this factory method rather than passing a raw integer value, in case the integer representation
+     * of a standard split needs to change in the future.
+     *
+     * @param weight the proportional weight relative to a standard split, expressed as a double
+     * @return a {@link SplitWeight} with a raw value corresponding to the requested proportion
+     */
+ public static SplitWeight fromProportion(double weight) {
+ if (weight <= 0 || !Double.isFinite(weight)) {
+ throw new IllegalArgumentException("Invalid weight: " + weight);
+ }
+ // Must round up to avoid small weights rounding to 0
+ return fromRawValueInternal((long) Math.ceil(weight * UNIT_VALUE));
+ }
+
+ private static SplitWeight fromRawValueInternal(long value) {
+ return value == UNIT_VALUE ? STANDARD_WEIGHT : new SplitWeight(value);
+ }
+
+ public static SplitWeight standard() {
+ return STANDARD_WEIGHT;
+ }
+
+ public static long rawValueForStandardSplitCount(int splitCount) {
+ if (splitCount < 0) {
+ throw new IllegalArgumentException("splitCount must be >= 0,
found: " + splitCount);
+ }
+ return Math.multiplyExact(splitCount, UNIT_VALUE);
+ }
+
+    public static <T> long rawValueSum(Collection<T> collection, Function<T, SplitWeight> getter) {
+ long sum = 0;
+ for (T item : collection) {
+ long value = getter.apply(item).getRawValue();
+ sum = Math.addExact(sum, value);
+ }
+ return sum;
+ }
+
+ /**
+ * @return The internal integer representation for this weight value
+ */
+ @JsonValue
+ public long getRawValue() {
+ return value;
+ }
+
+ @Override
+ public int hashCode() {
+ return Long.hashCode(value);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (!(other instanceof SplitWeight)) {
+ return false;
+ }
+ return this.value == ((SplitWeight) other).value;
+ }
+
+ @Override
+ public String toString() {
+ if (value == UNIT_VALUE) {
+ return "1";
+ }
+        return BigDecimal.valueOf(value, UNIT_SCALE).stripTrailingZeros().toPlainString();
+ }
+}
+
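A few worked values for the weight arithmetic (fromRawValue is deliberately left out since it is marked @DoNotCall outside JSON deserialization):

    import org.apache.doris.planner.external.SplitWeight;

    public class SplitWeightSketch {
        public static void main(String[] args) {
            System.out.println(SplitWeight.standard().getRawValue());            // 100 (UNIT_VALUE)
            System.out.println(SplitWeight.fromProportion(0.5).getRawValue());   // 50
            System.out.println(SplitWeight.fromProportion(0.001).getRawValue()); // 1: rounded up, never 0
            System.out.println(SplitWeight.rawValueForStandardSplitCount(3));    // 300
            System.out.println(SplitWeight.fromProportion(0.5));                 // "0.5"
        }
    }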
diff --git a/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/UpdateablePriorityQueue.java
similarity index 69%
copy from fe/fe-core/src/main/java/org/apache/doris/spi/Split.java
copy to fe/fe-core/src/main/java/org/apache/doris/planner/external/UpdateablePriorityQueue.java
index 31b1e1515a5..97441cbd878 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/UpdateablePriorityQueue.java
@@ -14,16 +14,14 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
+// This file is copied from
+// https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/resourcegroups/IndexedPriorityQueue.java
+// and modified by Doris
-package org.apache.doris.spi;
-
-/**
- * Split interface. e.g. Tablet for Olap Table.
- */
-public interface Split {
-
- String[] getHosts();
-
- Object getInfo();
+package org.apache.doris.planner.external;
+interface UpdateablePriorityQueue<E>
+ extends Queue<E>, Iterable<E> {
+ boolean addOrUpdate(E element, long priority);
}
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java b/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java
index 31b1e1515a5..d42defe5c38 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java
@@ -17,6 +17,10 @@
package org.apache.doris.spi;
+import org.apache.doris.planner.external.SplitWeight;
+
+import java.util.List;
+
/**
* Split interface. e.g. Tablet for Olap Table.
*/
@@ -26,4 +30,23 @@ public interface Split {
Object getInfo();
+ default SplitWeight getSplitWeight() {
+ return SplitWeight.standard();
+ }
+
+ default boolean isRemotelyAccessible() {
+ return true;
+ }
+
+ String getPathString();
+
+ long getStart();
+
+ long getLength();
+
+ List<String> getAlternativeHosts();
+
+ void setAlternativeHosts(List<String> alternativeHosts);
+
}
+
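With the defaults for getSplitWeight() and isRemotelyAccessible(), a connector now only has to supply identity and locality. A hypothetical minimal implementation (names and values are illustrative only):

    import org.apache.doris.planner.external.SplitWeight;
    import org.apache.doris.spi.Split;

    import java.util.List;

    // Sketch: the smallest Split a connector now has to provide.
    public class InMemorySplit implements Split {
        private List<String> alternativeHosts;

        @Override public String[] getHosts() { return new String[0]; }
        @Override public Object getInfo() { return null; }
        @Override public String getPathString() { return "mem://table/chunk-0"; } // hypothetical path
        @Override public long getStart() { return 0; }
        @Override public long getLength() { return 1024; }
        @Override public List<String> getAlternativeHosts() { return alternativeHosts; }
        @Override public void setAlternativeHosts(List<String> hosts) { this.alternativeHosts = hosts; }
        // getSplitWeight() and isRemotelyAccessible() fall back to the defaults:
        // SplitWeight.standard() and true.
    }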
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
index ef65d1b6165..a887b3e590a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
@@ -20,6 +20,10 @@ package org.apache.doris.planner;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.UserException;
import org.apache.doris.planner.external.FederationBackendPolicy;
+import org.apache.doris.planner.external.FileSplit;
+import org.apache.doris.planner.external.NodeSelectionStrategy;
+import org.apache.doris.planner.external.SplitWeight;
+import org.apache.doris.spi.Split;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TExternalScanRange;
@@ -28,42 +32,49 @@ import org.apache.doris.thrift.TFileScanRange;
import org.apache.doris.thrift.TScanRange;
import org.apache.doris.thrift.TScanRangeLocations;
-import com.google.common.base.Stopwatch;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Multimap;
import mockit.Mock;
import mockit.MockUp;
-import mockit.Mocked;
-import org.junit.Before;
+import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
-import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
-import java.util.concurrent.TimeUnit;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
public class FederationBackendPolicyTest {
- @Mocked
- private Env env;
+    private static <K, V> boolean areMultimapsEqualIgnoringOrder(
+            Multimap<K, V> multimap1, Multimap<K, V> multimap2) {
+        Collection<Map.Entry<K, V>> entries1 = multimap1.entries();
+        Collection<Map.Entry<K, V>> entries2 = multimap2.entries();
-    @Before
-    public void setUp() {
+        return entries1.containsAll(entries2) && entries2.containsAll(entries1);
+    }
+ @Test
+ public void testComputeScanRangeAssignmentRemote() throws UserException {
SystemInfoService service = new SystemInfoService();
-        for (int i = 0; i < 190; i++) {
-            Backend backend = new Backend(Long.valueOf(i), "192.168.1." + i, 9050);
-            backend.setAlive(true);
-            service.addBackend(backend);
-        }
-        for (int i = 0; i < 10; i++) {
-            Backend backend = new Backend(Long.valueOf(190 + i), "192.168.1." + i, 9051);
-            backend.setAlive(true);
-            service.addBackend(backend);
-        }
-        for (int i = 0; i < 10; i++) {
-            Backend backend = new Backend(Long.valueOf(200 + i), "192.168.2." + i, 9050);
-            backend.setAlive(false);
-            service.addBackend(backend);
-        }
+ Backend backend1 = new Backend(10002L, "172.30.0.100", 9050);
+ backend1.setAlive(true);
+ service.addBackend(backend1);
+ Backend backend2 = new Backend(10003L, "172.30.0.106", 9050);
+ backend2.setAlive(true);
+ service.addBackend(backend2);
+ Backend backend3 = new Backend(10004L, "172.30.0.118", 9050);
+ backend3.setAlive(true);
+ service.addBackend(backend3);
new MockUp<Env>() {
@Mock
@@ -72,56 +83,644 @@ public class FederationBackendPolicyTest {
}
};
- }
+        List<Split> splits = new ArrayList<>();
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00000-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 112140970, 112140970, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00001-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 120839661, 120839661, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00002-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 108897409, 108897409, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00003-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 95795997, 95795997, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00004-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 104600402, 104600402, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00004-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 104600402, 104600402, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00004-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 104600402, 104600402, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00005-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 105664025, 105664025, 0, null, Collections.emptyList()));
+        splits.add(new FileSplit(new Path(
+                "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00006-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+                0, 103711014, 103711014, 0, null, Collections.emptyList()));
- @Test
- public void testGetNextBe() throws UserException {
FederationBackendPolicy policy = new FederationBackendPolicy();
policy.init();
- int backendNum = 200;
- int invokeTimes = 1000000;
+ int backendNum = 3;
Assertions.assertEquals(policy.numBackends(), backendNum);
- Stopwatch sw = Stopwatch.createStarted();
- for (int i = 0; i < invokeTimes; i++) {
- Assertions.assertFalse(policy.getNextBe().getHost().contains("192.168.2."));
+
+ Multimap<Backend, Split> assignment = policy.computeScanRangeAssignment(splits);
+
+ for (Backend backend : assignment.keySet()) {
+ Collection<Split> assignedSplits = assignment.get(backend);
+ long scanBytes = 0L;
+ for (Split split : assignedSplits) {
+ FileSplit fileSplit = (FileSplit) split;
+ scanBytes += fileSplit.getLength();
+ }
+ System.out.printf("%s -> %d splits, %d bytes\n", backend,
assignedSplits.size(), scanBytes);
+ }
+ // int avg = (scanRangeNumber * scanRangeSize) / hostNumber;
+ // int variance = 5 * scanRangeSize;
+ Map<Backend, Long> stats = policy.getAssignedWeightPerBackend();
+ for (Map.Entry<Backend, Long> entry : stats.entrySet()) {
+ System.out.printf("weight: %s -> %d\n", entry.getKey(),
entry.getValue());
+ // Assert.assertTrue(Math.abs(entry.getValue() - avg) < variance);
}
- sw.stop();
- System.out.println("Invoke getNextBe() " + invokeTimes
- + " times cost [" + sw.elapsed(TimeUnit.MILLISECONDS) + "]
ms");
+
}
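
For readers tracing the assertions above: computeScanRangeAssignment balances splits by the bytes already assigned to each backend. A minimal, runnable sketch of that "pick the least-loaded of a few random candidates" idea, using plain Java stand-ins rather than the Doris Backend/FileSplit types (all names below are illustrative, not the FederationBackendPolicy implementation):

import java.util.*;

// Illustrative sketch: assign each split to the least-loaded of k random candidates.
// "Node" strings and the byte bookkeeping are simplified stand-ins.
public class PowerOfKChoicesSketch {
    public static Map<String, Long> assign(List<Long> splitLengths, List<String> nodes, int k) {
        Map<String, Long> assignedBytes = new HashMap<>();
        for (String node : nodes) {
            assignedBytes.put(node, 0L);
        }
        Random random = new Random(42); // fixed seed so the example is reproducible
        for (long length : splitLengths) {
            String best = null;
            for (int i = 0; i < k; i++) {
                String candidate = nodes.get(random.nextInt(nodes.size()));
                if (best == null || assignedBytes.get(candidate) < assignedBytes.get(best)) {
                    best = candidate;
                }
            }
            assignedBytes.merge(best, length, Long::sum);
        }
        return assignedBytes;
    }

    public static void main(String[] args) {
        List<Long> splits = Arrays.asList(112140970L, 120839661L, 108897409L, 95795997L);
        System.out.println(assign(splits,
                Arrays.asList("172.30.0.100", "172.30.0.106", "172.30.0.118"), 2));
    }
}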
@Test
- public void testGetNextLocalBe() throws UserException {
- FederationBackendPolicy policy = new FederationBackendPolicy();
+ public void testComputeScanRangeAssignmentConsistentHash() throws UserException {
+ SystemInfoService service = new SystemInfoService();
+
+ Backend backend1 = new Backend(10002L, "172.30.0.100", 9050);
+ backend1.setAlive(true);
+ service.addBackend(backend1);
+ Backend backend2 = new Backend(10003L, "172.30.0.106", 9050);
+ backend2.setAlive(true);
+ service.addBackend(backend2);
+ Backend backend3 = new Backend(10004L, "172.30.0.118", 9050);
+ backend3.setAlive(true);
+ service.addBackend(backend3);
+
+ new MockUp<Env>() {
+ @Mock
+ public SystemInfoService getCurrentSystemInfo() {
+ return service;
+ }
+ };
+
+ List<Split> splits = new ArrayList<>();
+ splits.add(new FileSplit(new Path(
+ "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00000-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+ 0, 112140970, 112140970, 0, null, Collections.emptyList()));
+ splits.add(new FileSplit(new Path(
+ "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00001-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+ 0, 120839661, 120839661, 0, null, Collections.emptyList()));
+ splits.add(new FileSplit(new Path(
+ "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00002-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+ 0, 108897409, 108897409, 0, null, Collections.emptyList()));
+ splits.add(new FileSplit(new Path(
+ "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00003-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+ 0, 95795997, 95795997, 0, null, Collections.emptyList()));
+ splits.add(new FileSplit(new Path(
+ "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00004-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+ 0, 104600402, 104600402, 0, null, Collections.emptyList()));
+ splits.add(new FileSplit(new Path(
+ "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00004-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+ 0, 104600402, 104600402, 0, null, Collections.emptyList()));
+ splits.add(new FileSplit(new Path(
+ "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00004-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+ 0, 104600402, 104600402, 0, null, Collections.emptyList()));
+ splits.add(new FileSplit(new Path(
+ "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00005-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+ 0, 105664025, 105664025, 0, null, Collections.emptyList()));
+ splits.add(new FileSplit(new Path(
+ "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00006-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+ 0, 103711014, 103711014, 0, null, Collections.emptyList()));
+
+ FederationBackendPolicy policy = new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING);
policy.init();
- int backendNum = 200;
- int invokeTimes = 1000000;
+ int backendNum = 3;
Assertions.assertEquals(policy.numBackends(), backendNum);
- List<String> localHosts = Arrays.asList("192.168.1.0", "192.168.1.1", "192.168.1.2");
- TScanRangeLocations scanRangeLocations = getScanRangeLocations("path1", 0, 100);
- Stopwatch sw = Stopwatch.createStarted();
- for (int i = 0; i < invokeTimes; i++) {
- Assertions.assertTrue(localHosts.contains(policy.getNextLocalBe(localHosts, scanRangeLocations).getHost()));
+
+ Multimap<Backend, Split> assignment = policy.computeScanRangeAssignment(splits);
+
+ for (Backend backend : assignment.keySet()) {
+ Collection<Split> assignedSplits = assignment.get(backend);
+ long scanBytes = 0L;
+ for (Split split : assignedSplits) {
+ FileSplit fileSplit = (FileSplit) split;
+ scanBytes += fileSplit.getLength();
+ }
+ System.out.printf("%s -> %d splits, %d bytes\n", backend,
assignedSplits.size(), scanBytes);
}
- sw.stop();
- System.out.println("Invoke getNextLocalBe() " + invokeTimes
- + " times cost [" + sw.elapsed(TimeUnit.MILLISECONDS) + "]
ms");
+ // int avg = (scanRangeNumber * scanRangeSize) / hostNumber;
+ // int variance = 5 * scanRangeSize;
+ Map<Backend, Long> stats = policy.getAssignedWeightPerBackend();
+ for (Map.Entry<Backend, Long> entry : stats.entrySet()) {
+ System.out.printf("weight: %s -> %d\n", entry.getKey(),
entry.getValue());
+ // Assert.assertTrue(Math.abs(entry.getValue() - avg) < variance);
+ }
+
}
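
The CONSISTENT_HASHING strategy exercised above maps each split onto a hash ring of virtual nodes. A compact sketch of the ring lookup, assuming String.hashCode() as the hash and a TreeMap as the ring (a production ring would use a stronger hash such as murmur to spread virtual nodes evenly; the class and method names are illustrative):

import java.util.*;

// Sketch of a consistent-hash ring with virtual nodes, using a TreeMap as the ring.
public class ConsistentHashSketch {
    private final TreeMap<Integer, String> ring = new TreeMap<>();

    public ConsistentHashSketch(List<String> nodes, int virtualNodes) {
        for (String node : nodes) {
            for (int i = 0; i < virtualNodes; i++) {
                ring.put((node + "#" + i).hashCode(), node);
            }
        }
    }

    public String nodeFor(String splitPath) {
        // Walk clockwise to the first virtual node at or after the split's hash.
        Map.Entry<Integer, String> entry = ring.ceilingEntry(splitPath.hashCode());
        return entry != null ? entry.getValue() : ring.firstEntry().getValue();
    }

    public static void main(String[] args) {
        ConsistentHashSketch ring = new ConsistentHashSketch(
                Arrays.asList("172.30.0.100", "172.30.0.106", "172.30.0.118"), 256);
        System.out.println(ring.nodeFor("hdfs://.../part-00000"));
    }
}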
@Test
- public void testConsistentHash() throws UserException {
+ public void testComputeScanRangeAssignmentLocal() throws UserException {
+ SystemInfoService service = new SystemInfoService();
+
+ Backend backend1 = new Backend(10002L, "172.30.0.100", 9050);
+ backend1.setAlive(true);
+ service.addBackend(backend1);
+ Backend backend2 = new Backend(10003L, "172.30.0.106", 9050);
+ backend2.setAlive(true);
+ service.addBackend(backend2);
+ Backend backend3 = new Backend(10004L, "172.30.0.118", 9050);
+ backend3.setAlive(true);
+ service.addBackend(backend3);
+
+ new MockUp<Env>() {
+ @Mock
+ public SystemInfoService getCurrentSystemInfo() {
+ return service;
+ }
+ };
+
+ List<Split> splits = new ArrayList<>();
+ splits.add(new FileSplit(new Path(
+ "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00000-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+ 0, 112140970, 112140970, 0, new String[] {"172.30.0.100"}, Collections.emptyList()));
+ splits.add(new FileSplit(new Path(
+ "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00001-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+ 0, 120839661, 120839661, 0, null, Collections.emptyList()));
+ splits.add(new FileSplit(new Path(
+ "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00002-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+ 0, 108897409, 108897409, 0, null, Collections.emptyList()));
+ splits.add(new FileSplit(new Path(
+ "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00003-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+ 0, 95795997, 95795997, 0, new String[] {"172.30.0.106"}, Collections.emptyList()));
+ splits.add(new FileSplit(new Path(
+ "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00004-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+ 0, 104600402, 104600402, 0, null, Collections.emptyList()));
+ splits.add(new FileSplit(new Path(
+ "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00004-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+ 0, 104600402, 104600402, 0, null, Collections.emptyList()));
+ splits.add(new FileSplit(new Path(
+ "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00004-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+ 0, 104600402, 104600402, 0, null, Collections.emptyList()));
+ splits.add(new FileSplit(new Path(
+ "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00005-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+ 0, 105664025, 105664025, 0, null, Collections.emptyList()));
+ splits.add(new FileSplit(new Path(
+ "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00006-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+ 0, 103711014, 103711014, 0, null, Collections.emptyList()));
+
FederationBackendPolicy policy = new FederationBackendPolicy();
policy.init();
- int backendNum = 200;
+ int backendNum = 3;
Assertions.assertEquals(policy.numBackends(), backendNum);
+ int totalSplitNum = 0;
+ List<Boolean> checkedLocalSplit = new ArrayList<>();
+ Multimap<Backend, Split> assignment = policy.computeScanRangeAssignment(splits);
+ for (Backend backend : assignment.keySet()) {
+ Collection<Split> assignedSplits = assignment.get(backend);
+ for (Split split : assignedSplits) {
+ FileSplit fileSplit = (FileSplit) split;
+ ++totalSplitNum;
+ if (fileSplit.getPath().equals(new Path(
+ "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00000-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"))) {
+ Assert.assertEquals("172.30.0.100", backend.getHost());
+ checkedLocalSplit.add(true);
+ } else if (fileSplit.getPath().equals(new Path(
+ "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00003-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"))) {
+ Assert.assertEquals("172.30.0.106", backend.getHost());
+ checkedLocalSplit.add(true);
+ }
+ }
+ }
+ Assert.assertEquals(2, checkedLocalSplit.size());
+ Assert.assertEquals(9, totalSplitNum);
+ // int avg = (scanRangeNumber * scanRangeSize) / hostNumber;
+ // int variance = 5 * scanRangeSize;
+ // Map<Backend, Long> stats = policy.getAssignedWeightPerBackend();
+ // for (Map.Entry<Backend, Long> entry : stats.entrySet()) {
+ // System.out.printf("%s -> %d bytes\n", entry.getKey(),
entry.getValue());
+ // // Assert.assertTrue(Math.abs(entry.getValue() - avg) <
variance);
+ // }
+
+ }
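
The local test relies on soft affinity: a split that carries replica hosts should land on a backend with a matching host when one is available, while host-less splits fall through to the general strategy. A small sketch under that assumption, with simplified stand-ins for the Backend and FileSplit types:

import java.util.*;

// Sketch of soft local affinity: among backends whose host appears in the
// split's replica hosts, prefer the least-loaded one; empty means "no local match,
// fall back to the normal strategy".
public class LocalAffinitySketch {
    static Optional<String> pickLocal(String[] splitHosts, Map<String, Long> backendLoad) {
        if (splitHosts == null) {
            return Optional.empty();
        }
        return Arrays.stream(splitHosts)
                .filter(backendLoad::containsKey)
                .min(Comparator.comparing(backendLoad::get));
    }

    public static void main(String[] args) {
        Map<String, Long> load = new HashMap<>();
        load.put("172.30.0.100", 0L);
        load.put("172.30.0.106", 95795997L);
        System.out.println(pickLocal(new String[] {"172.30.0.106", "172.30.0.100"}, load));
    }
}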
+
+ @Test
+ public void testComputeScanRangeAssignmentRandom() throws UserException {
+ SystemInfoService service = new SystemInfoService();
+ new MockUp<Env>() {
+ @Mock
+ public SystemInfoService getCurrentSystemInfo() {
+ return service;
+ }
+ };
+
+ Random random = new Random();
+ int backendNum = random.nextInt(100 - 1) + 1;
+
+ int minOctet3 = 0;
+ int maxOctet3 = 250;
+ int minOctet4 = 1;
+ int maxOctet4 = 250;
+ Set<Integer> backendIds = new HashSet<>();
+ Set<String> ipAddresses = new HashSet<>();
+ for (int i = 0; i < backendNum; i++) {
+ String ipAddress;
+ do {
+ int octet3 = random.nextInt((maxOctet3 - minOctet3) + 1) + minOctet3;
+ int octet4 = random.nextInt((maxOctet4 - minOctet4) + 1) + minOctet4;
+ ipAddress = 192 + "." + 168 + "." + octet3 + "." + octet4;
+ } while (!ipAddresses.add(ipAddress));
+
+ int backendId;
+ do {
+ backendId = random.nextInt(90000) + 10000;
+ } while (!backendIds.add(backendId));
+
+ Backend backend = new Backend(backendId, ipAddress, 9050);
+ backend.setAlive(true);
+ service.addBackend(backend);
+ }
+
+ List<TScanRangeLocations> tScanRangeLocationsList = new ArrayList<>();
+ List<Split> remoteSplits = new ArrayList<>();
+ int splitCount = random.nextInt(1000 - 100) + 100;
+ for (int i = 0; i < splitCount; ++i) {
+ long splitLength = random.nextInt(115343360 - 94371840) + 94371840;
+ FileSplit split = new FileSplit(new Path(
+ "hdfs://HDFS00001/usr/hive/warehouse/test.db/test_table/"
+ UUID.randomUUID()),
+ 0, splitLength, splitLength, 0, null,
Collections.emptyList());
+ remoteSplits.add(split);
+ tScanRangeLocationsList.add(
+ getScanRangeLocations(split.getPath().toString(), split.getStart(), split.getLength()));
+ }
+
+ List<TScanRangeLocations> localScanRangeLocationsList = new ArrayList<>();
+ List<Split> localSplits = new ArrayList<>();
+ int localSplitCount = random.nextInt(1000 - 100) + 100;
+ Set<String> totalLocalHosts = new HashSet<>();
+ for (int i = 0; i < localSplitCount; ++i) {
+ int localHostNum = random.nextInt(3 - 1) + 1;
+ Set<String> localHosts = new HashSet<>();
+ String localHost;
+ for (int j = 0; j < localHostNum; ++j) {
+ do {
+ localHost = service.getAllBackends().get(random.nextInt(service.getAllBackends().size())).getHost();
+ } while (!localHosts.add(localHost));
+ totalLocalHosts.add(localHost);
+ }
+ long localSplitLength = random.nextInt(115343360 - 94371840) + 94371840;
+ FileSplit split = new FileSplit(new Path(
+ "hdfs://HDFS00001/usr/hive/warehouse/test.db/test_table/"
+ UUID.randomUUID()),
+ 0, localSplitLength, localSplitLength, 0,
localHosts.toArray(new String[0]),
+ Collections.emptyList());
+ localSplits.add(split);
+ localScanRangeLocationsList.add(
+ getScanRangeLocations(split.getPath().toString(), split.getStart(), split.getLength()));
+ }
+
+ ListMultimap<Backend, Split> result = null;
+ for (int i = 0; i < 3; ++i) {
+ FederationBackendPolicy policy = new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING);
+ List<TScanRangeLocations> totalScanRangeLocationsList = new ArrayList<>();
+ totalScanRangeLocationsList.addAll(tScanRangeLocationsList);
+ totalScanRangeLocationsList.addAll(localScanRangeLocationsList);
+ // Collections.shuffle(totalScanRangeLocationsList);
+ policy.init();
+ Assertions.assertEquals(policy.numBackends(), backendNum);
+ int totalSplitNum = 0;
+ int minSplitNumPerNode = Integer.MAX_VALUE;
- TScanRangeLocations scanRangeLocations = getScanRangeLocations("path1", 0, 100);
- Assertions.assertEquals(39, policy.getNextConsistentBe(scanRangeLocations).getId());
+ List<Split> totalSplits = new ArrayList<>();
+ totalSplits.addAll(remoteSplits);
+ totalSplits.addAll(localSplits);
+ // Collections.shuffle(totalSplits);
+ Multimap<Backend, Split> assignment = policy.computeScanRangeAssignment(totalSplits);
+ if (i == 0) {
+ result = ArrayListMultimap.create(assignment);
+ } else {
+ Assertions.assertTrue(areMultimapsEqualIgnoringOrder(result, assignment));
- scanRangeLocations = getScanRangeLocations("path2", 0, 100);
- Assertions.assertEquals(78, policy.getNextConsistentBe(scanRangeLocations).getId());
+ }
+ for (Backend backend : assignment.keySet()) {
+ Collection<Split> splits = assignment.get(backend);
+ if (splits.size() < minSplitNumPerNode) {
+ minSplitNumPerNode = splits.size();
+ }
+
+ long scanBytes = 0L;
+ for (Split split : splits) {
+ FileSplit fileSplit = (FileSplit) split;
+ scanBytes += fileSplit.getLength();
+ ++totalSplitNum;
+ if (fileSplit.getHosts() != null && fileSplit.getHosts().length > 0) {
+ for (String host : fileSplit.getHosts()) {
+ Assert.assertTrue(totalLocalHosts.contains(host));
+ }
+ }
+ }
+ System.out.printf("%s -> %d splits, %d bytes\n", backend,
splits.size(), scanBytes);
+ }
+ Assert.assertEquals(totalSplits.size(), totalSplitNum);
+ // int avg = (scanRangeNumber * scanRangeSize) / hostNumber;
+ // int variance = 5 * scanRangeSize;
+ Map<Backend, Long> stats = policy.getAssignedWeightPerBackend();
+ long splitNumVariance = 1;
+ long splitWeightVariance = 5 * SplitWeight.rawValueForStandardSplitCount(1);
+ Long minSplitWeight = stats.values()
+ .stream()
+ .min(Long::compare).orElse(0L);
+ for (Map.Entry<Backend, Long> entry : stats.entrySet()) {
+ System.out.printf("weight: %s -> %d\n", entry.getKey(), entry.getValue());
+ Assert.assertTrue(Math.abs(entry.getValue() - minSplitWeight) <= splitWeightVariance);
+ }
+ for (Backend backend : assignment.keySet()) {
+ Collection<Split> splits = assignment.get(backend);
+ Assert.assertTrue(Math.abs(splits.size() - minSplitNumPerNode) <= splitNumVariance);
+ }
+ }
}
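
The variance assertions above compare per-backend totals in SplitWeight raw units. A sketch of the underlying arithmetic, assuming an illustrative 128 MB standard split and 100 raw units per standard split (these constants are assumptions for the example, not necessarily what SplitWeight uses):

// Sketch: a split's weight is proportional to its size relative to a "standard"
// split, expressed in fixed-point raw units, and clamped so tiny splits still
// carry a minimal weight.
public class SplitWeightSketch {
    static final long STANDARD_SPLIT_BYTES = 128L * 1024 * 1024; // assumed standard size
    static final long RAW_PER_STANDARD_SPLIT = 100;              // assumed raw base

    static long rawValueForStandardSplitCount(int count) {
        return count * RAW_PER_STANDARD_SPLIT;
    }

    static long weightOf(long splitBytes) {
        long raw = splitBytes * RAW_PER_STANDARD_SPLIT / STANDARD_SPLIT_BYTES;
        return Math.max(1, Math.min(raw, RAW_PER_STANDARD_SPLIT));
    }

    public static void main(String[] args) {
        System.out.println(weightOf(112140970L));              // 83 of 100 raw units
        System.out.println(rawValueForStandardSplitCount(1));  // 100
    }
}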
+ @Test
+ public void testComputeScanRangeAssignmentNonAlive() throws UserException {
+ SystemInfoService service = new SystemInfoService();
+ new MockUp<Env>() {
+ @Mock
+ public SystemInfoService getCurrentSystemInfo() {
+ return service;
+ }
+ };
+
+ Random random = new Random();
+ int backendNum = random.nextInt(100 - 1) + 1;
+
+ int minOctet3 = 0;
+ int maxOctet3 = 250;
+ int minOctet4 = 1;
+ int maxOctet4 = 250;
+ Set<Integer> backendIds = new HashSet<>();
+ Set<String> ipAddresses = new HashSet<>();
+ int aliveBackendNum = 0;
+ for (int i = 0; i < backendNum; i++) {
+ String ipAddress;
+ do {
+ int octet3 = random.nextInt((maxOctet3 - minOctet3) + 1) + minOctet3;
+ int octet4 = random.nextInt((maxOctet4 - minOctet4) + 1) + minOctet4;
+ ipAddress = 192 + "." + 168 + "." + octet3 + "." + octet4;
+ } while (!ipAddresses.add(ipAddress));
+
+ int backendId;
+ do {
+ backendId = random.nextInt(90000) + 10000;
+ } while (!backendIds.add(backendId));
+
+ Backend backend = new Backend(backendId, ipAddress, 9050);
+ if (i % 2 == 0) {
+ ++aliveBackendNum;
+ backend.setAlive(true);
+ } else {
+ backend.setAlive(false);
+ }
+ service.addBackend(backend);
+ }
+
+ List<TScanRangeLocations> tScanRangeLocationsList = new ArrayList<>();
+ List<Split> remoteSplits = new ArrayList<>();
+ int splitCount = random.nextInt(1000 - 100) + 100;
+ for (int i = 0; i < splitCount; ++i) {
+ long splitLength = random.nextInt(115343360 - 94371840) + 94371840;
+ FileSplit split = new FileSplit(new Path(
+ "hdfs://HDFS00001/usr/hive/warehouse/test.db/test_table/"
+ UUID.randomUUID()),
+ 0, splitLength, splitLength, 0, null,
Collections.emptyList());
+ remoteSplits.add(split);
+ tScanRangeLocationsList.add(
+ getScanRangeLocations(split.getPath().toString(), split.getStart(), split.getLength()));
+ }
+
+ List<TScanRangeLocations> localScanRangeLocationsList = new ArrayList<>();
+ List<Split> localSplits = new ArrayList<>();
+ int localSplitCount = random.nextInt(1000 - 100) + 100;
+ Set<String> totalLocalHosts = new HashSet<>();
+ for (int i = 0; i < localSplitCount; ++i) {
+ int localHostNum = random.nextInt(3 - 1) + 1;
+ Set<String> localHosts = new HashSet<>();
+ String localHost;
+ for (int j = 0; j < localHostNum; ++j) {
+ do {
+ localHost = service.getAllBackends().get(random.nextInt(service.getAllBackends().size())).getHost();
+ } while (!localHosts.add(localHost));
+ totalLocalHosts.add(localHost);
+ }
+ long localSplitLength = random.nextInt(115343360 - 94371840) + 94371840;
+ FileSplit split = new FileSplit(new Path(
+ "hdfs://HDFS00001/usr/hive/warehouse/test.db/test_table/"
+ UUID.randomUUID()),
+ 0, localSplitLength, localSplitLength, 0,
localHosts.toArray(new String[0]),
+ Collections.emptyList());
+ localSplits.add(split);
+ localScanRangeLocationsList.add(
+ getScanRangeLocations(split.getPath().toString(), split.getStart(), split.getLength()));
+ }
+
+ Multimap<Backend, Split> result = null;
+ for (int i = 0; i < 3; ++i) {
+ FederationBackendPolicy policy = new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING);
+ List<TScanRangeLocations> totalScanRangeLocationsList = new ArrayList<>();
+ totalScanRangeLocationsList.addAll(tScanRangeLocationsList);
+ totalScanRangeLocationsList.addAll(localScanRangeLocationsList);
+ // Collections.shuffle(totalScanRangeLocationsList);
+ policy.init();
+ // policy.setScanRangeLocationsList(totalScanRangeLocationsList);
+ Assertions.assertEquals(policy.numBackends(), aliveBackendNum);
+ // for (TScanRangeLocations scanRangeLocations : tScanRangeLocationsList) {
+ // System.out.println(policy.getNextConsistentBe(scanRangeLocations).getId());
+ // }
+ int totalSplitNum = 0;
+ List<Split> totalSplits = new ArrayList<>();
+ totalSplits.addAll(remoteSplits);
+ totalSplits.addAll(localSplits);
+ // Collections.shuffle(totalSplits);
+ Multimap<Backend, Split> assignment = policy.computeScanRangeAssignment(totalSplits);
+ if (i == 0) {
+ result = ArrayListMultimap.create(assignment);
+ } else {
+ Assertions.assertEquals(result, assignment);
+ }
+ for (Backend backend : assignment.keySet()) {
+ Collection<Split> splits = assignment.get(backend);
+ long scanBytes = 0L;
+ for (Split split : splits) {
+ FileSplit fileSplit = (FileSplit) split;
+ scanBytes += fileSplit.getLength();
+ ++totalSplitNum;
+ if (fileSplit.getHosts() != null && fileSplit.getHosts().length > 0) {
+ for (String host : fileSplit.getHosts()) {
+ Assert.assertTrue(totalLocalHosts.contains(host));
+ }
+ }
+ }
+ System.out.printf("%s -> %d splits, %d bytes\n", backend,
splits.size(), scanBytes);
+ }
+ Assert.assertEquals(totalSplits.size(), totalSplitNum);
+ // int avg = (scanRangeNumber * scanRangeSize) / hostNumber;
+ // int variance = 5 * scanRangeSize;
+ Map<Backend, Long> stats = policy.getAssignedWeightPerBackend();
+ for (Map.Entry<Backend, Long> entry : stats.entrySet()) {
+ System.out.printf("weight: %s -> %d\n", entry.getKey(),
entry.getValue());
+ // Assert.assertTrue(Math.abs(entry.getValue() - avg) <
variance);
+ }
+ }
+ }
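
The non-alive test checks that only alive backends participate, which is why numBackends() is asserted against aliveBackendNum rather than backendNum. A trivial sketch of that filtering, with an illustrative stand-in type:

import java.util.*;
import java.util.stream.Collectors;

// Sketch: only alive backends are eligible for split assignment.
// BackendStub is an illustrative stand-in, not the Doris Backend class.
public class AliveFilterSketch {
    static class BackendStub {
        final long id;
        final boolean alive;
        BackendStub(long id, boolean alive) { this.id = id; this.alive = alive; }
    }

    static List<BackendStub> eligible(List<BackendStub> all) {
        return all.stream().filter(b -> b.alive).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<BackendStub> all = Arrays.asList(
                new BackendStub(10002L, true), new BackendStub(10003L, false));
        System.out.println(eligible(all).size()); // 1
    }
}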
+
+ @Test
+ public void testComputeScanRangeAssignmentNodeChanged() throws UserException {
+ SystemInfoService service = new SystemInfoService();
+
+ Backend backend1 = new Backend(10002L, "172.30.0.100", 9050);
+ backend1.setAlive(true);
+ service.addBackend(backend1);
+ Backend backend2 = new Backend(10003L, "172.30.0.106", 9050);
+ backend2.setAlive(true);
+ service.addBackend(backend2);
+ Backend backend3 = new Backend(10004L, "172.30.0.118", 9050);
+ backend3.setAlive(true);
+ service.addBackend(backend3);
+
+ new MockUp<Env>() {
+ @Mock
+ public SystemInfoService getCurrentSystemInfo() {
+ return service;
+ }
+ };
+
+ List<Split> splits = new ArrayList<>();
+ splits.add(new FileSplit(new Path(
+ "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00000-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+ 0, 112140970, 112140970, 0, null, Collections.emptyList()));
+ splits.add(new FileSplit(new Path(
+ "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00001-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+ 0, 120839661, 120839661, 0, null, Collections.emptyList()));
+ splits.add(new FileSplit(new Path(
+ "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00002-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+ 0, 108897409, 108897409, 0, null, Collections.emptyList()));
+ splits.add(new FileSplit(new Path(
+ "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00003-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+ 0, 95795997, 95795997, 0, null, Collections.emptyList()));
+ splits.add(new FileSplit(new Path(
+ "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00004-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+ 0, 104600402, 104600402, 0, null, Collections.emptyList()));
+ splits.add(new FileSplit(new Path(
+ "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00004-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+ 0, 104600402, 104600402, 0, null, Collections.emptyList()));
+ splits.add(new FileSplit(new Path(
+ "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00004-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+ 0, 104600402, 104600402, 0, null, Collections.emptyList()));
+ splits.add(new FileSplit(new Path(
+ "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00005-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+ 0, 105664025, 105664025, 0, null, Collections.emptyList()));
+ splits.add(new FileSplit(new Path(
+ "hdfs://HDFS8000871/usr/hive/warehouse/clickbench.db/hits_orc/part-00006-3e24f7d5-f658-4a80-a168-7b215c5a35bf-c000.snappy.orc"),
+ 0, 103711014, 103711014, 0, null, Collections.emptyList()));
+
+ Map<Split, Backend> originSplitAssignedBackends = new HashMap<>();
+ {
+ FederationBackendPolicy policy = new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING);
+ policy.init();
+ int backendNum = 3;
+ Assertions.assertEquals(policy.numBackends(), backendNum);
+ Multimap<Backend, Split> assignment = policy.computeScanRangeAssignment(splits);
+
+ for (Backend backend : assignment.keySet()) {
+ Collection<Split> assignedSplits = assignment.get(backend);
+ long scanBytes = 0L;
+ for (Split split : assignedSplits) {
+ FileSplit fileSplit = (FileSplit) split;
+ scanBytes += fileSplit.getLength();
+ originSplitAssignedBackends.put(split, backend);
+ }
+ System.out.printf("%s -> %d splits, %d bytes\n", backend,
assignedSplits.size(), scanBytes);
+ }
+ // int avg = (scanRangeNumber * scanRangeSize) / hostNumber;
+ // int variance = 5 * scanRangeSize;
+ Map<Backend, Long> stats = policy.getAssignedWeightPerBackend();
+ for (Map.Entry<Backend, Long> entry : stats.entrySet()) {
+ System.out.printf("weight: %s -> %d\n", entry.getKey(),
entry.getValue());
+ // Assert.assertTrue(Math.abs(entry.getValue() - avg) <
variance);
+ }
+ }
+
+ // remove a node
+ // {
+ // service.dropBackend(backend3.getId());
+ // int changed = 0;
+ //
+ // FederationBackendPolicy policy = new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING);
+ // policy.init();
+ // int backendNum = 2;
+ // Assertions.assertEquals(policy.numBackends(), backendNum);
+ // Multimap<Backend, Split> assignment = policy.computeScanRangeAssignment(splits);
+ //
+ // for (Backend backend : assignment.keySet()) {
+ // Collection<Split> assignedSplits = assignment.get(backend);
+ // long ScanBytes = 0L;
+ // for (Split split : assignedSplits) {
+ // FileSplit fileSplit = (FileSplit) split;
+ // ScanBytes += fileSplit.getLength();
+ // Backend origin = originSplitAssignedBackends.get(split);
+ // if (!backend.equals(origin)) {
+ // changed += 1;
+ // }
+ // }
+ // System.out.printf("%s -> %d splits, %d bytes\n", backend,
assignedSplits.size(), ScanBytes);
+ // }
+ //
+ // // int avg = (scanRangeNumber * scanRangeSize) / hostNumber;
+ // // int variance = 5 * scanRangeSize;
+ // Map<Backend, Long> stats = policy.getAssignedWeightPerBackend();
+ // for (Map.Entry<Backend, Long> entry : stats.entrySet()) {
+ // System.out.printf("weight: %s -> %d\n", entry.getKey(),
entry.getValue());
+ // // Assert.assertTrue(Math.abs(entry.getValue() - avg) <
variance);
+ // }
+ //
+ // float moveRatio = changed * 1.0f / assignment.values().size();
+ // System.out.printf("Remove a node: move ratio = %.2f\n",
moveRatio);
+ // }
+ //
+ // // add a node
+ // {
+ // Backend backend4 = new Backend(10004L, "172.30.0.128", 9050);
+ // backend4.setAlive(true);
+ // service.addBackend(backend4);
+ // int changed = 0;
+ //
+ // FederationBackendPolicy policy = new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING);
+ // policy.init();
+ // int backendNum = 3;
+ // Assertions.assertEquals(policy.numBackends(), backendNum);
+ // Multimap<Backend, Split> assignment = policy.computeScanRangeAssignment(splits);
+ //
+ // for (Backend backend : assignment.keySet()) {
+ // Collection<Split> assignedSplits = assignment.get(backend);
+ // long ScanBytes = 0L;
+ // for (Split split : assignedSplits) {
+ // FileSplit fileSplit = (FileSplit) split;
+ // ScanBytes += fileSplit.getLength();
+ // Backend origin = originSplitAssignedBackends.get(split);
+ // if (!backend.equals(origin)) {
+ // changed += 1;
+ // }
+ // }
+ // System.out.printf("%s -> %d splits, %d bytes\n", backend,
assignedSplits.size(), ScanBytes);
+ // }
+ // // int avg = (scanRangeNumber * scanRangeSize) / hostNumber;
+ // // int variance = 5 * scanRangeSize;
+ // Map<Backend, Long> stats = policy.getAssignedWeightPerBackend();
+ // for (Map.Entry<Backend, Long> entry : stats.entrySet()) {
+ // System.out.printf("weight: %s -> %d\n", entry.getKey(),
entry.getValue());
+ // // Assert.assertTrue(Math.abs(entry.getValue() - avg) <
variance);
+ // }
+ //
+ // float moveRatio = changed * 1.0f / assignment.values().size();
+ // System.out.printf("Add a node, move ratio = %.2f\n", moveRatio);
+ // }
+
+ }
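
The commented-out blocks above measure a move ratio: the fraction of splits that migrate to a different backend when a node is added or removed. With consistent hashing this should stay near 1/n rather than approaching 1.0 as modulo-style hashing would. A sketch of that measurement, using illustrative maps from split name to assigned host:

import java.util.*;

// Sketch: compare a before/after assignment and compute the fraction of
// splits whose backend changed.
public class MoveRatioSketch {
    static float moveRatio(Map<String, String> before, Map<String, String> after) {
        int changed = 0;
        for (Map.Entry<String, String> e : before.entrySet()) {
            if (!e.getValue().equals(after.get(e.getKey()))) {
                changed++;
            }
        }
        return changed * 1.0f / before.size();
    }

    public static void main(String[] args) {
        Map<String, String> before = new HashMap<>();
        before.put("part-00000", "172.30.0.100");
        before.put("part-00001", "172.30.0.106");
        Map<String, String> after = new HashMap<>(before);
        after.put("part-00001", "172.30.0.118"); // one split moved
        System.out.printf("move ratio = %.2f%n", moveRatio(before, after));
    }
}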
+
+
private TScanRangeLocations getScanRangeLocations(String path, long startOffset, long size) {
// Generate one file scan range
TFileScanRange fileScanRange = new TFileScanRange();
@@ -145,3 +744,4 @@ public class FederationBackendPolicyTest {
return rangeDesc;
}
}
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]