zhipeng93 commented on code in PR #162: URL: https://github.com/apache/flink-ml/pull/162#discussion_r994557150
########## flink-ml-core/src/main/java/org/apache/flink/ml/util/QuantileSummary.java: ########## @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.util; + +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Helper class to compute an approximate quantile summary. This implementation is based on the + * algorithm proposed in the paper: "Space-efficient Online Computation of Quantile Summaries" by + * Greenwald, Michael and Khanna, Sanjeev. (https://doi.org/10.1145/375663.375670) + */ +public class QuantileSummary { + + private static final int BUFFER_SIZE = 10000; + private static final int COMPRESS_THRESHOLD = 10000; + + private double relativeError; + private long count; Review Comment: Could you add explanations for the class variable here for better readability? 
########## flink-ml-core/src/main/java/org/apache/flink/ml/util/QuantileSummary.java: ########## @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.util; + +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Helper class to compute an approximate quantile summary. This implementation is based on the + * algorithm proposed in the paper: "Space-efficient Online Computation of Quantile Summaries" by + * Greenwald, Michael and Khanna, Sanjeev. 
(https://doi.org/10.1145/375663.375670) + */ +public class QuantileSummary { + + private static final int BUFFER_SIZE = 10000; + private static final int COMPRESS_THRESHOLD = 10000; + + private double relativeError; + private long count; + private List<StatsTuple> sampled; + private List<Double> headBuffer = new ArrayList<>(BUFFER_SIZE); + + /** + * * QuantileSummaries Constructor. + * + * @param relativeError The target relative error. + */ + public QuantileSummary(double relativeError) { + this(relativeError, Collections.EMPTY_LIST, 0); + } + + /** + * QuantileSummaries Constructor. + * + * @param relativeError The target relative error. + * @param sampled A buffer of quantile statistics. See the G-K article for more details. + * @param count The count of all the elements inserted in the sampled buffer. + */ + public QuantileSummary(double relativeError, List<StatsTuple> sampled, long count) { + if (relativeError <= 0 || relativeError >= 1) { + throw new RuntimeException("An appropriate relative error must lay between 0 and 1."); Review Comment: Is IllegalArgumentException better here? ########## flink-ml-core/src/main/java/org/apache/flink/ml/util/QuantileSummary.java: ########## @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.util; + +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Helper class to compute an approximate quantile summary. This implementation is based on the + * algorithm proposed in the paper: "Space-efficient Online Computation of Quantile Summaries" by + * Greenwald, Michael and Khanna, Sanjeev. (https://doi.org/10.1145/375663.375670) + */ +public class QuantileSummary { + + private static final int BUFFER_SIZE = 10000; + private static final int COMPRESS_THRESHOLD = 10000; Review Comment: Should we also make `COMPRESS_THRESHOLD` a non-final value that can be set by users, since it will also be used by LSH? [1] [1] https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala#L143 ########## flink-ml-core/src/main/java/org/apache/flink/ml/util/QuantileSummary.java: ########## @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.util; + +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Helper class to compute an approximate quantile summary. This implementation is based on the + * algorithm proposed in the paper: "Space-efficient Online Computation of Quantile Summaries" by + * Greenwald, Michael and Khanna, Sanjeev. (https://doi.org/10.1145/375663.375670) + */ +public class QuantileSummary { + + private static final int BUFFER_SIZE = 10000; Review Comment: How is the default value for `BUFFER_SIZE` chosen? It seems that the corresponding value in Spark is 5000. [1] By the way, `defaultHeadSize` seems a better name for this. [1] https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala#L338 ########## flink-ml-core/src/main/java/org/apache/flink/ml/util/QuantileSummary.java: ########## @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.util; + +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Helper class to compute an approximate quantile summary. This implementation is based on the + * algorithm proposed in the paper: "Space-efficient Online Computation of Quantile Summaries" by + * Greenwald, Michael and Khanna, Sanjeev. (https://doi.org/10.1145/375663.375670) + */ +public class QuantileSummary { + + private static final int BUFFER_SIZE = 10000; + private static final int COMPRESS_THRESHOLD = 10000; + + private double relativeError; + private long count; + private List<StatsTuple> sampled; + private List<Double> headBuffer = new ArrayList<>(BUFFER_SIZE); + + /** + * * QuantileSummaries Constructor. + * + * @param relativeError The target relative error. + */ + public QuantileSummary(double relativeError) { + this(relativeError, Collections.EMPTY_LIST, 0); + } + + /** + * QuantileSummaries Constructor. + * + * @param relativeError The target relative error. + * @param sampled A buffer of quantile statistics. 
See the G-K article for more details. + * @param count The count of all the elements inserted in the sampled buffer. + */ + public QuantileSummary(double relativeError, List<StatsTuple> sampled, long count) { + if (relativeError <= 0 || relativeError >= 1) { + throw new RuntimeException("An appropriate relative error must lay between 0 and 1."); + } + this.relativeError = relativeError; + this.sampled = sampled; + this.count = count; + } + + /** + * Insert a new observation to the summary. + * + * @param item The new observation to insert into the summary. + * @return A summary with the given observation inserted into the summary. + */ + public QuantileSummary insert(Double item) { + headBuffer.add(item); + if (headBuffer.size() >= BUFFER_SIZE) { + QuantileSummary result = insertHeadBuffer(); + if (result.sampled.size() >= COMPRESS_THRESHOLD) { + return result.compress(); + } else { + return result; + } + } else { + return this; + } + } + + /** + * This implements the COMPRESS function of the GK algorithm. + * + * @return A new summary that compresses the summary statistics and the head buffer. + */ + public QuantileSummary compress() { + QuantileSummary inserted = insertHeadBuffer(); + Preconditions.checkState(inserted.headBuffer.isEmpty()); + Preconditions.checkState(inserted.count == count + headBuffer.size()); + + List<StatsTuple> compressed = + compressInternal(inserted.sampled, 2 * relativeError * inserted.count); + return new QuantileSummary(relativeError, compressed, inserted.count); + } + + /** + * * Merges two compressed summaries together. + * + * @param other The summary to be merged. + * @return The merged summary. + */ + public QuantileSummary merge(QuantileSummary other) { + checkState(headBuffer.isEmpty(), "Current buffer needs to be compressed before merge"); Review Comment: nit: The usage of `checkState` and `Preconditions.checkState` seems inconsistent in this PR. 
########## flink-ml-core/src/main/java/org/apache/flink/ml/util/QuantileSummary.java: ########## @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.util; + +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Helper class to compute an approximate quantile summary. This implementation is based on the + * algorithm proposed in the paper: "Space-efficient Online Computation of Quantile Summaries" by + * Greenwald, Michael and Khanna, Sanjeev. (https://doi.org/10.1145/375663.375670) + */ +public class QuantileSummary { Review Comment: Should we mark it as `@Internal`, or do we expect end users to use this class?
########## flink-ml-core/src/main/java/org/apache/flink/ml/util/QuantileSummary.java: ########## @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.util; + +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Helper class to compute an approximate quantile summary. This implementation is based on the + * algorithm proposed in the paper: "Space-efficient Online Computation of Quantile Summaries" by + * Greenwald, Michael and Khanna, Sanjeev. 
(https://doi.org/10.1145/375663.375670) + */ +public class QuantileSummary { + + private static final int BUFFER_SIZE = 10000; + private static final int COMPRESS_THRESHOLD = 10000; + + private double relativeError; + private long count; + private List<StatsTuple> sampled; + private List<Double> headBuffer = new ArrayList<>(BUFFER_SIZE); + + /** + * * QuantileSummaries Constructor. + * + * @param relativeError The target relative error. + */ + public QuantileSummary(double relativeError) { + this(relativeError, Collections.EMPTY_LIST, 0); + } + + /** + * QuantileSummaries Constructor. + * + * @param relativeError The target relative error. + * @param sampled A buffer of quantile statistics. See the G-K article for more details. + * @param count The count of all the elements inserted in the sampled buffer. + */ + public QuantileSummary(double relativeError, List<StatsTuple> sampled, long count) { + if (relativeError <= 0 || relativeError >= 1) { + throw new RuntimeException("An appropriate relative error must lay between 0 and 1."); + } + this.relativeError = relativeError; + this.sampled = sampled; + this.count = count; + } + + /** + * Insert a new observation to the summary. + * + * @param item The new observation to insert into the summary. + * @return A summary with the given observation inserted into the summary. + */ + public QuantileSummary insert(Double item) { + headBuffer.add(item); + if (headBuffer.size() >= BUFFER_SIZE) { + QuantileSummary result = insertHeadBuffer(); + if (result.sampled.size() >= COMPRESS_THRESHOLD) { + return result.compress(); + } else { + return result; + } + } else { + return this; + } + } + + /** + * This implements the COMPRESS function of the GK algorithm. + * + * @return A new summary that compresses the summary statistics and the head buffer. 
+ */ + public QuantileSummary compress() { + QuantileSummary inserted = insertHeadBuffer(); + Preconditions.checkState(inserted.headBuffer.isEmpty()); + Preconditions.checkState(inserted.count == count + headBuffer.size()); + + List<StatsTuple> compressed = + compressInternal(inserted.sampled, 2 * relativeError * inserted.count); + return new QuantileSummary(relativeError, compressed, inserted.count); + } + + /** + * * Merges two compressed summaries together. + * + * @param other The summary to be merged. + * @return The merged summary. + */ + public QuantileSummary merge(QuantileSummary other) { + checkState(headBuffer.isEmpty(), "Current buffer needs to be compressed before merge"); + checkState(other.headBuffer.isEmpty(), "Other buffer needs to be compressed before merge"); + + if (other.count == 0) { + return shallowCopy(); + } else if (count == 0) { + return other.shallowCopy(); + } else { + List<StatsTuple> mergedSampled = new ArrayList<>(); + double mergedRelativeError = Math.max(relativeError, other.relativeError); + long mergedCount = count + other.count; + long additionalSelfDelta = + new Double(Math.floor(2 * other.relativeError * other.count)).longValue(); + long additionalOtherDelta = + new Double(Math.floor(2 * relativeError * count)).longValue(); + + int selfIdx = 0; + int otherIdx = 0; + while (selfIdx < sampled.size() && otherIdx < other.sampled.size()) { + StatsTuple selfSample = sampled.get(selfIdx); + StatsTuple otherSample = other.sampled.get(otherIdx); + StatsTuple nextSample; + long additionalDelta = 0; + if (selfSample.value < otherSample.value) { + nextSample = selfSample; + if (otherIdx > 0) { + additionalDelta = additionalSelfDelta; + } + selfIdx++; + } else { + nextSample = otherSample; + if (selfIdx > 0) { + additionalDelta = additionalOtherDelta; + } + otherIdx++; + } + nextSample = nextSample.shallowCopy(); + nextSample.delta = nextSample.delta + additionalDelta; + mergedSampled.add(nextSample); + } + IntStream.range(selfIdx, 
sampled.size()) + .forEach(i -> mergedSampled.add(sampled.get(i))); + IntStream.range(otherIdx, other.sampled.size()) + .forEach(i -> mergedSampled.add(other.sampled.get(i))); + + List<StatsTuple> comp = + compressInternal(mergedSampled, 2 * mergedRelativeError * mergedCount); + return new QuantileSummary(mergedRelativeError, comp, mergedCount); + } + } + + /** + * Runs a query for a given percentile. The query can only be run on a compressed summary, you + * need to call compress() before using it. + * + * @param percentile The target percentile. + * @return The corresponding approximate quantile. + */ + public double query(double percentile) { + double[] percentiles = {percentile}; + return query(percentiles)[0]; + } + + /** + * * Runs a query for a given sequence of percentiles. The query can only be run on a compressed + * summary, you need to call compress() before using it. + * + * @param percentiles A list of the target percentiles. + * @return A list of the corresponding approximate quantiles, in the same order as the input. 
+ */ + public double[] query(double[] percentiles) { + Arrays.stream(percentiles) + .forEach( + x -> + checkState( + x >= 0 && x <= 1.0, + "percentile should be in the range [0.0, 1.0].")); + checkState( + headBuffer.isEmpty(), + "Cannot operate on an uncompressed summary, call compress() first"); + if (sampled == null || sampled.isEmpty()) { + return null; + } + double targetError = Long.MIN_VALUE; + for (StatsTuple tuple : sampled) { + targetError = Math.max(targetError, (tuple.delta + tuple.g)); + } + targetError = targetError / 2; + Map<Double, Integer> zipWithIndex = new HashMap<>(percentiles.length); + IntStream.range(0, percentiles.length).forEach(i -> zipWithIndex.put(percentiles[i], i)); + + int index = 0; + long minRank = sampled.get(0).g; + double[] sorted = Arrays.stream(percentiles).sorted().toArray(); + double[] result = new double[percentiles.length]; + + for (int i = 0; i < sorted.length; i++) { + int percentileIndex = zipWithIndex.get(sorted[i]); + if (sorted[i] <= relativeError) { + result[percentileIndex] = sampled.get(0).value; + } else if (sorted[i] >= 1 - relativeError) { + result[percentileIndex] = sampled.get(sampled.size() - 1).value; + } else { + QueryResult queryResult = + findApproximateQuantile(index, minRank, targetError, sorted[i]); + index = queryResult.index; + minRank = queryResult.minRankAtIndex; + result[percentileIndex] = queryResult.percentile; + } + } + return result; + } + + private QuantileSummary insertHeadBuffer() { + if (headBuffer.isEmpty()) { + return this; + } + + long newCount = count; + List<StatsTuple> newSamples = new ArrayList<>(); + List<Double> sorted = headBuffer.stream().sorted().collect(Collectors.toList()); + + int cursor = 0; + for (int i = 0; i < sorted.size(); i++) { + while (cursor < sampled.size() && sampled.get(cursor).value <= sorted.get(i)) { + newSamples.add(sampled.get(cursor)); + cursor++; + } + + long delta = new Double(Math.floor(2.0 * relativeError * count)).longValue(); + if 
(newSamples.isEmpty() || (cursor == sampled.size() && i == sorted.size() - 1)) { + delta = 0; + } + StatsTuple tuple = new StatsTuple(sorted.get(i), 1L, delta); + newSamples.add(tuple); + newCount++; + } + + for (int i = cursor; i < sampled.size(); i++) { + newSamples.add(sampled.get(i)); + } + return new QuantileSummary(relativeError, newSamples, newCount); + } + + private List<StatsTuple> compressInternal( + List<StatsTuple> currentSamples, double mergeThreshold) { + if (currentSamples.isEmpty()) { + return Collections.emptyList(); + } + LinkedList<StatsTuple> result = new LinkedList<>(); + + StatsTuple head = currentSamples.get(currentSamples.size() - 1); + for (int i = currentSamples.size() - 2; i >= 1; i--) { + StatsTuple tuple = currentSamples.get(i); + if (tuple.g + head.g + head.delta < mergeThreshold) { + head = head.shallowCopy(); + head.g = head.g + tuple.g; + } else { + result.addFirst(head); + head = tuple; + } + } + result.addFirst(head); + + StatsTuple currHead = currentSamples.get(0); + if (currHead.value <= head.value && currentSamples.size() > 1) { + result.addFirst(currHead); + } + return new ArrayList<>(result); + } + + private QueryResult findApproximateQuantile( + int index, long minRankAtIndex, double targetError, double percentile) { + StatsTuple curSample = sampled.get(index); + long rank = new Double(Math.ceil(percentile * count)).longValue(); + long minRank = minRankAtIndex; + + for (int i = index; i < sampled.size() - 1; ) { + long maxRank = minRank + curSample.delta; + if (maxRank - targetError < rank && rank <= minRank + targetError) { + return new QueryResult(i, minRank, curSample.value); + } else { + curSample = sampled.get(++i); + minRank += curSample.g; + } + } + return new QueryResult(sampled.size() - 1, 0, sampled.get(sampled.size() - 1).value); + } + + public double getRelativeError() { + return relativeError; + } + + public void setRelativeError(double relativeError) { + this.relativeError = relativeError; + } + + private 
QuantileSummary shallowCopy() { + return new QuantileSummary(relativeError, sampled, count); + } + + /** Wrapper class to hold all information returned after querying. */ + private class QueryResult { Review Comment: Could this be a static inner class? Same for `StatsTuple`. ########## flink-ml-core/src/main/java/org/apache/flink/ml/util/QuantileSummary.java: ########## @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.util; + +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Helper class to compute an approximate quantile summary. This implementation is based on the + * algorithm proposed in the paper: "Space-efficient Online Computation of Quantile Summaries" by + * Greenwald, Michael and Khanna, Sanjeev. 
(https://doi.org/10.1145/375663.375670) + */ +public class QuantileSummary { + + private static final int BUFFER_SIZE = 10000; + private static final int COMPRESS_THRESHOLD = 10000; + + private double relativeError; + private long count; + private List<StatsTuple> sampled; + private List<Double> headBuffer = new ArrayList<>(BUFFER_SIZE); + + /** + * * QuantileSummaries Constructor. + * + * @param relativeError The target relative error. + */ + public QuantileSummary(double relativeError) { + this(relativeError, Collections.EMPTY_LIST, 0); + } + + /** + * QuantileSummaries Constructor. + * + * @param relativeError The target relative error. + * @param sampled A buffer of quantile statistics. See the G-K article for more details. + * @param count The count of all the elements inserted in the sampled buffer. + */ + public QuantileSummary(double relativeError, List<StatsTuple> sampled, long count) { Review Comment: It seems that this constructor is not necessary for LSH or RobustScaler. Can we make it private for now? ########## flink-ml-core/src/main/java/org/apache/flink/ml/util/QuantileSummary.java: ########## @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.ml.util; + +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Helper class to compute an approximate quantile summary. This implementation is based on the + * algorithm proposed in the paper: "Space-efficient Online Computation of Quantile Summaries" by + * Greenwald, Michael and Khanna, Sanjeev. (https://doi.org/10.1145/375663.375670) + */ +public class QuantileSummary { + + private static final int BUFFER_SIZE = 10000; + private static final int COMPRESS_THRESHOLD = 10000; + + private double relativeError; + private long count; + private List<StatsTuple> sampled; + private List<Double> headBuffer = new ArrayList<>(BUFFER_SIZE); + + /** + * * QuantileSummaries Constructor. + * + * @param relativeError The target relative error. + */ + public QuantileSummary(double relativeError) { Review Comment: Judging from LSH and RobustScaler, the constructor needs to take both `relativeError` and `compressThreshold` as input parameters. ########## flink-ml-core/src/main/java/org/apache/flink/ml/util/QuantileSummary.java: ########## @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.util; + +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Helper class to compute an approximate quantile summary. This implementation is based on the + * algorithm proposed in the paper: "Space-efficient Online Computation of Quantile Summaries" by + * Greenwald, Michael and Khanna, Sanjeev. (https://doi.org/10.1145/375663.375670) + */ +public class QuantileSummary { + + private static final int BUFFER_SIZE = 10000; + private static final int COMPRESS_THRESHOLD = 10000; + + private double relativeError; + private long count; + private List<StatsTuple> sampled; + private List<Double> headBuffer = new ArrayList<>(BUFFER_SIZE); + + /** + * * QuantileSummaries Constructor. Review Comment: nit: The comment is inconsistent with the class name (`QuantileSummaries` vs. `QuantileSummary`). ########## flink-ml-core/src/main/java/org/apache/flink/ml/util/QuantileSummary.java: ########## @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.util; + +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Helper class to compute an approximate quantile summary. This implementation is based on the + * algorithm proposed in the paper: "Space-efficient Online Computation of Quantile Summaries" by + * Greenwald, Michael and Khanna, Sanjeev. (https://doi.org/10.1145/375663.375670) + */ +public class QuantileSummary { + + private static final int BUFFER_SIZE = 10000; + private static final int COMPRESS_THRESHOLD = 10000; + + private double relativeError; + private long count; + private List<StatsTuple> sampled; + private List<Double> headBuffer = new ArrayList<>(BUFFER_SIZE); + + /** + * * QuantileSummaries Constructor. + * + * @param relativeError The target relative error. + */ + public QuantileSummary(double relativeError) { + this(relativeError, Collections.EMPTY_LIST, 0); + } + + /** + * QuantileSummaries Constructor. + * + * @param relativeError The target relative error. + * @param sampled A buffer of quantile statistics. 
See the G-K article for more details. + * @param count The count of all the elements inserted in the sampled buffer. + */ + public QuantileSummary(double relativeError, List<StatsTuple> sampled, long count) { + if (relativeError <= 0 || relativeError >= 1) { + throw new RuntimeException("An appropriate relative error must lay between 0 and 1."); + } + this.relativeError = relativeError; + this.sampled = sampled; + this.count = count; + } + + /** + * Insert a new observation to the summary. + * + * @param item The new observation to insert into the summary. + * @return A summary with the given observation inserted into the summary. + */ + public QuantileSummary insert(Double item) { + headBuffer.add(item); + if (headBuffer.size() >= BUFFER_SIZE) { + QuantileSummary result = insertHeadBuffer(); + if (result.sampled.size() >= COMPRESS_THRESHOLD) { + return result.compress(); + } else { + return result; + } + } else { + return this; + } + } + + /** + * This implements the COMPRESS function of the GK algorithm. + * + * @return A new summary that compresses the summary statistics and the head buffer. + */ + public QuantileSummary compress() { + QuantileSummary inserted = insertHeadBuffer(); + Preconditions.checkState(inserted.headBuffer.isEmpty()); + Preconditions.checkState(inserted.count == count + headBuffer.size()); + + List<StatsTuple> compressed = + compressInternal(inserted.sampled, 2 * relativeError * inserted.count); + return new QuantileSummary(relativeError, compressed, inserted.count); + } + + /** + * * Merges two compressed summaries together. + * + * @param other The summary to be merged. + * @return The merged summary. 
+ */ + public QuantileSummary merge(QuantileSummary other) { + checkState(headBuffer.isEmpty(), "Current buffer needs to be compressed before merge"); + checkState(other.headBuffer.isEmpty(), "Other buffer needs to be compressed before merge"); + + if (other.count == 0) { + return shallowCopy(); + } else if (count == 0) { + return other.shallowCopy(); + } else { + List<StatsTuple> mergedSampled = new ArrayList<>(); + double mergedRelativeError = Math.max(relativeError, other.relativeError); + long mergedCount = count + other.count; + long additionalSelfDelta = + new Double(Math.floor(2 * other.relativeError * other.count)).longValue(); + long additionalOtherDelta = + new Double(Math.floor(2 * relativeError * count)).longValue(); + + int selfIdx = 0; + int otherIdx = 0; + while (selfIdx < sampled.size() && otherIdx < other.sampled.size()) { + StatsTuple selfSample = sampled.get(selfIdx); + StatsTuple otherSample = other.sampled.get(otherIdx); + StatsTuple nextSample; + long additionalDelta = 0; + if (selfSample.value < otherSample.value) { + nextSample = selfSample; + if (otherIdx > 0) { + additionalDelta = additionalSelfDelta; + } + selfIdx++; + } else { + nextSample = otherSample; + if (selfIdx > 0) { + additionalDelta = additionalOtherDelta; + } + otherIdx++; + } + nextSample = nextSample.shallowCopy(); + nextSample.delta = nextSample.delta + additionalDelta; + mergedSampled.add(nextSample); + } + IntStream.range(selfIdx, sampled.size()) + .forEach(i -> mergedSampled.add(sampled.get(i))); + IntStream.range(otherIdx, other.sampled.size()) + .forEach(i -> mergedSampled.add(other.sampled.get(i))); + + List<StatsTuple> comp = + compressInternal(mergedSampled, 2 * mergedRelativeError * mergedCount); + return new QuantileSummary(mergedRelativeError, comp, mergedCount); + } + } + + /** + * Runs a query for a given percentile. The query can only be run on a compressed summary, you + * need to call compress() before using it. 
+ * + * @param percentile The target percentile. + * @return The corresponding approximate quantile. + */ + public double query(double percentile) { + double[] percentiles = {percentile}; + return query(percentiles)[0]; Review Comment: If `sampled` is null, this may throw an NPE here. ########## flink-ml-core/src/main/java/org/apache/flink/ml/util/QuantileSummary.java: ########## @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.util; + +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Helper class to compute an approximate quantile summary. This implementation is based on the + * algorithm proposed in the paper: "Space-efficient Online Computation of Quantile Summaries" by + * Greenwald, Michael and Khanna, Sanjeev.
(https://doi.org/10.1145/375663.375670) + */ +public class QuantileSummary { + + private static final int BUFFER_SIZE = 10000; + private static final int COMPRESS_THRESHOLD = 10000; + + private double relativeError; + private long count; + private List<StatsTuple> sampled; + private List<Double> headBuffer = new ArrayList<>(BUFFER_SIZE); + + /** + * * QuantileSummaries Constructor. + * + * @param relativeError The target relative error. + */ + public QuantileSummary(double relativeError) { + this(relativeError, Collections.EMPTY_LIST, 0); + } + + /** + * QuantileSummaries Constructor. + * + * @param relativeError The target relative error. + * @param sampled A buffer of quantile statistics. See the G-K article for more details. + * @param count The count of all the elements inserted in the sampled buffer. + */ + public QuantileSummary(double relativeError, List<StatsTuple> sampled, long count) { + if (relativeError <= 0 || relativeError >= 1) { + throw new RuntimeException("An appropriate relative error must lay between 0 and 1."); + } + this.relativeError = relativeError; + this.sampled = sampled; + this.count = count; + } + + /** + * Insert a new observation to the summary. + * + * @param item The new observation to insert into the summary. + * @return A summary with the given observation inserted into the summary. + */ + public QuantileSummary insert(Double item) { + headBuffer.add(item); + if (headBuffer.size() >= BUFFER_SIZE) { + QuantileSummary result = insertHeadBuffer(); + if (result.sampled.size() >= COMPRESS_THRESHOLD) { + return result.compress(); + } else { + return result; + } + } else { + return this; + } + } + + /** + * This implements the COMPRESS function of the GK algorithm. + * + * @return A new summary that compresses the summary statistics and the head buffer. 
+ */ + public QuantileSummary compress() { + QuantileSummary inserted = insertHeadBuffer(); + Preconditions.checkState(inserted.headBuffer.isEmpty()); + Preconditions.checkState(inserted.count == count + headBuffer.size()); Review Comment: Is `Preconditions.checkState(inserted.count == count)` enough since headBuffer is empty? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
