lincoln-lil commented on code in PR #25291:
URL: https://github.com/apache/flink/pull/25291#discussion_r1751114742


##########
docs/data/sql_functions_zh.yml:
##########
@@ -313,6 +313,26 @@ arithmetic:
       如果 integer2 为 0,则结果没有小数点或小数部分。integer2 可以为负数,使值的小数点左边的 integer2 位变为零。
       此函数也可以只传入一个参数 numeric1 且不设置参数 integer2 来使用。如果未设置 integer2 则 integer2 默认为 
0。
       例如 42.324.truncate(2) 为 42.32,42.324.truncate() 为 42.0。
+  - sql: PERCENTILE(expr, percentage[, frequency])
+    table: expr.percentile(percentage[, frequency])
+    description: |
+      返回 expr 的 percentage 百分位值。
+
+      percentage 必须是一个在 `[0.0, 1.0]` 之间的字面数值,或者是一个该范围的数组。
+      如果传递了一个可变表达式给这个函数,结果将根据其中的任意一个值进行计算。
+      frequency 描述了 expr 的统计次数,默认值为 1。
+
+      如果没有 expr 恰好位于指定的百分位上,则结果通过对两个最接近的 expr 进行线性插值来计算。
+      如果 expr 或 frequency 为 `NULL`,或者 frequency 不是正数,则将忽略该输入行。
+
+      NOTE: 建议在窗口场景中使用此函数,因为它通常能提供更好的性能。

