This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 4b3339a4aed [opt](scan) read scan ranges in the order of partitions
(#33515) (#33657)
4b3339a4aed is described below
commit 4b3339a4aed20b6daa5919e532edc7edee6e832f
Author: Ashin Gau <[email protected]>
AuthorDate: Mon Apr 15 16:20:12 2024 +0800
[opt](scan) read scan ranges in the order of partitions (#33515) (#33657)
backport: #33515
---
be/src/pipeline/exec/file_scan_operator.cpp | 36 +++++++++++++++-------
be/src/vec/exec/scan/new_file_scan_node.cpp | 36 +++++++++++++++-------
.../doris/datasource/FederationBackendPolicy.java | 27 ++++++----------
.../doris/planner/FederationBackendPolicyTest.java | 19 ++++++++++--
4 files changed, 77 insertions(+), 41 deletions(-)
diff --git a/be/src/pipeline/exec/file_scan_operator.cpp
b/be/src/pipeline/exec/file_scan_operator.cpp
index ac193147dfb..f81781481df 100644
--- a/be/src/pipeline/exec/file_scan_operator.cpp
+++ b/be/src/pipeline/exec/file_scan_operator.cpp
@@ -73,20 +73,34 @@ void FileScanLocalState::set_scan_ranges(RuntimeState*
state,
_scan_ranges = scan_ranges;
} else {
// There is no need for the number of scanners to exceed the number of
threads in thread pool.
- _scan_ranges.clear();
- auto range_iter = scan_ranges.begin();
- for (int i = 0; i < max_scanners && range_iter != scan_ranges.end();
++i, ++range_iter) {
- _scan_ranges.push_back(*range_iter);
+ // scan_ranges is sorted by path(as well as partition path) in FE, so
merge scan ranges in order.
+ // In the insert statement, reading data in partition order can reduce
the memory usage of BE
+ // and prevent the generation of smaller tables.
+ _scan_ranges.resize(max_scanners);
+ int num_ranges = scan_ranges.size() / max_scanners;
+ int num_add_one = scan_ranges.size() - num_ranges * max_scanners;
+ int scan_index = 0;
+ int range_index = 0;
+ for (int i = 0; i < num_add_one; ++i) {
+ _scan_ranges[scan_index] = scan_ranges[range_index++];
+ auto& ranges =
+
_scan_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges;
+ for (int j = 0; j < num_ranges; j++) {
+ auto& merged_ranges =
+
scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges;
+ ranges.insert(ranges.end(), merged_ranges.begin(),
merged_ranges.end());
+ }
}
- for (int i = 0; range_iter != scan_ranges.end(); ++i, ++range_iter) {
- if (i == max_scanners) {
- i = 0;
+ for (int i = num_add_one; i < max_scanners; ++i) {
+ _scan_ranges[scan_index] = scan_ranges[range_index++];
+ auto& ranges =
+
_scan_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges;
+ for (int j = 0; j < num_ranges - 1; j++) {
+ auto& merged_ranges =
+
scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges;
+ ranges.insert(ranges.end(), merged_ranges.begin(),
merged_ranges.end());
}
- auto& ranges =
_scan_ranges[i].scan_range.ext_scan_range.file_scan_range.ranges;
- auto& merged_ranges =
range_iter->scan_range.ext_scan_range.file_scan_range.ranges;
- ranges.insert(ranges.end(), merged_ranges.begin(),
merged_ranges.end());
}
- _scan_ranges.shrink_to_fit();
LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " <<
_scan_ranges.size();
}
if (scan_ranges.size() > 0 &&
diff --git a/be/src/vec/exec/scan/new_file_scan_node.cpp
b/be/src/vec/exec/scan/new_file_scan_node.cpp
index 2ce80f4463a..eed7cfaaec6 100644
--- a/be/src/vec/exec/scan/new_file_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_file_scan_node.cpp
@@ -71,20 +71,34 @@ void NewFileScanNode::set_scan_ranges(RuntimeState* state,
_scan_ranges = scan_ranges;
} else {
// There is no need for the number of scanners to exceed the number of
threads in thread pool.
- _scan_ranges.clear();
- auto range_iter = scan_ranges.begin();
- for (int i = 0; i < max_scanners && range_iter != scan_ranges.end();
++i, ++range_iter) {
- _scan_ranges.push_back(*range_iter);
+ // scan_ranges is sorted by path(as well as partition path) in FE, so
merge scan ranges in order.
+ // In the insert statement, reading data in partition order can reduce
the memory usage of BE
+ // and prevent the generation of smaller tables.
+ _scan_ranges.resize(max_scanners);
+ int num_ranges = scan_ranges.size() / max_scanners;
+ int num_add_one = scan_ranges.size() - num_ranges * max_scanners;
+ int scan_index = 0;
+ int range_index = 0;
+ for (int i = 0; i < num_add_one; ++i) {
+ _scan_ranges[scan_index] = scan_ranges[range_index++];
+ auto& ranges =
+
_scan_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges;
+ for (int j = 0; j < num_ranges; j++) {
+ auto& merged_ranges =
+
scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges;
+ ranges.insert(ranges.end(), merged_ranges.begin(),
merged_ranges.end());
+ }
}
- for (int i = 0; range_iter != scan_ranges.end(); ++i, ++range_iter) {
- if (i == max_scanners) {
- i = 0;
+ for (int i = num_add_one; i < max_scanners; ++i) {
+ _scan_ranges[scan_index] = scan_ranges[range_index++];
+ auto& ranges =
+
_scan_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges;
+ for (int j = 0; j < num_ranges - 1; j++) {
+ auto& merged_ranges =
+
scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges;
+ ranges.insert(ranges.end(), merged_ranges.begin(),
merged_ranges.end());
}
- auto& ranges =
_scan_ranges[i].scan_range.ext_scan_range.file_scan_range.ranges;
- auto& merged_ranges =
range_iter->scan_range.ext_scan_range.file_scan_range.ranges;
- ranges.insert(ranges.end(), merged_ranges.begin(),
merged_ranges.end());
}
- _scan_ranges.shrink_to_fit();
LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " <<
_scan_ranges.size();
}
if (scan_ranges.size() > 0 &&
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java
index 73a49bb24a8..13756c978f6 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java
@@ -211,24 +211,18 @@ public class FederationBackendPolicy {
this.enableSplitsRedistribution = enableSplitsRedistribution;
}
+ /**
+ * Assign splits to each backend. Ensure that each backend receives a
similar amount of data.
+ * In order to make sure backends utilize the os page cache as much as
possible, and all backends read splits
+ * in the order of partitions(reading data in partition order can reduce
the memory usage of backends),
+ * splits should be sorted by path.
+ * Fortunately, the process of obtaining splits ensures that the splits
have been sorted according to the path.
+ * If the splits are unordered, it is strongly recommended to sort them
before calling this function.
+ */
public Multimap<Backend, Split> computeScanRangeAssignment(List<Split>
splits) throws UserException {
- // Sorting splits is to ensure that the same query utilizes the os
page cache as much as possible.
- splits.sort((split1, split2) -> {
- int pathComparison =
split1.getPathString().compareTo(split2.getPathString());
- if (pathComparison != 0) {
- return pathComparison;
- }
-
- int startComparison = Long.compare(split1.getStart(),
split2.getStart());
- if (startComparison != 0) {
- return startComparison;
- }
- return Long.compare(split1.getLength(), split2.getLength());
- });
-
ListMultimap<Backend, Split> assignment = ArrayListMultimap.create();
- List<Split> remainingSplits = null;
+ List<Split> remainingSplits;
List<Backend> backends = new ArrayList<>();
for (List<Backend> backendList : backendMap.values()) {
@@ -242,8 +236,7 @@ public class FederationBackendPolicy {
// locality information
if (Config.split_assigner_optimized_local_scheduling) {
remainingSplits = new ArrayList<>(splits.size());
- for (int i = 0; i < splits.size(); ++i) {
- Split split = splits.get(i);
+ for (Split split : splits) {
if (split.isRemotelyAccessible() && (split.getHosts() != null
&& split.getHosts().length > 0)) {
List<Backend> candidateNodes =
selectExactNodes(backendMap, split.getHosts());
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
index c0307cbd6d1..82f46862674 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
@@ -288,6 +288,21 @@ public class FederationBackendPolicyTest {
}
+ public static void sortSplits(List<Split> splits) {
+ splits.sort((split1, split2) -> {
+ int pathComparison =
split1.getPathString().compareTo(split2.getPathString());
+ if (pathComparison != 0) {
+ return pathComparison;
+ }
+
+ int startComparison = Long.compare(split1.getStart(),
split2.getStart());
+ if (startComparison != 0) {
+ return startComparison;
+ }
+ return Long.compare(split1.getLength(), split2.getLength());
+ });
+ }
+
@Test
public void testGenerateRandomly() throws UserException {
SystemInfoService service = new SystemInfoService();
@@ -367,7 +382,7 @@ public class FederationBackendPolicyTest {
List<Split> totalSplits = new ArrayList<>();
totalSplits.addAll(remoteSplits);
totalSplits.addAll(localSplits);
- Collections.shuffle(totalSplits);
+ sortSplits(totalSplits);
Multimap<Backend, Split> assignment =
policy.computeScanRangeAssignment(totalSplits);
if (i == 0) {
result = ArrayListMultimap.create(assignment);
@@ -489,7 +504,7 @@ public class FederationBackendPolicyTest {
List<Split> totalSplits = new ArrayList<>();
totalSplits.addAll(remoteSplits);
totalSplits.addAll(localSplits);
- Collections.shuffle(totalSplits);
+ sortSplits(totalSplits);
Multimap<Backend, Split> assignment =
policy.computeScanRangeAssignment(totalSplits);
if (i == 0) {
result = ArrayListMultimap.create(assignment);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org