AlexanderSaydakov commented on code in PR #511: URL: https://github.com/apache/datasketches-java/pull/511#discussion_r1513596392
########## src/main/java/org/apache/datasketches/tdigest/TDigestDouble.java: ########## @@ -0,0 +1,636 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.datasketches.tdigest; + +import java.nio.ByteOrder; +import java.util.function.Function; + +import org.apache.datasketches.common.Family; +import org.apache.datasketches.common.SketchesArgumentException; +import org.apache.datasketches.common.SketchesStateException; +import org.apache.datasketches.memory.Buffer; +import org.apache.datasketches.memory.Memory; +import org.apache.datasketches.memory.WritableBuffer; +import org.apache.datasketches.memory.WritableMemory; +import org.apache.datasketches.quantilescommon.QuantilesAPI; + +/** + * t-Digest for estimating quantiles and ranks. + * This implementation is based on the following paper: + * Ted Dunning, Otmar Ertl. Extremely Accurate Quantiles Using t-Digests + * and the following implementation: + * https://github.com/tdunning/t-digest + * This implementation is similar to MergingDigest in the above implementation + */ +public final class TDigestDouble { + + public static final boolean USE_ALTERNATING_SORT = true; + public static final boolean USE_TWO_LEVEL_COMPRESSION = true; + public static final boolean USE_WEIGHT_LIMIT = true; + + public static final short DEFAULT_K = 200; + + private boolean reverseMerge_; + private final short k_; + private final short internalK_; + private double minValue_; + private double maxValue_; + private int centroidsCapacity_; + private int numCentroids_; + private double[] centroidMeans_; + private long[] centroidWeights_; + private long centroidsWeight_; + private int bufferCapacity_; + private int numBuffered_; + private double[] bufferValues_; + private long[] bufferWeights_; + private long bufferedWeight_; + + private static final byte PREAMBLE_LONGS_EMPTY_OR_SINGLE = 1; + private static final byte PREAMBLE_LONGS_MULTIPLE = 2; + private static final byte SERIAL_VERSION = 1; + + private static final int COMPAT_DOUBLE = 1; + private static final int COMPAT_FLOAT = 2; + + enum flags { IS_EMPTY, IS_SINGLE_VALUE, REVERSE_MERGE }; + + /** + * Constructor with the default K + */ + public TDigestDouble() { + this(DEFAULT_K); + } + + /** + * Constructor + * @param k affects the size of TDigest and its estimation error + */ + public TDigestDouble(final short k) { + this(false, k, Double.POSITIVE_INFINITY, Double.NEGATIVE_INFINITY, null, null, 0); + } + + /** + * @return parameter k (compression) that was used to configure this TDigest + */ + public short getK() { + return k_; + } + + /** + * Update this TDigest with the given value + * @param value to update the TDigest with + */ + public void update(final double value) { + if (Double.isNaN(value)) return; + if (numBuffered_ == bufferCapacity_ - numCentroids_) mergeBuffered(); + bufferValues_[numBuffered_] = value; + bufferWeights_[numBuffered_] = 1; + numBuffered_++; + bufferedWeight_++; + minValue_ = Math.min(minValue_, value); + maxValue_ = Math.max(maxValue_, value); + } + + /** + * Merge the given TDigest into this one + * @param other TDigest to merge + */ + public void merge(final TDigestDouble other) { + if (other.isEmpty()) return; + final int num = numCentroids_ + numBuffered_ + other.numCentroids_ + other.numBuffered_; + if (num <= bufferCapacity_) { + System.arraycopy(other.bufferValues_, 0, bufferValues_, numBuffered_, other.numBuffered_); + System.arraycopy(other.bufferWeights_, 0, bufferWeights_, numBuffered_, other.numBuffered_); + numBuffered_ += other.numBuffered_; + System.arraycopy(other.centroidMeans_, 0, bufferValues_, numBuffered_, other.numCentroids_); + System.arraycopy(other.centroidWeights_, 0, bufferWeights_, numBuffered_, other.numCentroids_); + numBuffered_ += other.numCentroids_; + bufferedWeight_ += other.getTotalWeight(); + minValue_ = Math.min(minValue_, other.minValue_); + maxValue_ = Math.max(maxValue_, other.maxValue_); + } else { + final double[] values = new double[num]; + final long[] weights = new long[num]; + System.arraycopy(bufferValues_, 0, values, 0, numBuffered_); + System.arraycopy(bufferWeights_, 0, weights, 0, numBuffered_); + System.arraycopy(other.bufferValues_, 0, values, numBuffered_, other.numBuffered_); + System.arraycopy(other.bufferWeights_, 0, weights, numBuffered_, other.numBuffered_); + numBuffered_ += other.numBuffered_; + System.arraycopy(other.centroidMeans_, 0, values, numBuffered_, other.numCentroids_); + System.arraycopy(other.centroidWeights_, 0, weights, numBuffered_, other.numCentroids_); + numBuffered_ += other.numCentroids_; + merge(values, weights, bufferedWeight_ + other.getTotalWeight(), numBuffered_); + } + } + + /** + * Process buffered values and merge centroids if needed + */ + public void compress() { + mergeBuffered(); + } + + /** + * @return true if TDigest has not seen any data + */ + public boolean isEmpty() { + return numCentroids_ == 0 && numBuffered_ == 0; + } + + /** + * @return minimum value seen by TDigest + */ + public double getMinValue() { + if (isEmpty()) { throw new SketchesStateException(QuantilesAPI.EMPTY_MSG); } + return minValue_; + } + + /** + * @return maximum value seen by TDigest + */ + public double getMaxValue() { + if (isEmpty()) { throw new SketchesStateException(QuantilesAPI.EMPTY_MSG); } + return maxValue_; + } + + /** + * @return total weight + */ + public long getTotalWeight() { + return centroidsWeight_ + bufferedWeight_; + } + + /** + * Compute approximate normalized rank of the given value. + * @param value to be ranked + * @return normalized rank (from 0 to 1 inclusive) + */ + public double getRank(final double value) { + if (isEmpty()) throw new SketchesStateException(QuantilesAPI.EMPTY_MSG); + if (Double.isNaN(value)) throw new SketchesArgumentException("Operation is undefined for Nan"); + if (value < minValue_) return 0; + if (value > maxValue_) return 1; + if (numCentroids_ + numBuffered_ == 1) return 0.5; + + mergeBuffered(); // side effect + + // left tail + final double firstMean = centroidMeans_[0]; + if (value < firstMean) { + if (firstMean - minValue_ > 0) { + if (value == minValue_) return 0.5 / centroidsWeight_; + return (1.0 + (value - minValue_) / (firstMean - minValue_) * (centroidWeights_[0] / 2.0 - 1.0)); + } + return 0; // should never happen + } + + // right tail + final double lastMean = centroidMeans_[numCentroids_ - 1]; + if (value > lastMean) { + if (maxValue_ - lastMean > 0) { + if (value == maxValue_) return 1.0 - 0.5 / centroidsWeight_; + return 1.0 - ((1.0 + (maxValue_ - value) / (maxValue_ - lastMean) * (centroidWeights_[numCentroids_ - 1] / 2.0 - 1.0)) / centroidsWeight_); + } + return 1; // should never happen + } + + int lower = BinarySearch.lowerBound(centroidMeans_, 0, numCentroids_, value); + if (lower == numCentroids_) throw new SketchesStateException("lower == end in getRank()"); + int upper = BinarySearch.upperBound(centroidMeans_, lower, numCentroids_, value); + if (upper == 0) throw new SketchesStateException("upper == begin in getRank()"); + if (value < centroidMeans_[lower]) lower--; + if (upper == numCentroids_ || !(centroidMeans_[upper - 1] < value)) upper--; + + double weightBelow = 0; + int i = 0; + while (i != lower) weightBelow += centroidWeights_[i++]; + weightBelow += centroidWeights_[lower] / 2.0; + + double weightDelta = 0; + while (i != upper) weightDelta += centroidWeights_[i++]; + weightDelta -= centroidWeights_[lower] / 2.0; + weightDelta += centroidWeights_[upper] / 2.0; + if (centroidMeans_[upper] - centroidMeans_[lower] > 0) { + return (weightBelow + weightDelta * (value - centroidMeans_[lower]) / (centroidMeans_[upper] - centroidMeans_[lower])) / centroidsWeight_; + } + return (weightBelow + weightDelta / 2.0) / centroidsWeight_; + } + + /** + * Compute approximate quantile value corresponding to the given normalized rank + * @param rank normalized rank (from 0 to 1 inclusive) + * @return quantile value corresponding to the given rank + */ + public double getQuantile(final double rank) { + if (isEmpty()) throw new SketchesStateException(QuantilesAPI.EMPTY_MSG); + if (Double.isNaN(rank)) throw new SketchesArgumentException("Operation is undefined for Nan"); + if (rank < 0 || rank > 1) throw new SketchesArgumentException("Normalized rank must be within [0, 1]"); + + mergeBuffered(); // side effect + + if (numCentroids_ == 1) return centroidMeans_[0]; + + // at least 2 centroids + final double weight = rank * centroidsWeight_; + if (weight < 1) return minValue_; + if (weight > centroidsWeight_ - 1.0) return maxValue_; + final double firstWeight = centroidWeights_[0]; + if (firstWeight > 1 && weight < firstWeight / 2.0) { + return minValue_ + (weight - 1.0) / (firstWeight / 2.0 - 1.0) * (centroidMeans_[0] - minValue_); + } + final double lastWeight = centroidWeights_[numCentroids_ - 1]; + if (lastWeight > 1 && centroidsWeight_ - weight <= lastWeight / 2.0) { + return maxValue_ + (centroidsWeight_ - weight - 1.0) / (lastWeight / 2.0 - 1.0) * (maxValue_ - centroidMeans_[numCentroids_ - 1]); + } + + // interpolate between extremes + double weightSoFar = firstWeight / 2.0; + for (int i = 0; i < numCentroids_ - 1; i++) { + final double dw = (centroidWeights_[i] + centroidWeights_[i + 1]) / 2.0; + if (weightSoFar + dw > weight) { + // the target weight is between centroids i and i+1 + double leftWeight = 0; + if (centroidWeights_[i] == 1) { + if (weight - weightSoFar < 0.5) return centroidMeans_[i]; + leftWeight = 0.5; + } + double rightWeight = 0; + if (centroidWeights_[i + 1] == 1) { + if (weightSoFar + dw - weight <= 0.5) return centroidMeans_[i + 1]; + rightWeight = 0.5; + } + final double w1 = weight - weightSoFar - leftWeight; + final double w2 = weightSoFar + dw - weight - rightWeight; + return weightedAverage(centroidMeans_[i], w1, centroidMeans_[i + 1], w2); + } + weightSoFar += dw; + } + final double w1 = weight - centroidsWeight_ - centroidWeights_[numCentroids_ - 1] / 2.0; + final double w2 = centroidWeights_[numCentroids_ - 1] / 2.0 - w1; + return weightedAverage(centroidWeights_[numCentroids_ - 1], w1, maxValue_, w2); + } + + /** + * Serialize this TDigest to a byte array form. + * @return byte array + */ + public byte[] toByteArray() { + mergeBuffered(); // side effect + final byte preambleLongs = isEmpty() || isSingleValue() ? PREAMBLE_LONGS_EMPTY_OR_SINGLE : PREAMBLE_LONGS_MULTIPLE; + int sizeBytes = preambleLongs * Long.BYTES + + (isEmpty() ? 0 : (isSingleValue() ? Double.BYTES : 2 * Double.BYTES + (Double.BYTES + Long.BYTES) * numCentroids_)); + final byte[] bytes = new byte[sizeBytes]; + final WritableBuffer wbuf = WritableMemory.writableWrap(bytes).asWritableBuffer(); + wbuf.putByte(preambleLongs); + wbuf.putByte(SERIAL_VERSION); + wbuf.putByte((byte) Family.TDIGEST.getID()); + wbuf.putShort(k_); + wbuf.putByte((byte) ( + (isEmpty() ? 1 << flags.IS_EMPTY.ordinal() : 0) | Review Comment: Not sure. We rely on implicit values all over the place. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