Review Comment:
   nit: translate 'NOTE' to '注意' or '请注意' for consistency with the surrounding Chinese documentation.



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/PercentileAggFunction.java:
##########
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.aggregate;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.dataview.MapView;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.DecimalDataUtils;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.apache.commons.math3.util.Pair;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+import static 
org.apache.flink.table.types.utils.DataTypeUtils.toInternalDataType;
+
+/** Built-in PERCENTILE aggregate function. */
+@Internal
+public abstract class PercentileAggFunction<T>
+        extends BuiltInAggregateFunction<T, 
PercentileAggFunction.PercentileAccumulator> {
+
+    protected final transient DataType valueType;
+    protected final transient DataType frequencyType;
+
+    public PercentileAggFunction(LogicalType inputType, LogicalType 
frequencyType) {
+        this.valueType = toInternalDataType(inputType);
+        this.frequencyType = frequencyType == null ? null : 
toInternalDataType(frequencyType);
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Accumulator
+    // 
--------------------------------------------------------------------------------------------
+
+    /** Accumulator for PERCENTILE. */
+    public static class PercentileAccumulator {
+
+        public double[] percentages;
+        public long totalCount;
+        public MapView<Double, Long> valueCount;
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            PercentileAccumulator that = (PercentileAccumulator) o;
+            return Arrays.equals(percentages, that.percentages)
+                    && totalCount == that.totalCount
+                    && Objects.equals(valueCount, that.valueCount);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(Arrays.hashCode(percentages), totalCount, 
valueCount.hashCode());
+        }
+
+        public Double[] getValue() {
+            // calculate the position for each percentage and sort it, so that 
all percentiles can
+            // be obtained with a single traversal
+            List<Pair<Double, Integer>> sortedPercentages = new ArrayList<>();
+            for (int index = 0; index < percentages.length; index++) {
+                sortedPercentages.add(new Pair<>(percentages[index] * 
(totalCount - 1) + 1, index));
+            }
+            sortedPercentages.sort(Comparator.comparing(Pair::getKey));
+
+            // get a sorted list from hashmap for single traversal
+            List<Map.Entry<Double, Long>> sortedList = new ArrayList<>();
+            try {
+                for (Map.Entry<Double, Long> entry : valueCount.entries()) {
+                    sortedList.add(entry);
+                }
+            } catch (Exception e) {
+                throw new FlinkRuntimeException(e);
+            }
+            sortedList.sort(Map.Entry.comparingByKey());
+
+            Double[] percentiles = new Double[percentages.length];
+
+            long preCnt = sortedList.get(0).getValue();
+            for (int i = 0, j = 0; i < sortedPercentages.size(); i++) {
+                Pair<Double, Integer> entry = sortedPercentages.get(i);
+                double position = entry.getKey();
+                long lower = (long) Math.floor(position);
+                long higher = (long) Math.ceil(position);
+
+                // lower <= (totalCount - 1) * percentage + 1 <= totalCount
+                // hence, j will never overflow
+                while (preCnt < lower) {
+                    j++;
+                    preCnt += sortedList.get(j).getValue();
+                }
+
+                percentiles[entry.getValue()] =
+                        preCnt >= higher
+                                // 1. position is between two same values
+                                // 2. position corresponds to an exact index 
in the list
+                                ? sortedList.get(j).getKey()
+                                // linear interpolation to get the exact 
percentile
+                                : (higher - position) * 
sortedList.get(j).getKey()
+                                        + (position - lower) * 
sortedList.get(j + 1).getKey();
+            }
+
+            return percentiles;
+        }
+
+        public void setPercentages(Double percentage) {
+            // save and verify percentage values only the first time to avoid 
redundant assignments
+            // and verifications
+            if (percentage < 0.0 || percentage > 1.0) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Percentage of PERCENTILE should be between 
[0.0, 1.0], but was '%s'.",
+                                percentage));
+            }
+            percentages = new double[] {percentage};
+        }
+
+        public void setPercentages(Double[] percentage) {
+            // save and verify percentage values only the first time to avoid 
redundant assignments
+            // and verifications
+            percentages = new double[percentage.length];
+            for (int i = 0; i < percentages.length; i++) {
+                if (percentage[i] < 0.0 || percentage[i] > 1.0) {
+                    throw new IllegalArgumentException(
+                            String.format(
+                                    "Percentage of PERCENTILE should be 
between [0.0, 1.0], but was '%s'.",
+                                    percentage[i]));
+                }
+                percentages[i] = percentage[i];
+            }
+        }
+    }
+
+    @Override
+    public DataType getAccumulatorDataType() {
+        return DataTypes.STRUCTURED(
+                PercentileAccumulator.class,
+                DataTypes.FIELD(
+                        "percentages",
+                        
DataTypes.ARRAY(DataTypes.DOUBLE()).bridgedTo(double[].class)),
+                DataTypes.FIELD("totalCount", DataTypes.BIGINT()),
+                DataTypes.FIELD(
+                        "valueCount",
+                        MapView.newMapViewDataType(DataTypes.DOUBLE(), 
DataTypes.BIGINT())));
+    }
+
+    @Override
+    public PercentileAccumulator createAccumulator() {
+        final PercentileAccumulator acc = new PercentileAccumulator();
+        acc.percentages = null;
+        acc.totalCount = 0;
+        acc.valueCount = new MapView<>();
+        return acc;
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // accumulate methods
+    // 
--------------------------------------------------------------------------------------------
+
+    public void accumulate(PercentileAccumulator acc, @Nullable Object value, 
Double percentage)
+            throws Exception {
+        if (acc.percentages == null) {
+            acc.setPercentages(percentage);
+        }
+        update(acc, value, 1L);
+    }
+
+    public void accumulate(
+            PercentileAccumulator acc,
+            @Nullable Object value,
+            Double percentage,
+            @Nullable Number frequency)
+            throws Exception {
+        if (acc.percentages == null) {
+            acc.setPercentages(percentage);
+        }
+        if (frequency == null || frequency.longValue() <= 0) {
+            return;
+        }
+        update(acc, value, frequency.longValue());
+    }
+
+    public void accumulate(PercentileAccumulator acc, @Nullable Object value, 
Double[] percentage)
+            throws Exception {
+        if (acc.percentages == null) {
+            acc.setPercentages(percentage);
+        }
+        update(acc, value, 1L);
+    }
+
+    public void accumulate(
+            PercentileAccumulator acc,
+            @Nullable Object value,
+            Double[] percentage,
+            @Nullable Number frequency)
+            throws Exception {
+        if (acc.percentages == null) {
+            acc.setPercentages(percentage);
+        }
+        if (frequency == null || frequency.longValue() <= 0) {
+            return;
+        }
+        update(acc, value, frequency.longValue());
+    }
+
+    private void update(PercentileAccumulator acc, @Nullable Object value, 
long frequency)
+            throws Exception {
+        // skip input if value or frequency is null or frequency <= 0

Review Comment:
   nit: remove 'frequency is null or frequency <= 0' from this comment, since that check is already performed in `accumulate`.



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/PercentileAggFunction.java:
##########
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.aggregate;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.dataview.MapView;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.DecimalDataUtils;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.apache.commons.math3.util.Pair;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+import static 
org.apache.flink.table.types.utils.DataTypeUtils.toInternalDataType;
+
+/** Built-in PERCENTILE aggregate function. */
+@Internal
+public abstract class PercentileAggFunction<T>
+        extends BuiltInAggregateFunction<T, 
PercentileAggFunction.PercentileAccumulator> {
+
+    protected final transient DataType valueType;
+    protected final transient DataType frequencyType;
+
+    public PercentileAggFunction(LogicalType inputType, LogicalType 
frequencyType) {
+        this.valueType = toInternalDataType(inputType);
+        this.frequencyType = frequencyType == null ? null : 
toInternalDataType(frequencyType);
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Accumulator
+    // 
--------------------------------------------------------------------------------------------
+
+    /** Accumulator for PERCENTILE. */
+    public static class PercentileAccumulator {
+
+        public double[] percentages;
+        public long totalCount;
+        public MapView<Double, Long> valueCount;
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            PercentileAccumulator that = (PercentileAccumulator) o;
+            return Arrays.equals(percentages, that.percentages)
+                    && totalCount == that.totalCount
+                    && Objects.equals(valueCount, that.valueCount);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(Arrays.hashCode(percentages), totalCount, 
valueCount.hashCode());
+        }
+
+        public Double[] getValue() {
+            // calculate the position for each percentage and sort it, so that 
all percentiles can
+            // be obtained with a single traversal
+            List<Pair<Double, Integer>> sortedPercentages = new ArrayList<>();
+            for (int index = 0; index < percentages.length; index++) {
+                sortedPercentages.add(new Pair<>(percentages[index] * 
(totalCount - 1) + 1, index));
+            }
+            sortedPercentages.sort(Comparator.comparing(Pair::getKey));
+
+            // get a sorted list from hashmap for single traversal
+            List<Map.Entry<Double, Long>> sortedList = new ArrayList<>();
+            try {
+                for (Map.Entry<Double, Long> entry : valueCount.entries()) {
+                    sortedList.add(entry);
+                }
+            } catch (Exception e) {
+                throw new FlinkRuntimeException(e);
+            }
+            sortedList.sort(Map.Entry.comparingByKey());
+
+            Double[] percentiles = new Double[percentages.length];
+
+            long preCnt = sortedList.get(0).getValue();
+            for (int i = 0, j = 0; i < sortedPercentages.size(); i++) {
+                Pair<Double, Integer> entry = sortedPercentages.get(i);
+                double position = entry.getKey();
+                long lower = (long) Math.floor(position);
+                long higher = (long) Math.ceil(position);
+
+                // lower <= (totalCount - 1) * percentage + 1 <= totalCount

Review Comment:
   This holds true when state TTL is not set.
   However, once TTL is set, the `totalCount` and `valueCount` states in the accumulator do not expire atomically: there may be corner cases where `totalCount` still exists while the `valueCount` state has already expired, in which case the `totalCount` value may be much larger than what the remaining `valueCount` state actually reflects.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to