This is an automated email from the ASF dual-hosted git repository. alsay pushed a commit to branch tdigest in repository https://gitbox.apache.org/repos/asf/datasketches-java.git
commit 64337f645d461078b67c582b0ef58a5e44e7a90e Author: AlexanderSaydakov <[email protected]> AuthorDate: Mon Feb 12 16:03:12 2024 -0800 incomplete implementation --- .../java/org/apache/datasketches/tdigest/Sort.java | 153 +++++++++++ .../org/apache/datasketches/tdigest/TDigest.java | 287 +++++++++++++++++++++ .../apache/datasketches/tdigest/package-info.java | 23 ++ .../org/apache/datasketches/tdigest/SortTest.java | 61 +++++ .../apache/datasketches/tdigest/TDigestTest.java | 66 +++++ 5 files changed, 590 insertions(+) diff --git a/src/main/java/org/apache/datasketches/tdigest/Sort.java b/src/main/java/org/apache/datasketches/tdigest/Sort.java new file mode 100644 index 00000000..60146dc1 --- /dev/null +++ b/src/main/java/org/apache/datasketches/tdigest/Sort.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.datasketches.tdigest; + +import java.util.Random; + +public final class Sort { + private static final Random prng = new Random(); + + public static void stableSort(final double[] keys, final long[] values, final int n) { + stableLimitedQuickSort(keys, values, 0, n, 64); + stableLimitedInsertionSort(keys, values, 0, n, 64); + } + + private static void stableLimitedQuickSort(final double[] keys, final long[] values, int start, int end, final int limit) { + // the while loop implements tail-recursion to avoid excessive stack calls on nasty cases + while (end - start > limit) { + + final int pivotIndex = start + prng.nextInt(end - start); + double pivotValue = keys[pivotIndex]; + + // move pivot to beginning of array + swap(keys, start, pivotIndex); + swap(values, start, pivotIndex); + + // use a three way partition because many duplicate values is an important case + int low = start + 1; // low points to first value not known to be equal to pivotValue + int high = end; // high points to first value > pivotValue + int i = low; // i scans the array + while (i < high) { + // invariant: values[k] == pivotValue for k in [0..low) + // invariant: values[k] < pivotValue for k in [low..i) + // invariant: values[k] > pivotValue for k in [high..end) + // in-loop: i < high + // in-loop: low < high + // in-loop: i >= low + final double vi = keys[i]; + if (vi == pivotValue && i == pivotIndex) { + if (low != i) { + swap(keys, low, i); + swap(values, low, i); + } else { + i++; + } + low++; + } else if (vi > pivotValue || (vi == pivotValue && i > pivotIndex)) { + high--; + swap(keys, i, high); + swap(values, i, high); + } else { + i++; + } + } + // assert i == high || low == high therefore, we are done with partition + // at this point, i==high, from [start,low) are == pivot, [low,high) are < and [high,end) are > + // we have to move the values equal to the pivot into the middle. To do this, we swap pivot + // values into the top end of the [low,high) range stopping when we run out of destinations + // or when we run out of values to copy + int from = start; + int to = high - 1; + for (i = 0; from < low && to >= low; i++) { + swap(keys, from, to); + swap(values, from++, to--); + } + if (from == low) { + // ran out of things to copy. This means that the last destination is the boundary + low = to + 1; + } else { + // ran out of places to copy to. This means that there are uncopied pivots and the + // boundary is at the beginning of those + low = from; + } + + // now recurse, but arrange it to handle the longer limit by tail recursion + // we have to sort the pivot values because they may have different weights + // we can't do that, however until we know how much weight is in the left and right + if (low - start < end - high) { + // left side is smaller + stableLimitedQuickSort(keys, values, start, low, limit); + // this is really a way to do + // quickSort(keys, values, high, end, limit); + start = high; + } else { + stableLimitedQuickSort(keys, values, high, end, limit); + // this is really a way to do + // quickSort(keys, values, start, low, limit); + end = low; + } + } + } + + private static void stableLimitedInsertionSort(final double[] keys, final long[] values, int from, int to, final int limit) { + for (int i = from + 1; i < to; i++) { + final double k = keys[i]; + final long v = values[i]; + final int m = Math.max(i - limit, from); + // values in [from, i) are ordered + // scan backwards to find where to stick the current key + for (int j = i; j >= m; j--) { + if (j == 0 || keys[j - 1] < k || (keys[j - 1] == k && (j - 1 <= i))) { + if (j < i) { + System.arraycopy(keys, j, keys, j + 1, i - j); + System.arraycopy(values, j, values, j + 1, i - j); + keys[j] = k; + values[j] = v; + } + break; + } + } + } + } + + private static void swap(final double[] values, final int i, final int j) { + final double tmpValue = values[i]; + values[i] = values[j]; + values[j] = tmpValue; + } + + private static void swap(final long[] values, final int i, final int j) { + final long tmpValue = values[i]; + values[i] = values[j]; + values[j] = tmpValue; + } + + public static void reverse(final double[] values, final int n) { + for (int i = 0; i < n / 2; i++) { + swap(values, i, n - i - 1); + } + } + + public static void reverse(final long[] values, final int n) { + for (int i = 0; i < n / 2; i++) { + swap(values, i, n - i - 1); + } + } +} diff --git a/src/main/java/org/apache/datasketches/tdigest/TDigest.java b/src/main/java/org/apache/datasketches/tdigest/TDigest.java new file mode 100644 index 00000000..95fb6ee2 --- /dev/null +++ b/src/main/java/org/apache/datasketches/tdigest/TDigest.java @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.datasketches.tdigest; + +import java.util.function.Function; + +import org.apache.datasketches.quantilescommon.QuantilesAPI; + +/** + * t-Digest for estimating quantiles and ranks. + * This implementation is based on the following paper: + * Ted Dunning, Otmar Ertl. Extremely Accurate Quantiles Using t-Digests + * and the following implementation: + * https://github.com/tdunning/t-digest + * This implementation is similar to MergingDigest in the above implementation + */ +public final class TDigest { + + public static final boolean USE_ALTERNATING_SORT = true; + public static final boolean USE_TWO_LEVEL_COMPRESSION = true; + public static final boolean USE_WEIGHT_LIMIT = true; + + private boolean reverseMerge_; + private final int k_; + private final int internalK_; + private double minValue_; + private double maxValue_; + private int centroidsCapacity_; + private int numCentroids_; + private double[] centroidMeans_; + private long[] centroidWeights_; + private long totalWeight_; + private int bufferCapacity_; + private int numBuffered_; + private double[] bufferValues_; + private long[] bufferWeights_; + private long bufferedWeight_; + + public TDigest(final int k) { + this(false, k, Double.POSITIVE_INFINITY, Double.NEGATIVE_INFINITY, null, null, 0); + } + + private TDigest(final boolean reverseMerge, final int k, final double min, final double max, + final double[] means, final long[] weights, final long weight) { + reverseMerge_ = reverseMerge; + k_ = k; + minValue_ = min; + maxValue_ = max; + if (k < 10) throw new IllegalArgumentException("k must be at least 10"); + int fudge = 0; + if (USE_WEIGHT_LIMIT) { + fudge = 10; + if (k < 30) fudge += 20; + } + centroidsCapacity_ = k_ * 2 + fudge; + bufferCapacity_ = centroidsCapacity_ * 5; + double scale = Math.max(1.0, (double) bufferCapacity_ / centroidsCapacity_ - 1.0); + if (!USE_TWO_LEVEL_COMPRESSION) scale = 1; + internalK_ = (int) Math.ceil(Math.sqrt(scale) * k_); + centroidsCapacity_ = Math.max(centroidsCapacity_, internalK_ + fudge); + bufferCapacity_ = Math.max(bufferCapacity_, centroidsCapacity_ * 2); + centroidMeans_ = new double[centroidsCapacity_]; + centroidWeights_ = new long[centroidsCapacity_]; + bufferValues_ = new double[bufferCapacity_]; + bufferWeights_ = new long[bufferCapacity_]; + numCentroids_ = 0; + numBuffered_ = 0; + totalWeight_ = 0; + bufferedWeight_ = 0; + } + + public int getK() { + return k_; + } + + public void update(final double value) { + if (Double.isNaN(value)) return; + if (numBuffered_ == bufferCapacity_ - numCentroids_) mergeBuffered(); + bufferValues_[numBuffered_] = value; + bufferWeights_[numBuffered_] = 1; + numBuffered_++; + bufferedWeight_++; + minValue_ = Math.min(minValue_, value); + maxValue_ = Math.max(maxValue_, value); + } + + public void merge(final TDigest other) { + } + + public void compress() { + mergeBuffered(); + } + + public boolean isEmpty() { + return numCentroids_ == 0 && numBuffered_ == 0; + } + + public double getMinValue() { + if (isEmpty()) { throw new IllegalArgumentException(QuantilesAPI.EMPTY_MSG); } + return minValue_; + } + + public double getMaxValue() { + if (isEmpty()) { throw new IllegalArgumentException(QuantilesAPI.EMPTY_MSG); } + return maxValue_; + } + + public long getTotalWeight() { + return totalWeight_ + bufferedWeight_; + } + + public double getRank(final double quantile) { + if (isEmpty()) { throw new IllegalArgumentException(QuantilesAPI.EMPTY_MSG); } + return 0; + } + + public double getQuantile(final double rank) { + if (isEmpty()) { throw new IllegalArgumentException(QuantilesAPI.EMPTY_MSG); } + return 0; + } + + /** + * Serialize this TDigest to a byte array form. + * @return byte array + */ + public byte[] toByteArray() { + return new byte[0]; + } + + /** + * Returns summary information about this TDigest. Used for debugging. + * @return summary of the TDigest + */ + @Override + public String toString() { + return toString(false); + } + + public String toString(boolean printCentroids) { + String str = "MergingDigest\n" + + " Nominal Compression: " + k_ + "\n" + + " Internal Compression: " + internalK_ + "\n" + + " Centroids: " + numCentroids_ + "\n" + + " Buffered: " + numBuffered_ + "\n" + + " Centroids Capacity: " + centroidsCapacity_ + "\n" + + " Buffer Capacity: " + bufferCapacity_ + "\n" + + " Total Weight: " + totalWeight_ + "\n" + + " Unmerged Weight: " + bufferedWeight_ + "\n" + + " Reverse Merge: " + reverseMerge_ + "\n"; + if (!isEmpty()) { + str += " Min: " + minValue_ + "\n" + + " Max: " + maxValue_ + "\n"; + } + if (printCentroids) { + if (numCentroids_ > 0) { + str += "Centroids:\n"; + for (int i = 0; i < numCentroids_; i++) { + str += i + ": " + centroidMeans_[i] + ", " + centroidWeights_[i] + "\n"; + } + } + if (numBuffered_ > 0) { + str += "Buffer:\n"; + for (int i = 0; i < numBuffered_; i++) { + str += i + ": " + bufferValues_[i] + ", " + bufferWeights_[i] + "\n"; + } + } + } + return str; + } + + private void mergeBuffered() { + if (numBuffered_ == 0) return; + final boolean reverse = USE_ALTERNATING_SORT & reverseMerge_; + System.arraycopy(centroidMeans_, 0, bufferValues_, numBuffered_, numCentroids_); + System.arraycopy(centroidWeights_, 0, bufferWeights_, numBuffered_, numCentroids_); + numBuffered_ += numCentroids_; + totalWeight_ += bufferedWeight_; + numCentroids_ = 0; + Sort.stableSort(bufferValues_, bufferWeights_, numBuffered_); + if (reverse) { + Sort.reverse(bufferValues_, numBuffered_); + Sort.reverse(bufferWeights_, numBuffered_); + } + centroidMeans_[0] = bufferValues_[0]; + centroidWeights_[0] = bufferWeights_[0]; + numCentroids_++; + int current = 1; + double weightSoFar = 0; + final double normalizer = ScaleFunction.normalizer(internalK_, totalWeight_); + double k1 = ScaleFunction.k(0, normalizer); + double wLimit = totalWeight_ * ScaleFunction.q(k1 + 1, normalizer); + while (current != numBuffered_) { + final double proposedWeight = centroidWeights_[numCentroids_ - 1] + bufferWeights_[current]; + final boolean addThis; + if (current == 1 || current == numBuffered_ - 1) { + addThis = false; + } else if (USE_WEIGHT_LIMIT) { + final double q0 = weightSoFar / totalWeight_; + final double q2 = (weightSoFar + proposedWeight) / totalWeight_; + addThis = proposedWeight <= totalWeight_ * Math.min(ScaleFunction.max(q0, normalizer), ScaleFunction.max(q2, normalizer)); + } else { + addThis = weightSoFar + proposedWeight <= wLimit; + } + if (addThis) { // merge into existing centroid + centroidWeights_[numCentroids_ - 1] += bufferWeights_[current]; + centroidMeans_[numCentroids_ - 1] += (bufferValues_[current] - centroidMeans_[numCentroids_ - 1]) * bufferWeights_[current] / centroidWeights_[numCentroids_ - 1]; + } else { // copy to a new centroid + weightSoFar += centroidWeights_[numCentroids_ - 1]; + if (!USE_WEIGHT_LIMIT) { + k1 = ScaleFunction.k(weightSoFar / totalWeight_, normalizer); + wLimit = totalWeight_ * ScaleFunction.q(k1 + 1, normalizer); + } + centroidMeans_[numCentroids_] = bufferValues_[current]; + centroidWeights_[numCentroids_] = bufferWeights_[current]; + numCentroids_++; + } + current++; + } + if (reverse) { + Sort.reverse(centroidMeans_, numCentroids_); + Sort.reverse(centroidWeights_, numCentroids_); + } + reverseMerge_ = !reverseMerge_; + numBuffered_ = 0; + bufferedWeight_ = 0; + minValue_ = Math.min(minValue_, centroidMeans_[0]); + maxValue_ = Math.max(maxValue_, centroidMeans_[numCentroids_ - 1]); + } + + /** + * Generates cluster sizes proportional to q*(1-q). + * The use of a normalizing function results in a strictly bounded number of clusters no matter how many samples. + */ + static class ScaleFunction { + static double k(final double q, final double normalizer) { + return limit(new Function<Double, Double>() { + @Override + public Double apply(Double q) { + return Math.log(q / (1 - q)) * normalizer; + } + }, q, 1e-15, 1 - 1e-15); + } + + static double q(final double k, final double normalizer) { + final double w = Math.exp(k / normalizer); + return w / (1 + w); + } + + static double max(final double q, final double normalizer) { + return q * (1 - q) / normalizer; + } + + static double normalizer(final double compression, final double n) { + return compression / z(compression, n); + } + + static double z(final double compression, final double n) { + return 4 * Math.log(n / compression) + 24; + } + + static double limit(final Function<Double, Double> f, final double x, final double low, final double high) { + if (x < low) { + return f.apply(low); + } else if (x > high) { + return f.apply(high); + } + return f.apply(x); + } + } +} diff --git a/src/main/java/org/apache/datasketches/tdigest/package-info.java b/src/main/java/org/apache/datasketches/tdigest/package-info.java new file mode 100644 index 00000000..e0fa8e58 --- /dev/null +++ b/src/main/java/org/apache/datasketches/tdigest/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * t-Digest for estimating quantiles and ranks. + */ +package org.apache.datasketches.tdigest; diff --git a/src/test/java/org/apache/datasketches/tdigest/SortTest.java b/src/test/java/org/apache/datasketches/tdigest/SortTest.java new file mode 100644 index 00000000..86884f0a --- /dev/null +++ b/src/test/java/org/apache/datasketches/tdigest/SortTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.datasketches.tdigest; + +import static org.testng.Assert.assertEquals; + +import org.apache.datasketches.common.Shuffle; +import org.testng.annotations.Test; + +public class SortTest { + + @Test + public void smallWithRepetition() { + final double[] keys = {3, 1, 4, 2, 1}; + final long[] values = {4, 1, 5, 3, 2}; + Sort.stableSort(keys, values, keys.length); + assertEquals(keys[0], 1); + assertEquals(keys[1], 1); + assertEquals(keys[2], 2); + assertEquals(keys[3], 3); + assertEquals(keys[4], 4); + assertEquals(values[0], 1); + assertEquals(values[1], 2); + assertEquals(values[2], 3); + assertEquals(values[3], 4); + assertEquals(values[4], 5); + } + + @Test + public void large() { + final int n = 1000; + final double[] keys = new double[n]; + final long[] values = new long[n]; + for (int i = 0; i < n; i++) values[i] = i; + Shuffle.shuffle(values); + for (int i = 0; i < n; i++) keys[i] = values[i]; + Sort.stableSort(keys, values, n); + for (int i = 0; i < n; i++) { + assertEquals(keys[i], i); + assertEquals(values[i], i); + } + } + +} diff --git a/src/test/java/org/apache/datasketches/tdigest/TDigestTest.java b/src/test/java/org/apache/datasketches/tdigest/TDigestTest.java new file mode 100644 index 00000000..380dbb01 --- /dev/null +++ b/src/test/java/org/apache/datasketches/tdigest/TDigestTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.datasketches.tdigest; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertThrows; +import static org.testng.Assert.assertTrue; + +import org.testng.annotations.Test; + +public class TDigestTest { + + @Test + public void empty() { + final TDigest td = new TDigest(100); + assertTrue(td.isEmpty()); + assertEquals(td.getK(), 100); + assertEquals(td.getTotalWeight(), 0); + assertThrows(IllegalArgumentException.class, () -> td.getMinValue()); + assertThrows(IllegalArgumentException.class, () -> td.getMaxValue()); + } + + @Test + public void oneValue() { + final TDigest td = new TDigest(100); + td.update(1); + assertFalse(td.isEmpty()); + assertEquals(td.getK(), 100); + assertEquals(td.getTotalWeight(), 1); + assertEquals(td.getMinValue(), 1); + assertEquals(td.getMaxValue(), 1); + } + + @Test + public void manyValues() { + final TDigest td = new TDigest(100); + final int n = 10000; + for (int i = 0; i < n; i++) td.update(i); + System.out.println(td.toString(true)); + td.compress(); + System.out.println(td.toString(true)); + assertFalse(td.isEmpty()); + assertEquals(td.getTotalWeight(), n); + assertEquals(td.getMinValue(), 0); + assertEquals(td.getMaxValue(), n - 1); + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
