This is an automated email from the ASF dual-hosted git repository. jmalkin pushed a commit to branch revert-499-ebpps in repository https://gitbox.apache.org/repos/asf/datasketches-java.git
commit c1c72ef8885c6cdf5c5b4344c14330b509e3e9e1 Author: Jon Malkin <[email protected]> AuthorDate: Mon Feb 5 23:34:16 2024 -0800 Revert "EB-PPS Sampling sketch" --- .../org/apache/datasketches/common/Family.java | 7 +- .../datasketches/sampling/EbppsItemsSample.java | 309 ------------- .../datasketches/sampling/EbppsItemsSketch.java | 513 --------------------- .../apache/datasketches/sampling/PreambleUtil.java | 134 ++---- .../sampling/ReservoirItemsSketch.java | 8 +- .../datasketches/sampling/ReservoirItemsUnion.java | 8 +- .../sampling/ReservoirLongsSketch.java | 8 +- .../datasketches/sampling/ReservoirLongsUnion.java | 8 +- .../datasketches/sampling/VarOptItemsSketch.java | 8 +- .../datasketches/sampling/VarOptItemsUnion.java | 8 +- .../datasketches/sampling/EbppsSampleTest.java | 148 ------ .../datasketches/sampling/EbppsSketchTest.java | 310 ------------- 12 files changed, 54 insertions(+), 1415 deletions(-) diff --git a/src/main/java/org/apache/datasketches/common/Family.java b/src/main/java/org/apache/datasketches/common/Family.java index cac049e9..cac7722e 100644 --- a/src/main/java/org/apache/datasketches/common/Family.java +++ b/src/main/java/org/apache/datasketches/common/Family.java @@ -146,12 +146,7 @@ public enum Family { /** * CountMin Sketch */ - COUNTMIN(18, "COUNTMIN", 2, 2), - - /** - * Exact and Bounded, Probability Proportional to Size (EBPPS) - */ - EBPPS(19, "EBPPS", 1, 5); + COUNTMIN(18, "COUNTMIN", 2, 2); private static final Map<Integer, Family> lookupID = new HashMap<>(); private static final Map<String, Family> lookupFamName = new HashMap<>(); diff --git a/src/main/java/org/apache/datasketches/sampling/EbppsItemsSample.java b/src/main/java/org/apache/datasketches/sampling/EbppsItemsSample.java deleted file mode 100644 index d85c99ea..00000000 --- a/src/main/java/org/apache/datasketches/sampling/EbppsItemsSample.java +++ /dev/null @@ -1,309 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.datasketches.sampling; - -import static org.apache.datasketches.common.Util.LS; - -import java.lang.reflect.Array; -import java.util.ArrayList; -import java.util.Random; -import java.util.concurrent.ThreadLocalRandom; - -import org.apache.datasketches.common.SketchesArgumentException; - -// this is a supporting class used to hold the raw data sample -class EbppsItemsSample<T> { - - private double c_; // Current sample size, including fractional part - private T partialItem_; // a sample item corresponding to a partial weight - private ArrayList<T> data_; // full sample items - - private Random rand_; // ThreadLocalRandom.current() in general - - // basic constructor - EbppsItemsSample(final int reservedSize) { - c_ = 0.0; - data_ = new ArrayList<>(reservedSize); - rand_ = ThreadLocalRandom.current(); - } - - // copy constructor used during merge - EbppsItemsSample(final EbppsItemsSample<T> other) { - c_ = other.c_; - partialItem_ = other.partialItem_; - data_ = new ArrayList<>(other.data_); - rand_ = other.rand_; - } - - // constructor used for deserialization and testing - // does NOT copy the incoming ArrayList since this is an internal - // class's package-private constructor, not something directly - // taking user data - EbppsItemsSample(ArrayList<T> data, T partialItem, final double c) { - if (c < 0.0 || Double.isNaN(c) || Double.isInfinite(c)) - throw new SketchesArgumentException("C must be nonnegative and finite. Found: " + c); - - c_ = c; - partialItem_ = partialItem; - data_ = data; - rand_ = ThreadLocalRandom.current(); - } - - // Used in lieu of a constructor to populate a temporary sample - // with data before immediately merging it. This approach - // avoids excessive object allocation calls. - // rand_ is not set since it is not expected to be used from - // this object - void replaceContent(final T item, final double theta) { - if (theta < 0.0 || theta > 1.0 || Double.isNaN(theta)) - throw new SketchesArgumentException("Theta must be in the range [0.0, 1.0]. Found: " + theta); - - c_ = theta; - if (theta == 1.0) { - if (data_ != null && data_.size() == 1) { - data_.set(0, item); - } else { - data_ = new ArrayList<T>(1); - data_.add(item); - } - partialItem_ = null; - } else { - data_ = null; - partialItem_ = item; - } - } - - void reset() { - c_ = 0.0; - partialItem_ = null; - data_.clear(); - } - - ArrayList<T> getSample() { - final double cFrac = c_ % 1; - final boolean includePartial = partialItem_ != null && rand_.nextDouble() < cFrac; - final int resultSize = (data_ != null ? data_.size() : 0) + (includePartial ? 1 : 0); - - if (resultSize == 0) - return null; - - ArrayList<T> result = new ArrayList<>(resultSize); - if (data_ != null) - result.addAll(data_); - - if (includePartial) - result.add(partialItem_); - - return result; - } - - @SuppressWarnings("unchecked") - T[] getAllSamples(final Class<?> clazz) { - // Is it faster to use sublist and append 1? - final T[] itemsArray = (T[]) Array.newInstance(clazz, getNumRetainedItems()); - int i = 0; - if (data_ != null) { - for (T item : data_) { - if (item != null) { - itemsArray[i++] = item; - } - } - } - if (partialItem_ != null) - itemsArray[i] = partialItem_; // no need to increment i again - - return itemsArray; - } - - // package-private for use in merge and serialization - ArrayList<T> getFullItems() { - return data_; - } - - // package-private for use in merge and serialization - T getPartialItem() { - return partialItem_; - } - - double getC() { return c_; } - - boolean hasPartialItem() { return partialItem_ != null; } - - // for testing to allow setting the seed - void replaceRandom(Random r) { - rand_ = r; - } - - void downsample(final double theta) { - if (theta >= 1.0) return; - - double newC = theta * c_; - double newCInt = Math.floor(newC); - double newCFrac = newC % 1; - double cInt = Math.floor(c_); - double cFrac = c_ % 1; - - if (newCInt == 0.0) { - // no full items retained - if (rand_.nextDouble() > (cFrac / c_)) { - swapWithPartialItem(); - } - data_.clear(); - } else if (newCInt == cInt) { - // no items deleted - if (rand_.nextDouble() > (1 - theta * cFrac)/(1 - newCFrac)) { - swapWithPartialItem(); - } - } else { - if (rand_.nextDouble() < theta * cFrac) { - // subsample data in random order; last item is partial - // create sample size newC then swapWithPartialItem() - subsample((int) newCInt); - swapWithPartialItem(); - } else { - // create sample size newCInt + 1 then moveOneToPartialItem() - subsample((int) newCInt + 1); - moveOneToPartialItem(); - } - } - - if (newC == newCInt) - partialItem_ = null; - - c_ = newC; - } - - void merge(final EbppsItemsSample<T> other) { - //double cInt = Math.floor(c_); - double cFrac = c_ % 1; - double otherCFrac = other.c_ % 1; - - // update c_ here but do NOT recompute fractional part yet - c_ += other.c_; - - if (other.data_ != null) - data_.addAll(other.data_); - - // This modifies the original algorithm slightly due to numeric - // precision issues. Specifically, the test if cFrac + otherCFrac == 1.0 - // happens before tests for < 1.0 or > 1.0 and can also be triggered - // if c_ == floor(c_) (the updated value of c_, not the input). - // - // We can still run into issues where cFrac + otherCFrac == epsilon - // and the first case would have ideally triggered. As a result, we must - // check if the partial item exists before adding to the data_ vector. - - if (cFrac == 0.0 && otherCFrac == 0.0) { - partialItem_ = null; - } else if (cFrac + otherCFrac == 1.0 || c_ == Math.floor(c_)) { - if (rand_.nextDouble() <= cFrac) { - if (partialItem_ != null) { - data_.add(partialItem_); - } - } else { - if (other.partialItem_ != null) { - data_.add(other.partialItem_); - } - } - partialItem_ = null; - } else if (cFrac + otherCFrac < 1.0) { - if (rand_.nextDouble() > cFrac / (cFrac + otherCFrac)) { - partialItem_ = other.partialItem_; - } - } else { // cFrac + otherCFrac > 1 - if (rand_.nextDouble() <= (1 - cFrac) / ((1 - cFrac) + (1 - otherCFrac))) { - data_.add(other.partialItem_); - } else { - data_.add(partialItem_); - partialItem_ = other.partialItem_; - } - } - } - - @Override - public String toString() { - final StringBuilder sb = new StringBuilder(); - - sb.append(" sample:").append(LS); - int idx = 0; - for (T item : data_) - sb.append("\t").append(idx++).append(":\t").append(item.toString()).append(LS); - sb.append(" partial: "); - if (partialItem_ != null) - sb.append(partialItem_.toString()).append(LS); - else - sb.append("NULL").append(LS); - - return sb.toString(); - } - - void subsample(final int numSamples) { - // we can perform a Fisher-Yates style shuffle, stopping after - // numSamples points since subsequent swaps would only be - // between items after num_samples. This is valid since a - // point from anywhere in the initial array would be eligible - // to end up in the final subsample. - - if (numSamples == data_.size()) return; - - int dataLen = data_.size(); - for (int i = 0; i < numSamples; ++i) { - int j = i + rand_.nextInt(dataLen - i); - // swap i and j - T tmp = data_.get(i); - data_.set(i, data_.get(j)); - data_.set(j, tmp); - } - - // clear anything beyond numSamples - data_.subList(numSamples, data_.size()).clear(); - } - - void swapWithPartialItem() { - if (partialItem_ == null) { - moveOneToPartialItem(); - } else { - int idx = rand_.nextInt(data_.size()); - T tmp = partialItem_; - partialItem_ = data_.get(idx); - data_.set(idx, tmp); - } - } - - void moveOneToPartialItem() { - int idx = rand_.nextInt(data_.size()); - // swap selected item to end so we can delete it easily - int lastIdx = data_.size() - 1; - if (idx != lastIdx) { - T tmp = data_.get(idx); - data_.set(idx, data_.get(lastIdx)); - partialItem_ = tmp; - } else { - partialItem_ = data_.get(lastIdx); - } - - data_.remove(lastIdx); - } - - int getNumRetainedItems() { - return (data_ != null ? data_.size() : 0) - + (partialItem_ != null ? 1 : 0); - } -} diff --git a/src/main/java/org/apache/datasketches/sampling/EbppsItemsSketch.java b/src/main/java/org/apache/datasketches/sampling/EbppsItemsSketch.java deleted file mode 100644 index 8b44aa08..00000000 --- a/src/main/java/org/apache/datasketches/sampling/EbppsItemsSketch.java +++ /dev/null @@ -1,513 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.datasketches.sampling; - -import static org.apache.datasketches.sampling.PreambleUtil.EBPPS_SER_VER; -import static org.apache.datasketches.sampling.PreambleUtil.EMPTY_FLAG_MASK; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -import org.apache.datasketches.common.ArrayOfItemsSerDe; -import org.apache.datasketches.common.Family; -import org.apache.datasketches.common.SketchesArgumentException; -import org.apache.datasketches.memory.Memory; -import org.apache.datasketches.memory.WritableMemory; - -/** - * An implementation of an Exact and Bounded Sampling Proportional to Size sketch. - * - * From: "Exact PPS Sampling with Bounded Sample Size", - * B. Hentschel, P. J. Haas, Y. Tian. Information Processing Letters, 2023. - * - * This sketch samples data from a stream of items proportional to the weight of each item. - * The sample guarantees the presence of an item in the result is proportional to that item's - * portion of the total weight seen by the sketch, and returns a sample no larger than size k. - * - * The sample may be smaller than k and the resulting size of the sample potentially includes - * a probabilistic component, meaning the resulting sample size is not always constant. - * - * @author Jon Malkin - */ -public class EbppsItemsSketch<T> { - private static final int MAX_K = Integer.MAX_VALUE - 2; - private static final int EBPPS_C_DOUBLE = 40; // part of sample state, not preamble - private static final int EBPPS_ITEMS_START = 48; - - private int k_; // max size of sketch, in items - private long n_; // total number of items processed by the sketch - - private double cumulativeWt_; // total weight of items processed by the sketch - private double wtMax_; // maximum weight seen so far - private double rho_; // latest scaling parameter for downsampling - - private EbppsItemsSample<T> sample_; // Object holding the current state of the sample - - private EbppsItemsSample<T> tmp_; // temporary storage - - /** - * Constructor - * @param k The maximum number of samples to retain - */ - public EbppsItemsSketch(final int k) { - checkK(k); - k_ = k; - rho_ = 1.0; - sample_ = new EbppsItemsSample<>(k); - tmp_ = new EbppsItemsSample<>(1); - } - - // private copy constrcutor - private EbppsItemsSketch(EbppsItemsSketch<T> other) { - k_ = other.k_; - n_ = other.n_; - rho_ = other.rho_; - cumulativeWt_ = other.cumulativeWt_; - wtMax_ = other.wtMax_; - rho_ = other.rho_; - sample_ = new EbppsItemsSample<>(other.sample_); - tmp_ = new EbppsItemsSample<>(1); - } - - // private constructor for heapify - private EbppsItemsSketch(final EbppsItemsSample<T> sample, - final int k, - final long n, - final double cumWt, - final double maxWt, - final double rho) { - k_ = k; - n_ = n; - cumulativeWt_ = cumWt; - wtMax_ = maxWt; - rho_ = rho; - sample_ = sample; - tmp_ = new EbppsItemsSample<>(1); - } - - /** - * Returns a sketch instance of this class from the given srcMem, - * which must be a Memory representation of this sketch class. - * - * @param <T> The type of item this sketch contains - * @param srcMem a Memory representation of a sketch of this class. - * <a href="{@docRoot}/resources/dictionary.html#mem">See Memory</a> - * @param serDe An instance of ArrayOfItemsSerDe - * @return a sketch instance of this class - */ - public static <T> EbppsItemsSketch<T> heapify(final Memory srcMem, - final ArrayOfItemsSerDe<T> serDe) - { - final int numPreLongs = PreambleUtil.getAndCheckPreLongs(srcMem); - final int serVer = PreambleUtil.extractSerVer(srcMem); - final int familyId = PreambleUtil.extractFamilyID(srcMem); - final int flags = PreambleUtil.extractFlags(srcMem); - final boolean isEmpty = (flags & EMPTY_FLAG_MASK) != 0; - - // Check values - if (isEmpty) { - if (numPreLongs != Family.EBPPS.getMinPreLongs()) { - throw new SketchesArgumentException("Possible corruption: Must be " + Family.EBPPS.getMinPreLongs() - + " for an empty sketch. Found: " + numPreLongs); - } - } else { - if (numPreLongs != Family.EBPPS.getMaxPreLongs()) { - throw new SketchesArgumentException("Possible corruption: Must be " - + Family.EBPPS.getMaxPreLongs() + " for a non-empty sketch. Found: " + numPreLongs); - } - } - if (serVer != EBPPS_SER_VER) { - throw new SketchesArgumentException( - "Possible Corruption: Ser Ver must be " + EBPPS_SER_VER + ": " + serVer); - } - final int reqFamilyId = Family.EBPPS.getID(); - if (familyId != reqFamilyId) { - throw new SketchesArgumentException( - "Possible Corruption: FamilyID must be " + reqFamilyId + ": " + familyId); - } - - final int k = PreambleUtil.extractK(srcMem); - if (k < 1 || k > MAX_K) { - throw new SketchesArgumentException("Possible Corruption: k must be at least 1 " - + "and less than " + MAX_K + ". Found: " + k); - } - - if (isEmpty) - return new EbppsItemsSketch<T>(k); - - final long n = PreambleUtil.extractN(srcMem); - if (n < 0) { - throw new SketchesArgumentException("Possible Corruption: n cannot be negative: " + n); - } - - final double cumWt = PreambleUtil.extractEbppsCumulativeWeight(srcMem); - if (cumWt < 0.0 || Double.isNaN(cumWt) || Double.isInfinite(cumWt)) { - throw new SketchesArgumentException("Possible Corruption: cumWt must be nonnegative and finite: " + cumWt); - } - - final double maxWt = PreambleUtil.extractEbppsMaxWeight(srcMem); - if (maxWt < 0.0 || Double.isNaN(maxWt) || Double.isInfinite(maxWt)) { - throw new SketchesArgumentException("Possible Corruption: maxWt must be nonnegative and finite: " + maxWt); - } - - final double rho = PreambleUtil.extractEbppsRho(srcMem); - if (rho < 0.0 || rho > 1.0 || Double.isNaN(rho) || Double.isInfinite(rho)) { - throw new SketchesArgumentException("Possible Corruption: rho must be in [0.0, 1.0]: " + rho); - } - - // extract C (part of sample_, not the preamble) - // due to numeric precision issues, c may occasionally be very slightly larger than k - final double c = srcMem.getDouble(EBPPS_C_DOUBLE); - if (c < 0 || c >= (k + 1) || Double.isNaN(c) || Double.isInfinite(c)) { - throw new SketchesArgumentException("Possible Corruption: c must be between 0 and k: " + c); - } - - // extract items - int numTotalItems = (int) Math.ceil(c); - int numFullItems = (int) Math.floor(c); // floor() not strictly necessary - final int offsetBytes = EBPPS_ITEMS_START; - final T[] rawItems = serDe.deserializeFromMemory( - srcMem.region(offsetBytes, srcMem.getCapacity() - offsetBytes), 0, numTotalItems); - final List<T> itemsList = Arrays.asList(rawItems); - final ArrayList<T> data; - final T partialItem; - if (numFullItems < numTotalItems) { - data = new ArrayList<>(itemsList.subList(0, numFullItems)); - partialItem = itemsList.get(numFullItems); // 0-based, so last item - } else { - data = new ArrayList<>(itemsList); - partialItem = null; // just to be explicit - } - - EbppsItemsSample<T> sample = new EbppsItemsSample<T>(data, partialItem, c); - - return new EbppsItemsSketch<>(sample, k, n, cumWt, maxWt, rho); - } - - /** - * Updates this sketch with the given data item with weight 1.0. - * @param item an item from a stream of items - */ - public void update(final T item) { - update(item, 1.0); - } - - /** - * Updates this sketch with the given data item with the given weight. - * @param item an item from a stream of items - * @param weight the weight of the item - */ - public void update(final T item, final double weight) { - if (weight < 0.0 || Double.isNaN(weight) || Double.isInfinite(weight)) - throw new SketchesArgumentException("Item weights must be nonnegative and finite. " - + "Found: " + weight); - if (weight == 0.0) - return; - - final double newCumWt = cumulativeWt_ + weight; - final double newWtMax = Math.max(wtMax_, weight); - final double newRho = Math.min(1.0 / newWtMax, k_ / newCumWt); - - if (cumulativeWt_ > 0.0) - sample_.downsample((newRho / rho_)); - - tmp_.replaceContent(item, newRho * weight); - sample_.merge(tmp_); - - cumulativeWt_ = newCumWt; - wtMax_ = newWtMax; - rho_ = newRho; - ++n_; - } - - /* Merging - * There is a trivial merge algorithm that involves downsampling each sketch A and B - * as A.cum_wt / (A.cum_wt + B.cum_wt) and B.cum_wt / (A.cum_wt + B.cum_wt), - * respectively. That merge does preserve first-order probabilities, specifically - * the probability proportional to size property, and like all other known merge - * algorithms distorts second-order probabilities (co-occurrences). There are - * pathological cases, most obvious with k=2 and A.cum_wt == B.cum_wt where that - * approach will always take exactly 1 item from A and 1 from B, meaning the - * co-occurrence rate for two items from either sketch is guaranteed to be 0.0. - * - * With EBPPS, once an item is accepted into the sketch we no longer need to - * track the item's weight: All accepted items are treated equally. As a result, we - * can take inspiration from the reservoir sampling merge in the datasketches-java - * library. We need to merge the smaller sketch into the larger one, swapping as - * needed to ensure that, at which point we simply call update() with the items - * in the smaller sketch as long as we adjust the weight appropriately. - * Merging smaller into larger is essential to ensure that no item has a - * contribution to C > 1.0. - */ - - /** - * Merges the provided sketch into the current one. - * @param other the sketch to merge into the current object - */ - public void merge(final EbppsItemsSketch<T> other) { - if (other.getCumulativeWeight() == 0.0) return; - else if (other.getCumulativeWeight() > cumulativeWt_) { - // need to swap this with other - // make a copy of other, merge into it, and take the result - EbppsItemsSketch<T> copy = new EbppsItemsSketch<>(other); - copy.internalMerge(this); - k_ = copy.k_; - n_ = copy.n_; - cumulativeWt_ = copy.cumulativeWt_; - wtMax_ = copy.wtMax_; - rho_ = copy.rho_; - sample_ = copy.sample_; - } else { - internalMerge(other); - } - } - - // merge implementation called exclusively from public merge() - private void internalMerge(EbppsItemsSketch<T> other) { - // assumes that other.cumulativeWeight_ <= cumulativeWt_m - // which must be checked before calling this - - final double finalCumWt = cumulativeWt_ + other.cumulativeWt_; - final double newWtMax = Math.max(wtMax_, other.wtMax_); - k_ = Math.min(k_, other.k_); - final long newN = n_ + other.n_; - - // Insert other's items with the cumulative weight - // split between the input items. We repeat the same process - // for full items and the partial item, scaling the input - // weight appropriately. - // We handle all C input items, meaning we always process - // the partial item using a scaled down weight. - // Handling the partial item by probabilistically including - // it as a full item would be correct on average but would - // introduce bias for any specific merge operation. - final double avgWt = other.cumulativeWt_ / other.getC(); - ArrayList<T> items = other.sample_.getFullItems(); - if (items != null) { - for (int i = 0; i < items.size(); ++i) { - // newWtMax is pre-computed - final double newCumWt = cumulativeWt_ + avgWt; - final double newRho = Math.min(1.0 / newWtMax, k_ / newCumWt); - - if (cumulativeWt_ > 0.0) - sample_.downsample(newRho / rho_); - - tmp_.replaceContent(items.get(i), newRho * avgWt); - sample_.merge(tmp_); - - cumulativeWt_ = newCumWt; - rho_ = newRho; - } - } - - // insert partial item with weight scaled by the fractional part of C - if (other.sample_.hasPartialItem()) { - final double otherCFrac = other.getC() % 1; - final double newCumWt = cumulativeWt_ + (otherCFrac * avgWt); - final double newRho = Math.min(1.0 / newWtMax, k_ / newCumWt); - - if (cumulativeWt_ > 0.0) - sample_.downsample(newRho / rho_); - - tmp_.replaceContent(other.sample_.getPartialItem(), newRho * otherCFrac * avgWt); - sample_.merge(tmp_); - - cumulativeWt_ = newCumWt; - rho_ = newRho; - } - - // avoid numeric issues by setting cumulative weight to the - // pre-computed value - cumulativeWt_ = finalCumWt; - n_ = newN; - } - - /** - * Returns a copy of the current sample. The exact size may be - * probabilsitic, differing by at most 1 item. - * @return the current sketch sample - */ - public ArrayList<T> getResult() { return sample_.getSample(); } - - /** - * Provides a human-readable summary of the sketch - * @return a summary of information in the sketch - */ - @Override - public String toString() { - return null; - } - - /** - * Returns the configured maximum sample size. - * @return configured maximum sample size - */ - public int getK() { return k_; } - - /** - * Returns the number of items processed by the sketch, regardless - * of item weight. - * @return count of items processed by the sketch - */ - public long getN() { return n_; } - - /** - * Returns the cumulative weight of items processed by the sketch. - * @return cumulative weight of items seen - */ - public double getCumulativeWeight() { return cumulativeWt_; } - - /** - * Returns the expected number of samples returned upon a call to - * getResult(). The number is a floating point value, where the - * fractional portion represents the probability of including a - * "partial item" from the sample. - * - * The value C should be no larger than the sketch's configured - * value of k, although numerical precision limitations mean it - * may exceed k by double precision floating point error margins - * in certain cases. - * @return The expected number of samples returned when querying the sketch - */ - public double getC() { return sample_.getC(); } - - /** - * Returns true if the sketch is empty. - * @return empty flag - */ - public boolean isEmpty() { return n_ == 0; } - - /** - * Resets the sketch to its default, empty state. - */ - public void reset() { - n_ = 0; - cumulativeWt_ = 0.0; - wtMax_ = 0.0; - rho_ = 1.0; - sample_ = new EbppsItemsSample<>(k_); - } - - /** - * Returns the size of a byte array representation of this sketch. May fail for polymorphic item types. - * - * @param serDe An instance of ArrayOfItemsSerDe - * @return the length of a byte array representation of this sketch - */ - public int getSerializedSizeBytes(final ArrayOfItemsSerDe<? super T> serDe) { - if (isEmpty()) - return Family.EBPPS.getMinPreLongs() << 3; - else if (sample_.getC() < 1.0) - return getSerializedSizeBytes(serDe, sample_.getPartialItem().getClass()); - else - return getSerializedSizeBytes(serDe, sample_.getSample().get(0).getClass()); - } - - /** - * Returns the length of a byte array representation of this sketch. Copies contents into an array of the - * specified class for serialization to allow for polymorphic types. - * - * @param serDe An instance of ArrayOfItemsSerDe - * @param clazz The class represented by <T> - * @return the length of a byte array representation of this sketch - */ - public int getSerializedSizeBytes(final ArrayOfItemsSerDe<? super T> serDe, final Class<?> clazz) { - if (n_ == 0) - return Family.EBPPS.getMinPreLongs() << 3; - - final int preLongs = Family.EBPPS.getMaxPreLongs(); - final byte[] itemBytes = serDe.serializeToByteArray(sample_.getAllSamples(clazz)); - // in C++, c_ is serialized as part of the sample_ and not included in the header size - return (preLongs << 3) + Double.BYTES + itemBytes.length; - } - - /** - * Returns a byte array representation of this sketch. May fail for polymorphic item types. - * - * @param serDe An instance of ArrayOfItemsSerDe - * @return a byte array representation of this sketch - */ - public byte[] toByteArray(final ArrayOfItemsSerDe<? super T> serDe) { - if (n_ == 0) - // null class is ok since empty -- no need to call serDe - return toByteArray(serDe, null); - else if (sample_.getC() < 1.0) - return toByteArray(serDe, sample_.getPartialItem().getClass()); - else - return toByteArray(serDe, sample_.getSample().get(0).getClass()); - } - - /** - * Returns a byte array representation of this sketch. Copies contents into an array of the - * specified class for serialization to allow for polymorphic types. - * - * @param serDe An instance of ArrayOfItemsSerDe - * @param clazz The class represented by <T> - * @return a byte array representation of this sketch - */ - public byte[] toByteArray(final ArrayOfItemsSerDe<? super T> serDe, final Class<?> clazz) { - final int preLongs, outBytes; - final boolean empty = n_ == 0; - byte[] itemBytes = null; // for serialized items from sample_ - - if (empty) { - preLongs = 1; - outBytes = 8; - } else { - preLongs = Family.EBPPS.getMaxPreLongs(); - itemBytes = serDe.serializeToByteArray(sample_.getAllSamples(clazz)); - // in C++, c_ is serialized as part of the sample_ and not included in the header size - outBytes = (preLongs << 3) + Double.BYTES + itemBytes.length; - } - final byte[] outArr = new byte[outBytes]; - final WritableMemory mem = WritableMemory.writableWrap(outArr); - - // Common header elements - PreambleUtil.insertPreLongs(mem, preLongs); // Byte 0 - PreambleUtil.insertSerVer(mem, EBPPS_SER_VER); // Byte 1 - PreambleUtil.insertFamilyID(mem, Family.EBPPS.getID()); // Byte 2 - if (empty) { - PreambleUtil.insertFlags(mem, EMPTY_FLAG_MASK); // Byte 3 - } else { - PreambleUtil.insertFlags(mem, 0); - } - PreambleUtil.insertK(mem, k_); // Bytes 4-7 - - // conditional elements - if (!empty) { - PreambleUtil.insertN(mem, n_); - PreambleUtil.insertEbppsCumulativeWeight(mem, cumulativeWt_); - PreambleUtil.insertEbppsMaxWeight(mem, wtMax_); - PreambleUtil.insertEbppsRho(mem, rho_); - - // data from sample_ -- itemBytes includes the partial item - mem.putDouble(EBPPS_C_DOUBLE, sample_.getC()); - mem.putByteArray(EBPPS_ITEMS_START, itemBytes, 0, itemBytes.length); - } - - return outArr; - } - - private void checkK(final int k) { - if (k <= 0 || k > MAX_K) - throw new SketchesArgumentException("k must be strictly positive and less than " + MAX_K); - } -} diff --git a/src/main/java/org/apache/datasketches/sampling/PreambleUtil.java b/src/main/java/org/apache/datasketches/sampling/PreambleUtil.java index 454514fe..453e3901 100644 --- a/src/main/java/org/apache/datasketches/sampling/PreambleUtil.java +++ b/src/main/java/org/apache/datasketches/sampling/PreambleUtil.java @@ -35,10 +35,12 @@ import org.apache.datasketches.memory.WritableMemory; /** * This class defines the preamble items structure and provides basic utilities for some of the key - * fields. Fields are presented in Little Endian format, but multi-byte values (int, long, double) - * are stored in native byte order. All <tt>byte</tt> values are treated as unsigned. + * fields. * - * Reservoir Sampling + * <p> + * MAP: Low significance bytes of this <i>long</i> items structure are on the right. However, the + * multi-byte integers (<i>int</i> and <i>long</i>) are stored in native byte order. The + * <i>byte</i> values are treated as unsigned.</p> * * <p><strong>Sketch:</strong> The count of items seen is limited to 48 bits (~256 trillion) even * though there are adjacent unused preamble bits. The acceptance probability for an item is a @@ -52,11 +54,11 @@ import org.apache.datasketches.memory.WritableMemory; * <pre> * Long || Start Byte Adr: * Adr: - * || 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | - * 0 || Preamble_Longs | SerVer | FamID | Flags |---------Max Res. Size (K)---------| + * || 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | + * 0 ||--------Reservoir Size (K)---------| Flags | FamID | SerVer | Preamble_Longs | * - * || 8 | 9 | 10 | 11 | 12 | 13 | 14 | 15 | - * 1 ||----------------------------Items Seen Count (N)-------------------------------| + * || 15 | 14 | 13 | 12 | 11 | 10 | 9 | 8 | + * 1 ||------------------------------Items Seen Count (N)---------------------------------| * </pre> * * <p><strong>Union:</strong> The reservoir union has fewer internal parameters to track and uses @@ -72,13 +74,10 @@ import org.apache.datasketches.memory.WritableMemory; * <pre> * Long || Start Byte Adr: * Adr: - * || 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | - * 0 || Preamble_Longs | SerVer | FamID | Flags |---------Max Res. Size (K)---------| + * || 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | + * 0 ||---------Max Res. Size (K)---------| Flags | FamID | SerVer | Preamble_Longs | * </pre> * - * - * VarOpt Sampling - * * <p><strong>VarOpt:</strong> A VarOpt sketch has a more complex internal items structure and * requires a larger preamble. Values serving a similar purpose in both reservoir and varopt sampling * share the same byte ranges, allowing method re-use where practical.</p> @@ -89,21 +88,21 @@ import org.apache.datasketches.memory.WritableMemory; * <pre> * Long || Start Byte Adr: * Adr: - * || 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | - * 0 || Preamble_Longs | SerVer | FamID | Flags |---------Max Res. Size (K)---------| + * || 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | + * 0 ||--------Reservoir Size (K)---------| Flags | FamID | SerVer | Preamble_Longs | * - * || 8 | 9 | 10 | 11 | 12 | 13 | 14 | 15 | - * 1 ||----------------------------Items Seen Count (N)-------------------------------| + * || 15 | 14 | 13 | 12 | 11 | 10 | 9 | 8 | + * 1 ||------------------------------Items Seen Count (N)---------------------------------| * - * || 16 | 17 | 18 | 19 | 20 | 21 | 22 | 23 | - * 2 ||--------Item Count in R-----------|-----------Item Count in H------------------| + * || 23 | 22 | 21 | 20 | 19 | 18 | 17 | 16 | + * 2 ||---------Item Count in R-----------|-----------Item Count in H---------------------| * - * || 24 | 25 | 26 | 27 | 28 | 29 | 30 | 31 | - * 3 ||------------------------------Total Weight in R--------------------------------| + * || 31 | 30 | 29 | 28 | 27 | 26 | 25 | 24 | + * 3 ||--------------------------------Total Weight in R----------------------------------| * </pre> * * <p><strong>VarOpt Union:</strong> VarOpt unions also store more information than a reservoir - * sketch. As before, we keep values with similar to the same meaning in corresponding locations + * sketch. As before, we keep values with similar o hte same meaning in corresponding locations * actoss sketch and union formats. The items in the union are stored in a varopt sketch-compatible * format after the union preamble.</p> * @@ -112,63 +111,19 @@ import org.apache.datasketches.memory.WritableMemory; * <pre> * Long || Start Byte Adr: * Adr: - * || 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | - * 0 || Preamble_Longs | SerVer | FamID | Flags |---------Max Res. Size (K)---------| + * || 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | + * 0 ||---------Max Res. Size (K)---------| Flags | FamID | SerVer | Preamble_Longs | * - * || 8 | 9 | 10 | 11 | 12 | 13 | 14 | 15 | - * 1 ||----------------------------Items Seen Count (N)-------------------------------| + * || 15 | 14 | 13 | 12 | 11 | 10 | 9 | 8 | + * 1 ||------------------------------Items Seen Count (N)---------------------------------| * - * || 16 | 17 | 18 | 19 | 20 | 21 | 22 | 23 | - * 2 ||-------------------------Outer Tau Numerator (double)--------------------------| + * || 23 | 22 | 21 | 20 | 19 | 18 | 17 | 16 | + * 2 ||---------------------------Outer Tau Numerator (double)----------------------------| * - * || 24 | 25 | 26 | 27 | 28 | 29 | 30 | 31 | - * 3 ||-------------------------Outer Tau Denominator (long)--------------------------| + * || 31 | 30 | 29 | 28 | 27 | 26 | 25 | 24 | + * 3 ||---------------------------Outer Tau Denominator (long)----------------------------| * </pre> * - * - * EPPS Sampling - * - * An empty sketch requires 8 bytes. - * - * <pre> - * Long || Start Byte Adr: - * Adr: - * || 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | - * 0 || Preamble_Longs | SerVer | FamID | Flags |---------Max Res. Size (K)---------| - * </pre> - * - * A non-empty sketch requires 40 bytes of preamble. C looks like part of - * the preamble but is treated as part of the sample state. - * - * The count of items seen is not used but preserved as the value seems like a useful - * count to track. - * - * <pre> - * Long || Start Byte Adr: - * Adr: - * || 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | - * 0 || Preamble_Longs | SerVer | FamID | Flags |---------Max Res. Size (K)---------| - * - * || 8 | 9 | 10 | 11 | 12 | 13 | 14 | 15 | - * 1 ||---------------------------Items Seen Count (N)--------------------------------| - * - * || 16 | 17 | 18 | 19 | 20 | 21 | 22 | 23 | - * 2 ||----------------------------Cumulative Weight----------------------------------| - * - * || 24 | 25 | 26 | 27 | 28 | 29 | 30 | 31 | - * 3 ||-----------------------------Max Item Weight-----------------------------------| - * - * || 32 | 33 | 34 | 35 | 36 | 37 | 38 | 39 | - * 4 ||----------------------------------Rho------------------------------------------| - * - * || 40 | 41 | 42 | 43 | 44 | 45 | 46 | 47 | - * 5 ||-----------------------------------C-------------------------------------------| - * - * || 40+ | - * 6+ || {Items Array} | - * || {Optional Item (if needed)} | - * </pre> - * * @author Jon Malkin * @author Lee Rhodes */ @@ -200,11 +155,6 @@ final class PreambleUtil { static final int VO_PRELONGS_WARMUP = 3; // Doesn't match min or max prelongs in Family static final int VO_PRELONGS_FULL = Family.VAROPT.getMaxPreLongs(); - // constants and addresses used in EBPPS - static final int EBPPS_CUM_WT_DOUBLE = 16; - static final int EBPPS_MAX_WT_DOUBLE = 24; - static final int EBPPS_RHO_DOUBLE = 32; - // flag bit masks //static final int BIG_ENDIAN_FLAG_MASK = 1; //static final int READ_ONLY_FLAG_MASK = 2; @@ -212,9 +162,7 @@ final class PreambleUtil { static final int GADGET_FLAG_MASK = 128; //Other constants - static final int RESERVOIR_SER_VER = 2; - static final int VAROPT_SER_VER = 2; - static final int EBPPS_SER_VER = 1; + static final int SER_VER = 2; static final boolean NATIVE_ORDER_IS_BIG_ENDIAN = (ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN); @@ -430,18 +378,6 @@ final class PreambleUtil { return mem.getLong(OUTER_TAU_DENOM_LONG); } - static double extractEbppsCumulativeWeight(final Memory mem) { - return mem.getDouble(EBPPS_CUM_WT_DOUBLE); - } - - static double extractEbppsMaxWeight(final Memory mem) { - return mem.getDouble(EBPPS_MAX_WT_DOUBLE); - } - - static double extractEbppsRho(final Memory mem) { - return mem.getDouble(EBPPS_RHO_DOUBLE); - } - // Insertion methods static void insertPreLongs(final WritableMemory wmem, final int preLongs) { @@ -503,18 +439,6 @@ final class PreambleUtil { wmem.putLong(OUTER_TAU_DENOM_LONG, denom); } - static void insertEbppsCumulativeWeight(final WritableMemory wmem, final double cumWt) { - wmem.putDouble(EBPPS_CUM_WT_DOUBLE, cumWt); - } - - static void insertEbppsMaxWeight(final WritableMemory wmem, final double maxWt) { - wmem.putDouble(EBPPS_MAX_WT_DOUBLE, maxWt); - } - - static void insertEbppsRho(final WritableMemory wmem, final double rho) { - wmem.putDouble(EBPPS_RHO_DOUBLE, rho); - } - /** * Checks Memory for capacity to hold the preamble and returns the extracted preLongs. * @param mem the given Memory diff --git a/src/main/java/org/apache/datasketches/sampling/ReservoirItemsSketch.java b/src/main/java/org/apache/datasketches/sampling/ReservoirItemsSketch.java index 7a3abc4b..7b4855d8 100644 --- a/src/main/java/org/apache/datasketches/sampling/ReservoirItemsSketch.java +++ b/src/main/java/org/apache/datasketches/sampling/ReservoirItemsSketch.java @@ -22,7 +22,7 @@ package org.apache.datasketches.sampling; import static org.apache.datasketches.common.Util.LS; import static org.apache.datasketches.sampling.PreambleUtil.EMPTY_FLAG_MASK; import static org.apache.datasketches.sampling.PreambleUtil.FAMILY_BYTE; -import static org.apache.datasketches.sampling.PreambleUtil.RESERVOIR_SER_VER; +import static org.apache.datasketches.sampling.PreambleUtil.SER_VER; import static org.apache.datasketches.sampling.PreambleUtil.extractEncodedReservoirSize; import static org.apache.datasketches.sampling.PreambleUtil.extractFlags; import static org.apache.datasketches.sampling.PreambleUtil.extractK; @@ -229,13 +229,13 @@ public final class ReservoirItemsSketch<T> { + Family.RESERVOIR.getMinPreLongs() + " preLong(s)"); } - if (serVer != RESERVOIR_SER_VER) { + if (serVer != SER_VER) { if (serVer == 1) { final short encK = extractEncodedReservoirSize(srcMem); k = ReservoirSize.decodeValue(encK); } else { throw new SketchesArgumentException( - "Possible Corruption: Ser Ver must be " + RESERVOIR_SER_VER + ": " + serVer); + "Possible Corruption: Ser Ver must be " + SER_VER + ": " + serVer); } } @@ -479,7 +479,7 @@ public final class ReservoirItemsSketch<T> { // Common header elements PreambleUtil.insertPreLongs(mem, preLongs); // Byte 0 PreambleUtil.insertLgResizeFactor(mem, rf_.lg()); - PreambleUtil.insertSerVer(mem, RESERVOIR_SER_VER); // Byte 1 + PreambleUtil.insertSerVer(mem, SER_VER); // Byte 1 PreambleUtil.insertFamilyID(mem, Family.RESERVOIR.getID()); // Byte 2 if (empty) { PreambleUtil.insertFlags(mem, EMPTY_FLAG_MASK); // Byte 3 diff --git a/src/main/java/org/apache/datasketches/sampling/ReservoirItemsUnion.java b/src/main/java/org/apache/datasketches/sampling/ReservoirItemsUnion.java index 74a52e49..9f4bd5f3 100644 --- a/src/main/java/org/apache/datasketches/sampling/ReservoirItemsUnion.java +++ b/src/main/java/org/apache/datasketches/sampling/ReservoirItemsUnion.java @@ -22,7 +22,7 @@ package org.apache.datasketches.sampling; import static org.apache.datasketches.common.Util.LS; import static org.apache.datasketches.sampling.PreambleUtil.EMPTY_FLAG_MASK; import static org.apache.datasketches.sampling.PreambleUtil.FAMILY_BYTE; -import static org.apache.datasketches.sampling.PreambleUtil.RESERVOIR_SER_VER; +import static org.apache.datasketches.sampling.PreambleUtil.SER_VER; import static org.apache.datasketches.sampling.PreambleUtil.extractEncodedReservoirSize; import static org.apache.datasketches.sampling.PreambleUtil.extractFlags; import static org.apache.datasketches.sampling.PreambleUtil.extractMaxK; @@ -109,13 +109,13 @@ public final class ReservoirItemsUnion<T> { + Family.RESERVOIR_UNION.getMinPreLongs() + "preLongs"); } - if (serVer != RESERVOIR_SER_VER) { + if (serVer != SER_VER) { if (serVer == 1) { final short encMaxK = extractEncodedReservoirSize(srcMem); maxK = ReservoirSize.decodeValue(encMaxK); } else { throw new SketchesArgumentException( - "Possible Corruption: Ser Ver must be " + RESERVOIR_SER_VER + ": " + serVer); + "Possible Corruption: Ser Ver must be " + SER_VER + ": " + serVer); } } @@ -304,7 +304,7 @@ public final class ReservoirItemsUnion<T> { // build preLong PreambleUtil.insertPreLongs(mem, preLongs); // Byte 0 - PreambleUtil.insertSerVer(mem, RESERVOIR_SER_VER); // Byte 1 + PreambleUtil.insertSerVer(mem, SER_VER); // Byte 1 PreambleUtil.insertFamilyID(mem, Family.RESERVOIR_UNION.getID()); // Byte 2 if (empty) { PreambleUtil.insertFlags(mem, EMPTY_FLAG_MASK); diff --git a/src/main/java/org/apache/datasketches/sampling/ReservoirLongsSketch.java b/src/main/java/org/apache/datasketches/sampling/ReservoirLongsSketch.java index c1263935..55d7463e 100644 --- a/src/main/java/org/apache/datasketches/sampling/ReservoirLongsSketch.java +++ b/src/main/java/org/apache/datasketches/sampling/ReservoirLongsSketch.java @@ -22,7 +22,7 @@ package org.apache.datasketches.sampling; import static org.apache.datasketches.common.Util.LS; import static org.apache.datasketches.sampling.PreambleUtil.EMPTY_FLAG_MASK; import static org.apache.datasketches.sampling.PreambleUtil.FAMILY_BYTE; -import static org.apache.datasketches.sampling.PreambleUtil.RESERVOIR_SER_VER; +import static org.apache.datasketches.sampling.PreambleUtil.SER_VER; import static org.apache.datasketches.sampling.PreambleUtil.extractEncodedReservoirSize; import static org.apache.datasketches.sampling.PreambleUtil.extractFlags; import static org.apache.datasketches.sampling.PreambleUtil.extractK; @@ -209,13 +209,13 @@ public final class ReservoirLongsSketch { + Family.RESERVOIR.getMinPreLongs() + "preLongs"); } - if (serVer != RESERVOIR_SER_VER) { + if (serVer != SER_VER) { if (serVer == 1) { final short encK = extractEncodedReservoirSize(srcMem); k = ReservoirSize.decodeValue(encK); } else { throw new SketchesArgumentException( - "Possible Corruption: Ser Ver must be " + RESERVOIR_SER_VER + ": " + serVer); + "Possible Corruption: Ser Ver must be " + SER_VER + ": " + serVer); } } @@ -407,7 +407,7 @@ public final class ReservoirLongsSketch { // build first preLong PreambleUtil.insertPreLongs(mem, preLongs); // Byte 0 PreambleUtil.insertLgResizeFactor(mem, rf_.lg()); - PreambleUtil.insertSerVer(mem, RESERVOIR_SER_VER); // Byte 1 + PreambleUtil.insertSerVer(mem, SER_VER); // Byte 1 PreambleUtil.insertFamilyID(mem, Family.RESERVOIR.getID()); // Byte 2 if (empty) { PreambleUtil.insertFlags(mem, EMPTY_FLAG_MASK); // Byte 3 diff --git a/src/main/java/org/apache/datasketches/sampling/ReservoirLongsUnion.java b/src/main/java/org/apache/datasketches/sampling/ReservoirLongsUnion.java index c3ef3395..f8d9d1eb 100644 --- a/src/main/java/org/apache/datasketches/sampling/ReservoirLongsUnion.java +++ b/src/main/java/org/apache/datasketches/sampling/ReservoirLongsUnion.java @@ -22,7 +22,7 @@ package org.apache.datasketches.sampling; import static org.apache.datasketches.common.Util.LS; import static org.apache.datasketches.sampling.PreambleUtil.EMPTY_FLAG_MASK; import static org.apache.datasketches.sampling.PreambleUtil.FAMILY_BYTE; -import static org.apache.datasketches.sampling.PreambleUtil.RESERVOIR_SER_VER; +import static org.apache.datasketches.sampling.PreambleUtil.SER_VER; import static org.apache.datasketches.sampling.PreambleUtil.extractEncodedReservoirSize; import static org.apache.datasketches.sampling.PreambleUtil.extractFlags; import static org.apache.datasketches.sampling.PreambleUtil.extractMaxK; @@ -100,13 +100,13 @@ public final class ReservoirLongsUnion { + Family.RESERVOIR_UNION.getMinPreLongs() + "preLongs"); } - if (serVer != RESERVOIR_SER_VER) { + if (serVer != SER_VER) { if (serVer == 1) { final short encMaxK = extractEncodedReservoirSize(srcMem); maxK = ReservoirSize.decodeValue(encMaxK); } else { throw new SketchesArgumentException( - "Possible Corruption: Ser Ver must be " + RESERVOIR_SER_VER + ": " + serVer); + "Possible Corruption: Ser Ver must be " + SER_VER + ": " + serVer); } } @@ -257,7 +257,7 @@ public final class ReservoirLongsUnion { // construct header PreambleUtil.insertPreLongs(mem, preLongs); // Byte 0 - PreambleUtil.insertSerVer(mem, RESERVOIR_SER_VER); // Byte 1 + PreambleUtil.insertSerVer(mem, SER_VER); // Byte 1 PreambleUtil.insertFamilyID(mem, Family.RESERVOIR_UNION.getID()); // Byte 2 if (empty) { PreambleUtil.insertFlags(mem, EMPTY_FLAG_MASK); // Byte 3 diff --git a/src/main/java/org/apache/datasketches/sampling/VarOptItemsSketch.java b/src/main/java/org/apache/datasketches/sampling/VarOptItemsSketch.java index be65bf58..b2b106bc 100644 --- a/src/main/java/org/apache/datasketches/sampling/VarOptItemsSketch.java +++ b/src/main/java/org/apache/datasketches/sampling/VarOptItemsSketch.java @@ -22,7 +22,7 @@ package org.apache.datasketches.sampling; import static org.apache.datasketches.common.Util.LS; import static org.apache.datasketches.sampling.PreambleUtil.EMPTY_FLAG_MASK; import static org.apache.datasketches.sampling.PreambleUtil.GADGET_FLAG_MASK; -import static org.apache.datasketches.sampling.PreambleUtil.VAROPT_SER_VER; +import static org.apache.datasketches.sampling.PreambleUtil.SER_VER; import static org.apache.datasketches.sampling.PreambleUtil.TOTAL_WEIGHT_R_DOUBLE; import static org.apache.datasketches.sampling.PreambleUtil.VO_PRELONGS_EMPTY; import static org.apache.datasketches.sampling.PreambleUtil.VO_PRELONGS_FULL; @@ -291,9 +291,9 @@ public final class VarOptItemsSketch<T> { + " or " + VO_PRELONGS_FULL + " for a non-empty sketch. Found: " + numPreLongs); } } - if (serVer != VAROPT_SER_VER) { + if (serVer != SER_VER) { throw new SketchesArgumentException( - "Possible Corruption: Ser Ver must be " + VAROPT_SER_VER + ": " + serVer); + "Possible Corruption: Ser Ver must be " + SER_VER + ": " + serVer); } final int reqFamilyId = Family.VAROPT.getID(); if (familyId != reqFamilyId) { @@ -587,7 +587,7 @@ public final class VarOptItemsSketch<T> { // build first preLong PreambleUtil.insertPreLongs(mem, preLongs); // Byte 0 PreambleUtil.insertLgResizeFactor(mem, rf_.lg()); - PreambleUtil.insertSerVer(mem, VAROPT_SER_VER); // Byte 1 + PreambleUtil.insertSerVer(mem, SER_VER); // Byte 1 PreambleUtil.insertFamilyID(mem, Family.VAROPT.getID()); // Byte 2 PreambleUtil.insertFlags(mem, flags); // Byte 3 PreambleUtil.insertK(mem, k_); // Bytes 4-7 diff --git a/src/main/java/org/apache/datasketches/sampling/VarOptItemsUnion.java b/src/main/java/org/apache/datasketches/sampling/VarOptItemsUnion.java index 1f1215d7..d21c716a 100644 --- a/src/main/java/org/apache/datasketches/sampling/VarOptItemsUnion.java +++ b/src/main/java/org/apache/datasketches/sampling/VarOptItemsUnion.java @@ -22,7 +22,7 @@ package org.apache.datasketches.sampling; import static org.apache.datasketches.common.Util.LS; import static org.apache.datasketches.sampling.PreambleUtil.EMPTY_FLAG_MASK; import static org.apache.datasketches.sampling.PreambleUtil.FAMILY_BYTE; -import static org.apache.datasketches.sampling.PreambleUtil.VAROPT_SER_VER; +import static org.apache.datasketches.sampling.PreambleUtil.SER_VER; import static org.apache.datasketches.sampling.PreambleUtil.extractFlags; import static org.apache.datasketches.sampling.PreambleUtil.extractMaxK; import static org.apache.datasketches.sampling.PreambleUtil.extractN; @@ -165,9 +165,9 @@ public final class VarOptItemsUnion<T> { outerTauDenom = extractOuterTauDenominator(srcMem); } - if (serVer != VAROPT_SER_VER) { + if (serVer != SER_VER) { throw new SketchesArgumentException( - "Possible Corruption: Ser Ver must be " + VAROPT_SER_VER + ": " + serVer); + "Possible Corruption: Ser Ver must be " + SER_VER + ": " + serVer); } final boolean preLongsEqMin = (numPreLongs == Family.VAROPT_UNION.getMinPreLongs()); @@ -332,7 +332,7 @@ public final class VarOptItemsUnion<T> { // build preLong PreambleUtil.insertPreLongs(mem, preLongs); // Byte 0 - PreambleUtil.insertSerVer(mem, VAROPT_SER_VER); // Byte 1 + PreambleUtil.insertSerVer(mem, SER_VER); // Byte 1 PreambleUtil.insertFamilyID(mem, Family.VAROPT_UNION.getID()); // Byte 2 if (empty) { PreambleUtil.insertFlags(mem, EMPTY_FLAG_MASK); diff --git a/src/test/java/org/apache/datasketches/sampling/EbppsSampleTest.java b/src/test/java/org/apache/datasketches/sampling/EbppsSampleTest.java deleted file mode 100644 index e00c216a..00000000 --- a/src/test/java/org/apache/datasketches/sampling/EbppsSampleTest.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.datasketches.sampling; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertNull; -import static org.testng.Assert.assertTrue; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Random; - -import org.testng.annotations.Test; - -public class EbppsSampleTest { - private static final double EPS = 1e-14; - - @Test - public void basicInitialization() { - EbppsItemsSample<Integer> sample = new EbppsItemsSample<>(0); - assertEquals(sample.getC(), 0.0); - assertEquals(sample.getNumRetainedItems(), 0); - assertNull(sample.getSample()); - } - - @Test - public void initializeWithData() { - final double theta1 = 1.0; - EbppsItemsSample<Integer> sample = new EbppsItemsSample<>(1); - sample.replaceContent(-1, theta1); - assertEquals(sample.getC(), theta1); - assertEquals(sample.getNumRetainedItems(), 1); - assertEquals(sample.getSample().size(), 1); - assertEquals(sample.getSample().get(0), -1); - assertFalse(sample.hasPartialItem()); - - final double theta2 = 1e-300; - sample.replaceContent(-2, theta2); - assertEquals(sample.getC(), theta2); - assertEquals(sample.getNumRetainedItems(), 1); - // next check assumes random number is > 1e-300 - assertNull(sample.getSample()); - assertTrue(sample.hasPartialItem()); - } - - @Test - public void downsampleToZeroOrOneItem() { - EbppsItemsSample<String> sample = new EbppsItemsSample<>(1); - sample.replaceContent("a", 1.0); - - sample.downsample(2.0); // no-op - assertEquals(sample.getC(), 1.0); - assertEquals(sample.getNumRetainedItems(), 1); - assertEquals(sample.getSample().get(0), "a"); - assertFalse(sample.hasPartialItem()); - - // downsample and result in an empty sample - ArrayList<String> items = new ArrayList<>(Arrays.asList("a", "b")); - sample = new EbppsItemsSample<>(items, null, 1.8); - sample.replaceRandom(new Random(85942)); - sample.downsample(0.5); - assertEquals(sample.getC(), 0.9); - assertEquals(sample.getNumRetainedItems(), 0); - assertNull(sample.getSample()); - assertFalse(sample.hasPartialItem()); - - // downsample and result in a sample with a partial item - // create a new ArrayList each time to be sure it's clean - items = new ArrayList<>(Arrays.asList("a", "b")); - sample = new EbppsItemsSample<>(items, null, 1.5); - sample.replaceRandom(new Random(15)); - sample.downsample(0.5); - assertEquals(sample.getC(), 0.75); - assertEquals(sample.getNumRetainedItems(), 1); - assertTrue(sample.hasPartialItem()); - for (String s : sample.getSample()) { - assertTrue("a".equals(s) || "b".equals(s)); - } - } - - @Test - public void downsampleMultipleItems() { - // downsample to an exact integer c (7.5 * 0.8 = 6.0) - ArrayList<String> items = new ArrayList<>(Arrays.asList("a", "b", "c", "d", "e", "f", "g")); - String partial = "h"; - ArrayList<String> referenceItems = new ArrayList<>(items); // copy of inputs - referenceItems.add("h"); // include the partial item - - EbppsItemsSample<String> sample = new EbppsItemsSample<>(items, partial, 7.5); - sample.downsample(0.8); - assertEquals(sample.getC(), 6.0); - assertEquals(sample.getNumRetainedItems(), 6); - assertFalse(sample.hasPartialItem()); - for (String s : sample.getSample()) { - assertTrue(referenceItems.contains(s)); - } - - // downsample to c > 1 with partial item - items = new ArrayList<>(referenceItems); // includes previous optional - partial = "i"; - referenceItems.add("i"); - sample = new EbppsItemsSample<>(items, partial, 8.5); - sample.downsample(0.8); - assertEquals(sample.getC(), 6.8, EPS); - assertEquals(sample.getNumRetainedItems(), 7); - assertTrue(sample.hasPartialItem()); - for (String s : sample.getSample()) { - assertTrue(referenceItems.contains(s)); - } - } - - @Test - public void mergeUnitSamples() { - int k = 8; - EbppsItemsSample<Integer> sample = new EbppsItemsSample<>(k); - EbppsItemsSample<Integer> s = new EbppsItemsSample<>(1); - - for (int i = 1; i <= k; ++i) { - s.replaceContent(i, 1.0); - sample.merge(s); - assertEquals(sample.getC(), (double) i); - assertEquals(sample.getNumRetainedItems(), i); - } - - sample.reset(); - assertEquals(sample.getC(), 0.0); - assertEquals(sample.getNumRetainedItems(), 0); - assertFalse(sample.hasPartialItem()); - } -} diff --git a/src/test/java/org/apache/datasketches/sampling/EbppsSketchTest.java b/src/test/java/org/apache/datasketches/sampling/EbppsSketchTest.java deleted file mode 100644 index dde0db66..00000000 --- a/src/test/java/org/apache/datasketches/sampling/EbppsSketchTest.java +++ /dev/null @@ -1,310 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.datasketches.sampling; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertTrue; - -import java.util.ArrayList; - -import org.apache.datasketches.common.ArrayOfLongsSerDe; -import org.apache.datasketches.common.ArrayOfStringsSerDe; -import org.apache.datasketches.common.SketchesArgumentException; -import org.apache.datasketches.memory.Memory; -import org.apache.datasketches.memory.WritableMemory; -import org.testng.annotations.Test; - -public class EbppsSketchTest { - private static final double EPS = 1e-13; - - static EbppsItemsSketch<Integer> createUnweightedSketch(int k, long n) { - EbppsItemsSketch<Integer> sk = new EbppsItemsSketch<>(k); - for (long i = 0; i < n; ++i) { - sk.update((int) i); - } - return sk; - } - - static <T> void checkIfEqual(EbppsItemsSketch<T> sk1, EbppsItemsSketch<T> sk2) { - assertEquals(sk1.getK(), sk2.getK()); - assertEquals(sk1.getN(), sk2.getN()); - assertEquals(sk1.getC(), sk2.getC()); - assertEquals(sk1.getCumulativeWeight(), sk2.getCumulativeWeight()); - - // results may validly differ in size based on presence of partial items - ArrayList<T> sample1 = sk1.getResult(); - ArrayList<T> sample2 = sk2.getResult(); - - if (sk1.getC() < 1.0) { - if (sample1 != null && sample2 != null) { - assertEquals(sample1.size(), sample2.size()); - assertEquals(sample1.get(0), sample2.get(0)); - } - // nothing to test if one is null and the other isn't - } else { - // sk1.getC() >= 1.0 and sk2.getC() >= 1.0 (they're equal per above) - // so the samples shouldn't be null - assertTrue(sample1 != null && sample2 != null); - final int len = Math.min(sample1.size(), sample2.size()); - for (int i = 0; i < len; ++i) { - assertEquals(sample1.get(i), sample2.get(i)); - } - assertTrue((len == Math.floor(sk1.getC()) || len == Math.ceil(sk1.getC()))); - - // if c != floor(c) one sketch may not have reached the end, - // but that's not reliably testable from the external API - } - } - - @Test(expectedExceptions = SketchesArgumentException.class) - public void checkZeroK() { - new EbppsItemsSketch<String>(0); - } - - @Test(expectedExceptions = SketchesArgumentException.class) - public void checkTooBigK() { - new EbppsItemsSketch<String>(Integer.MAX_VALUE); - } - - @Test(expectedExceptions = SketchesArgumentException.class) - public void checkNegativeWeight() { - EbppsItemsSketch<String> sk = new EbppsItemsSketch<>(1); - sk.update("a", -1.0); - } - - @Test(expectedExceptions = SketchesArgumentException.class) - public void checkInfiniteWeight() { - EbppsItemsSketch<String> sk = new EbppsItemsSketch<>(1); - sk.update("a", Double.POSITIVE_INFINITY); - } - - @Test(expectedExceptions = SketchesArgumentException.class) - public void checkNaNWeight() { - EbppsItemsSketch<String> sk = new EbppsItemsSketch<>(1); - sk.update("a", Double.NaN); - } - - @Test - public void insertItems() { - int n = 0; - final int k = 5; - - // empty sketch - EbppsItemsSketch<Integer> sk = createUnweightedSketch(k, n); - assertEquals(sk.getK(), k); - assertEquals(sk.getN(), 0); - assertEquals(sk.getC(), 0.0); - assertEquals(sk.getCumulativeWeight(), 0.0); - assertTrue(sk.isEmpty()); - - // exact mode - n = k; - sk = createUnweightedSketch(k, n); - assertFalse(sk.isEmpty()); - assertEquals(sk.getN(), n); - assertEquals(sk.getC(), (double) k); - assertEquals(sk.getCumulativeWeight(), (double) n); - assertEquals(sk.getResult().size(), sk.getK()); - for (Integer val : sk.getResult()) - assertTrue(val < n); - - // sampling mode with uniform eights - n = k * 10; - sk = createUnweightedSketch(k, n); - assertFalse(sk.isEmpty()); - assertEquals(sk.getN(), n); - assertEquals(sk.getCumulativeWeight(), (double) n); - assertEquals(sk.getC(), (double) k, EPS); - assertEquals(sk.getResult().size(), sk.getK()); - for (Integer val : sk.getResult()) - assertTrue(val < n); - - // add a very heavy item - sk.update(n, (double) n); - assertTrue(sk.getC() < sk.getK()); - } - - @Test - public void mergeSmallIntoLarge() { - final int k = 100; - - final EbppsItemsSketch<Integer> sk1 = createUnweightedSketch(k, k); - final EbppsItemsSketch<Integer> sk2 = new EbppsItemsSketch<>(k / 2); - sk2.update(-1, k / 10.0); // on eheavy item, but less than sk1 weight - - sk1.merge(sk2); - assertEquals(sk1.getK(), k / 2); - assertEquals(sk1.getN(), k + 1); - assertTrue(sk1.getC() < k); - assertEquals(sk1.getCumulativeWeight(), 1.1 * k, EPS); - } - - @Test - public void mergeLargeIntoSmall() { - final int k = 100; - - EbppsItemsSketch<Integer> sk1 = new EbppsItemsSketch<>(k / 2); - sk1.update(-1, k / 4.0); - sk1.update(-2, k / 8.0); - EbppsItemsSketch<Integer> sk2 = createUnweightedSketch(k, k); - assertEquals(sk2.getN(), k); - assertEquals(sk2.getC(), k, EPS); - - sk1.merge(sk2); - assertEquals(sk1.getK(), k / 2); - assertEquals(sk1.getN(), k + 2); - assertTrue(sk1.getC() < k); - // cumulative weight is now (1 + 0.25 + 0.125)k = 1.375k - assertEquals(sk1.getCumulativeWeight(), 1.375 * k, EPS); - } - - @Test - public void serializeDeserializeString() { - // since C <= k we don't have the usual sketch notion of exact vs estimation - // mode at any time. The only real serializaiton cases are empty and non-empty - // with and without a partial item - final int k = 10; - EbppsItemsSketch<String> sk = new EbppsItemsSketch<>(k); - - // empty - byte[] bytes = sk.toByteArray(new ArrayOfStringsSerDe()); - assertEquals(bytes.length, sk.getSerializedSizeBytes(new ArrayOfStringsSerDe())); - Memory mem = Memory.wrap(bytes); - EbppsItemsSketch<String> sk_heapify = EbppsItemsSketch.heapify(mem, new ArrayOfStringsSerDe()); - checkIfEqual(sk, sk_heapify); - - // add uniform items - for (int i = 0; i < k; ++i) - sk.update(Integer.toString(i)); - - // non-empty, no partial item - bytes = sk.toByteArray(new ArrayOfStringsSerDe()); - assertEquals(bytes.length, sk.getSerializedSizeBytes(new ArrayOfStringsSerDe())); - mem = Memory.wrap(bytes); - sk_heapify = EbppsItemsSketch.heapify(mem, new ArrayOfStringsSerDe()); - checkIfEqual(sk, sk_heapify); - - // non-empty with partial item - sk.update(Integer.toString(2 * k), 2.5); - assertEquals(sk.getCumulativeWeight(), k + 2.5, EPS); - bytes = sk.toByteArray(new ArrayOfStringsSerDe()); - assertEquals(bytes.length, sk.getSerializedSizeBytes(new ArrayOfStringsSerDe())); - mem = Memory.wrap(bytes); - sk_heapify = EbppsItemsSketch.heapify(mem, new ArrayOfStringsSerDe()); - checkIfEqual(sk, sk_heapify); - } - - @Test(expectedExceptions = SketchesArgumentException.class) - public void deserializeZeroK() { - EbppsItemsSketch<String> sk = new EbppsItemsSketch<>(5); - final byte[] bytes = sk.toByteArray(new ArrayOfStringsSerDe()); - final WritableMemory mem = WritableMemory.writableWrap(bytes); - PreambleUtil.insertK(mem, 0); - EbppsItemsSketch.heapify(mem, new ArrayOfStringsSerDe()); - } - - @Test(expectedExceptions = SketchesArgumentException.class) - public void deserializeTooLargeK() { - EbppsItemsSketch<String> sk = new EbppsItemsSketch<>(5); - final byte[] bytes = sk.toByteArray(new ArrayOfStringsSerDe()); - final WritableMemory mem = WritableMemory.writableWrap(bytes); - PreambleUtil.insertK(mem, Integer.MAX_VALUE); - EbppsItemsSketch.heapify(mem, new ArrayOfStringsSerDe()); - } - - @Test(expectedExceptions = SketchesArgumentException.class) - public void deserializeBadSerVer() { - EbppsItemsSketch<String> sk = new EbppsItemsSketch<>(5); - final byte[] bytes = sk.toByteArray(new ArrayOfStringsSerDe()); - final WritableMemory mem = WritableMemory.writableWrap(bytes); - PreambleUtil.insertSerVer(mem, -1); - EbppsItemsSketch.heapify(mem, new ArrayOfStringsSerDe()); - } - - @Test(expectedExceptions = SketchesArgumentException.class) - public void deserializeBadFamily() { - EbppsItemsSketch<String> sk = new EbppsItemsSketch<>(5); - final byte[] bytes = sk.toByteArray(new ArrayOfStringsSerDe()); - final WritableMemory mem = WritableMemory.writableWrap(bytes); - PreambleUtil.insertFamilyID(mem, 0); - EbppsItemsSketch.heapify(mem, new ArrayOfStringsSerDe()); - } - - @Test(expectedExceptions = SketchesArgumentException.class) - public void deserializeNegativeN() { - EbppsItemsSketch<String> sk = new EbppsItemsSketch<>(5); - for (int i = 0; i < 10; ++i) sk.update(Integer.toString(i)); - final byte[] bytes = sk.toByteArray(new ArrayOfStringsSerDe()); - final WritableMemory mem = WritableMemory.writableWrap(bytes); - PreambleUtil.insertN(mem, -1000); - EbppsItemsSketch.heapify(mem, new ArrayOfStringsSerDe()); - } - - @Test(expectedExceptions = SketchesArgumentException.class) - public void deserializeNaNCumulativeWeight() { - EbppsItemsSketch<String> sk = new EbppsItemsSketch<>(5); - for (int i = 0; i < 10; ++i) sk.update(Integer.toString(i)); - final byte[] bytes = sk.toByteArray(new ArrayOfStringsSerDe()); - final WritableMemory mem = WritableMemory.writableWrap(bytes); - PreambleUtil.insertEbppsCumulativeWeight(mem, Double.NaN); - EbppsItemsSketch.heapify(mem, new ArrayOfStringsSerDe()); - } - - @Test(expectedExceptions = SketchesArgumentException.class) - public void deserializeInfiniteMaxWeight() { - EbppsItemsSketch<String> sk = new EbppsItemsSketch<>(5); - for (int i = 0; i < 10; ++i) sk.update(Integer.toString(i)); - final byte[] bytes = sk.toByteArray(new ArrayOfStringsSerDe()); - final WritableMemory mem = WritableMemory.writableWrap(bytes); - PreambleUtil.insertEbppsMaxWeight(mem, Double.POSITIVE_INFINITY); - EbppsItemsSketch.heapify(mem, new ArrayOfStringsSerDe()); - } - - @Test(expectedExceptions = SketchesArgumentException.class) - public void deserializeNegativeRho() { - EbppsItemsSketch<String> sk = new EbppsItemsSketch<>(5); - for (int i = 0; i < 10; ++i) sk.update(Integer.toString(i)); - final byte[] bytes = sk.toByteArray(new ArrayOfStringsSerDe()); - final WritableMemory mem = WritableMemory.writableWrap(bytes); - PreambleUtil.insertEbppsRho(mem, -0.1); - EbppsItemsSketch.heapify(mem, new ArrayOfStringsSerDe()); - } - - @Test(expectedExceptions = SketchesArgumentException.class) - public void deserializeNegativeC() { - EbppsItemsSketch<String> sk = new EbppsItemsSketch<>(5); - for (int i = 0; i < 10; ++i) sk.update(Integer.toString(i)); - final byte[] bytes = sk.toByteArray(new ArrayOfStringsSerDe()); - final WritableMemory mem = WritableMemory.writableWrap(bytes); - mem.putDouble(40, -2.0); // from the defined spec - EbppsItemsSketch.heapify(mem, new ArrayOfStringsSerDe()); - } - - @Test(expectedExceptions = SketchesArgumentException.class) - public void deserializeTooShort() { - EbppsItemsSketch<Long> sk = new EbppsItemsSketch<>(5); - for (long i = 0; i < 10; ++i) sk.update(i); - final byte[] bytes = sk.toByteArray(new ArrayOfLongsSerDe()); - final WritableMemory mem = WritableMemory.writableWrap(bytes); - final Memory shortMem = mem.region(0, mem.getCapacity() - 1); - EbppsItemsSketch.heapify(shortMem, new ArrayOfStringsSerDe()); - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
