lincoln-lil commented on code in PR #25291:
URL: https://github.com/apache/flink/pull/25291#discussion_r1753015269
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/PercentileAggFunction.java:
##########
@@ -75,35 +74,45 @@ public boolean equals(Object o) {
}
PercentileAccumulator that = (PercentileAccumulator) o;
return Arrays.equals(percentages, that.percentages)
- && totalCount == that.totalCount
&& Objects.equals(valueCount, that.valueCount);
}
@Override
public int hashCode() {
- return Objects.hash(Arrays.hashCode(percentages), totalCount, valueCount.hashCode());
+ return Objects.hash(Arrays.hashCode(percentages), valueCount.hashCode());
}
public Double[] getValue() {
- // calculate the position for each percentage and sort it, so that all percentiles can
- // be obtained with a single traversal
- List<Pair<Double, Integer>> sortedPercentages = new ArrayList<>();
- for (int index = 0; index < percentages.length; index++) {
- sortedPercentages.add(new Pair<>(percentages[index] * (totalCount - 1) + 1, index));
- }
- sortedPercentages.sort(Comparator.comparing(Pair::getKey));
+ // calculate totalCount here to avoid the inconsistencies between valueCount and
+ // totalCount, which may arise if both are stored in the accumulator and have different
Review Comment:
nit: The cause is not so much the different state TTL settings. It is that
the two fields live in two separate states (ValueState for acc.totalCount,
MapState for acc.valueCount) that are operated on serially, so they cannot
be guaranteed to expire consistently.
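
To illustrate the point, here is a minimal sketch of deriving the total from the per-value counts at read time instead of storing it separately. It is not the PR's code: the class name `TotalCountSketch` and the helper `totalCount` are hypothetical, and a plain `java.util.Map` stands in for the map-backed state behind `acc.valueCount`.

```java
import java.util.HashMap;
import java.util.Map;

public class TotalCountSketch {

    // Hypothetical helper: sum the per-value counts on demand, so the total
    // always reflects whatever entries are currently present in the map.
    public static long totalCount(Map<Double, Long> valueCount) {
        long total = 0L;
        for (long count : valueCount.values()) {
            total += count;
        }
        return total;
    }

    public static void main(String[] args) {
        // Assumption: a plain HashMap stands in for the map-backed state.
        Map<Double, Long> valueCount = new HashMap<>();
        valueCount.put(1.0, 2L);
        valueCount.put(5.0, 3L);

        // Simulate one entry expiring. A separately stored total would still
        // say 5 here, while the derived total follows the map.
        valueCount.remove(1.0);

        System.out.println(totalCount(valueCount)); // prints 3
    }
}
```

With a single source of truth there is no second state whose expiration has to line up with the first.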