This is an automated email from the ASF dual-hosted git repository.
mayanks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 17a3873 Improve performance of DistinctCountThetaSketch by
eliminating empty sketches and unions. (#5798)
17a3873 is described below
commit 17a38733c600d8c75b440e6ea3280903c7d9e9ec
Author: Mayank Shrivastava <[email protected]>
AuthorDate: Tue Aug 4 14:04:52 2020 -0700
Improve performance of DistinctCountThetaSketch by eliminating empty
sketches and unions. (#5798)
* In cases with a large number of predicates in the
post-aggregation expression (combined with ORs), we tend
to end up with many empty sketches (and unions), because not every row
matches every predicate.
Creating these sketches and unioning them adds overhead, potentially leading
to a huge performance hit.
* In this PR, we improve this behavior by:
- Filtering out empty unions/sketches when extracting aggregation results.
- Carefully merging results in `merge()` with minimal unions (only when
necessary).
* We could also create unions lazily (to ensure that they are
never empty), but that would require
a hash-map lookup per row. This would penalize the general case, where
there are fewer empty unions,
so this approach was not taken.
* We saw an overall latency improvement of about 50% for cases with:
- Large number of predicates, and
- Large number of segments, and
- Small number of matches per predicate per segment.
---
...istinctCountThetaSketchAggregationFunction.java | 46 +++++++++++++++-------
1 file changed, 32 insertions(+), 14 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java
index 7f02368..99cef39 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java
@@ -392,7 +392,12 @@ public class DistinctCountThetaSketchAggregationFunction
implements AggregationF
Map<String, Sketch> result = new HashMap<>();
for (PredicateInfo predicateInfo : _predicateInfoMap.values()) {
- result.put(predicateInfo.getStringPredicate(),
unionMap.get(predicateInfo.getPredicate()).getResult());
+ Sketch sketch = unionMap.get(predicateInfo.getPredicate()).getResult();
+
+ // Skip empty sketches, as they lead to unnecessary unions (and cost
performance)
+ if (!sketch.isEmpty()) {
+ result.put(predicateInfo.getStringPredicate(), sketch);
+ }
}
return result;
}
@@ -406,7 +411,12 @@ public class DistinctCountThetaSketchAggregationFunction
implements AggregationF
Map<String, Sketch> result = new HashMap<>();
for (PredicateInfo predicateInfo : _predicateInfoMap.values()) {
- result.put(predicateInfo.getStringPredicate(),
unionMap.get(predicateInfo.getPredicate()).getResult());
+ Sketch sketch = unionMap.get(predicateInfo.getPredicate()).getResult();
+
+ // Skip empty sketches, as they lead to unnecessary unions (and cost
performance)
+ if (!sketch.isEmpty()) {
+ result.put(predicateInfo.getStringPredicate(), sketch);
+ }
}
return result;
}
@@ -419,25 +429,33 @@ public class DistinctCountThetaSketchAggregationFunction
implements AggregationF
return intermediateResult1;
}
- // NOTE: Here we parse the map keys to Predicate to handle the
non-standard predicate string returned from server
- // side for backward-compatibility.
- // TODO: Remove the extra parsing after releasing 0.5.0
- Map<Predicate, Union> unionMap = getDefaultUnionMap();
+ // Add sketches from intermediateResult1, merged with overlapping ones
from intermediateResult2
+ Map<String, Sketch> mergedResult = new HashMap<>();
for (Map.Entry<String, Sketch> entry : intermediateResult1.entrySet()) {
- Predicate predicate = getPredicate(entry.getKey());
- unionMap.get(predicate).update(entry.getValue());
+ String predicate = entry.getKey();
+ Sketch sketch = intermediateResult2.get(predicate);
+
+ // Merge the overlapping ones
+ if (sketch != null) {
+ Union union = getSetOperationBuilder().buildUnion();
+ union.update(entry.getValue());
+ union.update(sketch);
+ mergedResult.put(predicate, union.getResult());
+ } else { // Collect the non-overlapping ones
+ mergedResult.put(predicate, entry.getValue());
+ }
}
+
+ // Add sketches that are only in intermediateResult2
for (Map.Entry<String, Sketch> entry : intermediateResult2.entrySet()) {
- Predicate predicate = getPredicate(entry.getKey());
- unionMap.get(predicate).update(entry.getValue());
- }
- Map<String, Sketch> mergedResult = new HashMap<>();
- for (Map.Entry<Predicate, Union> entry : unionMap.entrySet()) {
- mergedResult.put(entry.getKey().toString(),
entry.getValue().getResult());
+ // If key already present, it was already merged in the previous
iteration.
+ mergedResult.putIfAbsent(entry.getKey(), entry.getValue());
}
+
return mergedResult;
}
+
@Override
public boolean isIntermediateResultComparable() {
return false;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]