This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f0cde35ea6 [performance improvement] Spark Load, SparkDpp
processRDDAggregate performance improvement (#12186)
f0cde35ea6 is described below
commit f0cde35ea66220d6d618dc06caf566b8e7a6ab89
Author: HouRong <[email protected]>
AuthorDate: Wed Aug 31 09:14:13 2022 +0800
[performance improvement] Spark Load, SparkDpp processRDDAggregate
performance improvement (#12186)
Co-authored-by: hourong <[email protected]>
---
.../src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java | 8 ++------
1 file changed, 2 insertions(+), 6 deletions(-)
diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
index 5d951ad70b..53ae81686f 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
@@ -138,12 +138,8 @@ public final class SparkDpp implements java.io.Serializable {
RollupTreeNode curNode, SparkRDDAggregator[] sparkRDDAggregators)
throws SparkDppException {
final boolean isDuplicateTable =
!StringUtils.equalsIgnoreCase(curNode.indexMeta.indexType, "AGGREGATE")
&& !StringUtils.equalsIgnoreCase(curNode.indexMeta.indexType,
"UNIQUE");
-
// Aggregate/UNIQUE table
if (!isDuplicateTable) {
- // TODO(wb) set the reduce concurrency by statistic instead of
hard code 200
- int aggregateConcurrency = 200;
-
int idx = 0;
for (int i = 0; i < curNode.indexMeta.columns.size(); i++) {
if (!curNode.indexMeta.columns.get(i).isKey) {
@@ -155,14 +151,14 @@ public final class SparkDpp implements java.io.Serializable {
if (curNode.indexMeta.isBaseIndex) {
JavaPairRDD<List<Object>, Object[]> result =
currentPairRDD.mapToPair(
new
EncodeBaseAggregateTableFunction(sparkRDDAggregators))
- .reduceByKey(new
AggregateReduceFunction(sparkRDDAggregators), aggregateConcurrency);
+ .reduceByKey(new
AggregateReduceFunction(sparkRDDAggregators));
return result;
} else {
JavaPairRDD<List<Object>, Object[]> result = currentPairRDD
.mapToPair(new EncodeRollupAggregateTableFunction(
getColumnIndexInParentRollup(curNode.keyColumnNames, curNode.valueColumnNames,
curNode.parent.keyColumnNames,
curNode.parent.valueColumnNames)))
- .reduceByKey(new
AggregateReduceFunction(sparkRDDAggregators), aggregateConcurrency);
+ .reduceByKey(new
AggregateReduceFunction(sparkRDDAggregators));
return result;
}
// Duplicate Table
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]