This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6da33cb60 [core] Improve performance of distinct collect agg. (#3772)
6da33cb60 is described below
commit 6da33cb6072ce90ad311450e0143aa9bf069d624
Author: YeJunHao <[email protected]>
AuthorDate: Thu Jul 18 11:32:25 2024 +0800
[core] Improve performance of distinct collect agg. (#3772)
---
.../paimon/mergetree/compact/PartialUpdateMergeFunction.java | 2 +-
.../paimon/mergetree/compact/aggregate/FieldAggregator.java | 4 ++++
.../paimon/mergetree/compact/aggregate/FieldCollectAgg.java | 12 +++++++++++-
3 files changed, 16 insertions(+), 2 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
index 3013d6ad5..f1269119f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
@@ -179,7 +179,7 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
row.setField(
i, aggregator == null ? field :
aggregator.agg(accumulator, field));
} else if (aggregator != null) {
- row.setField(i, aggregator.agg(field, accumulator));
+ row.setField(i, aggregator.aggReversed(accumulator,
field));
}
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregator.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregator.java
index f19506f4b..77eeecd8d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregator.java
@@ -172,6 +172,10 @@ public abstract class FieldAggregator implements Serializable {
public abstract Object agg(Object accumulator, Object inputField);
+ public Object aggReversed(Object accumulator, Object inputField) {
+ return agg(inputField, accumulator);
+ }
+
/** reset the aggregator to a clean start state. */
public void reset() {}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldCollectAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldCollectAgg.java
index b15328704..590c3d0b6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldCollectAgg.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldCollectAgg.java
@@ -87,6 +87,14 @@ public class FieldCollectAgg extends FieldAggregator {
return NAME;
}
+ @Override
+ public Object aggReversed(Object accumulator, Object inputField) {
+ // we don't need to actually do the reverse here for this agg
+ // because accumulator has been distinct, just let accumulator be
accumulator will speed up
+ // dinstinct process
+ return agg(accumulator, inputField);
+ }
+
@Override
public Object agg(Object accumulator, Object inputField) {
if (accumulator == null && inputField == null) {
@@ -99,7 +107,9 @@ public class FieldCollectAgg extends FieldAggregator {
if (equaliser != null) {
List<Object> collection = new ArrayList<>();
- collectWithEqualiser(collection, accumulator);
+ // do not need to distinct accumulator, because the accumulator is
always distinct, no
+ // need to distinct it every time
+ collect(collection, accumulator);
collectWithEqualiser(collection, inputField);
return new GenericArray(collection.toArray());
} else {