[ https://issues.apache.org/jira/browse/BEAM-2728?focusedWorklogId=117106&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-117106 ]
ASF GitHub Bot logged work on BEAM-2728: ---------------------------------------- Author: ASF GitHub Bot Created on: 28/Jun/18 22:53 Start Date: 28/Jun/18 22:53 Worklog Time Spent: 10m Work Description: jkff closed pull request #4768: [BEAM-2728] Sketching most frequent URL: https://github.com/apache/beam/pull/4768 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sdks/java/extensions/sketching/pom.xml b/sdks/java/extensions/sketching/pom.xml index d5ca7afd7b0..838038dcb85 100755 --- a/sdks/java/extensions/sketching/pom.xml +++ b/sdks/java/extensions/sketching/pom.xml @@ -30,7 +30,7 @@ <name>Apache Beam :: SDKs :: Java :: Extensions :: Sketching</name> <properties> - <streamlib.version>2.9.5</streamlib.version> + <streamlib.version>2.9.6</streamlib.version> <t-digest.version>3.2</t-digest.version> </properties> diff --git a/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/MostFrequent.java b/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/MostFrequent.java new file mode 100755 index 00000000000..62b62b613bc --- /dev/null +++ b/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/MostFrequent.java @@ -0,0 +1,747 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sketching; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.clearspring.analytics.stream.Counter; +import com.clearspring.analytics.stream.StreamSummary; +import com.google.auto.value.AutoValue; +import java.io.Externalizable; +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInput; +import java.io.ObjectInputStream; +import java.io.ObjectOutput; +import java.io.ObjectOutputStream; +import java.io.OutputStream; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; + +/** + * {@code PTransform}s for computing the most frequent elements of a stream. 
+ * + * <p>This class uses the Stream Summary sketch which uses a HashMap to keep track of + * a certain number of elements considered as the most frequent elements in a stream. + * + * <h2>Brief overview of the sketch</h2> + * + * <p>The Stream Summary uses a doubly linked-list of buckets in order to order elements + * by frequency. Each bucket is associated with a list of Counters, each of them storing + * information about one of the tracked elements : value, count and maximum potential error. + * <br> + * An element is guaranteed to be in the top K most frequent if its guaranteed number + * of hits, i.e. the {@code count minus error}, is greater than the count of the element at + * the position k+1. + * <br> + * <b>WARNING</b>: Information about the error is lost during the parallelization, + * so this value may be underestimated in most cases, that is why the error is + * not output by default. This issue will be solved in the next version of the extension. + * + * <h2>References</h2> + * + * <p>This class uses the Space-Saving algorithm, introduced in the paper available <a href= + * "https://pdfs.semanticscholar.org/72f1/5aba2e67b1cc9cd1fb12c99e101c3c1aae3b.pdf">here</a>. + * <br>The implementation of the Stream Summary sketch comes from + * <a href="https://github.com/addthis/stream-lib">Addthis' library Stream-lib</a>: + * + * <h2>Parameters</h2> + * + * <p>Two parameters can be tuned: + * + * <ul> + * <li>The top {@code k} most frequent elements you want to output from the stream.<br> + * By default, {@code k} is set to {@code 100}. + * <li>The {@code capacity} of the Stream Summary sketch. It corresponds to + * the maximum number of elements that can be tracked at once. 
+ * When the maximum {@code capacity} is reached, the least frequent element + * will be replaced by the next untracked element.<br> + * By default, the {@code capacity} is set to {@code 10000} but this value depends + * highly on the use case and more specifically on the order of incoming elements. + * </ul> + * + * <p><b>WARNING</b>: Keep in mind that the {@code capacity} should always be + * significantly higher than {@code k}. One must ask oneself the following question:<br> + * What is the lowest rank {@code x} such that any element arriving at a rank {@code y > x} + * has a relatively small probability (you decide) to actually become one of the {@code k} + * most frequent elements by the end of the stream?<br> + * Then the {@code capacity} should at least be equal to {@code x}. + * + * <h2>Examples:</h2> + * + * <p>There are 2 ways of using this class: + * + * <ul> + * <li>Use the {@link PTransform}s that return a {@link PCollection} of {@link KV}s, + * each of them associating one of the estimated top {@code k} most frequent elements + * to its frequency count. + * <li>Use one of the two {@link MostFrequentFn} combines that are exposed in order + * to do advanced processing directly using the {@link StreamSummary} sketch. One + * of the combines is used for {@link Serializable} elements while the second one is + * designed to handle non {@link Serializable} elements by wrapping them in an + * {@link ElementWrapper}.
+ * </ul> + * + * <h3>Using the {@link PTransform}s</h3> + * + * <h4>Default globally use</h4> + * + * <pre><code> + * {@literal PCollection<MyClass>} input = ...; + * {@literal PCollection<KV<MyClass, Long>>} output = input + * .apply(MostFrequent.globally()); + * </code></pre> + * + * <h4>Per key use with tuned parameters</h4> + * + * <pre><code> + * int k = 1000; + * int capacity = 1000000; + * {@literal PCollection<KV<Integer, MyClass>>} input = ...; + * {@literal PCollection<KV<Integer, KV<MyClass, Long>>>} output = input + * .apply(MostFrequent.perKey() + * .withCapacity(capacity) + * .topKElements(k)); + * </code></pre> + * + * <h3>Using the {@link CombineFn}s</h3> + * + * <h4>Case with {@link Serializable} elements</h4> + * + * <pre><code> + * int capacity = 1000000; + * {@literal PCollection<SerializableClass>} input = ...; + * {@literal PCollection<StreamSummarySketch<SerializableClass>>} output = input + * .apply(Combine.globally(MostFrequent.MostFrequentFn.SerializableElements + * .create(capacity))); + * </code></pre> + * + * <h4>Case with non {@link Serializable} elements</h4> + * + * <pre><code> + * int capacity = 1000000; + * {@literal PCollection<NonSerializableClass>} input = ...; + * {@literal PCollection<StreamSummarySketch<NonSerializableClass>>} output = input + * .apply(Combine.globally(MostFrequent.MostFrequentFn.NonSerializableElements + * .create(new NonSerializableClassCoder()) + * .withCapacity(capacity))); + * </code></pre> + * + * <h3>Query the sketch</h3> + * + * <p>There are two ways of getting top k elements from the sketch: + * + * <ul> + * <li>Using directly the {@link StreamSummarySketch#estimateTopK(int)} method that + * will return a {@link List} of {@link KV}s pairing elements to their frequency count. + * <li>Using {@link RetrieveTopK} utility class which contains {@link DoFn}s to process + * {@link PCollection} of {@link StreamSummarySketch} globally or per key, with serializable + * or non serializable elements. 
These {@link DoFn}s are used by the default {@link PTransform}s. + * </ul> + * + * <h4>Query the resulting {@link PCollection} containing the sketch.</h4> + * + * <pre><code> + * int k = 100; + * int capacity = 100000; + * {@literal PCollection<StreamSummarySketch<MyClass>>} input = ...; + * {@literal PCollection<KV<MyClass, Long>>} output = input + * .apply(MostFrequent.MostFrequentFn.NonSerializableElements + * .create(new MyClassCoder()) + * .withCapacity(capacity)) + * .apply(ParDo.of(RetrieveTopK.globallyNonSerializable(k))); + * </code></pre> + * + * <b>Warning: this class is experimental.</b> <br> + * Its API is subject to change in future versions of Beam. + */ +@Experimental +public final class MostFrequent { + + /** + * Computes and returns the top K most frequent elements in the input {@link PCollection}. + * + * @param <InputT> the type of the elements in the input {@link PCollection} + */ + public static <InputT> GlobalSummary<InputT> globally() { + return GlobalSummary.<InputT>builder().build(); + } + + /** + * Computes and returns the top K most frequent elements in the input + * {@link PCollection} of {@link KV}s. + * + * @param <K> the type of the keys mapping the elements + * @param <V> the type of the elements being combined per key + */ + public static <K, V> PerKeySummary<K, V> perKey() { + return PerKeySummary.<K, V>builder().build(); + } + + /** Implementation of {@link #globally()}.
*/ + @AutoValue + public abstract static class GlobalSummary<InputT> + extends PTransform<PCollection<InputT>, PCollection<KV<InputT, Long>>> { + + abstract int capacity(); + abstract int topK(); + abstract Builder<InputT> toBuilder(); + + static <InputT> Builder<InputT> builder() { + return new AutoValue_MostFrequent_GlobalSummary.Builder<InputT>() + .setCapacity(10000) + .setTopK(100); + } + + @AutoValue.Builder + abstract static class Builder<InputT> { + abstract Builder<InputT> setCapacity(int c); + abstract Builder<InputT> setTopK(int k); + + abstract GlobalSummary<InputT> build(); + } + + /** + * Sets the number {@code k} of most frequent element to retrieve. + * + * @param k the top k most frequent elements to return + */ + public GlobalSummary<InputT> topKElements(int k) { + return toBuilder().setTopK(k).build(); + } + + /** + * Sets the capacity {@code c}. + * + * @param c the capacity of the summary + */ + public GlobalSummary<InputT> withCapacity(int c) { + return toBuilder().setCapacity(c).build(); + } + + @Override + public PCollection<KV<InputT, Long>> expand(PCollection<InputT> input) { + final Coder<InputT> inputCoder = input.getCoder(); + Class clazz = inputCoder.getEncodedTypeDescriptor().getRawType(); + PCollection<KV<InputT, Long>> result; + + if (Serializable.class.isAssignableFrom(clazz)) { + MostFrequentFn.SerializableElements<InputT> fn = MostFrequentFn.SerializableElements + .create(this.capacity()); + + result = input + .apply("Compute Stream Summary globally serializable", Combine.globally(fn)) + .apply("Retrieve k most frequent elements", ParDo + .of(RetrieveTopK.globallySerializable(this.topK()))) + .setCoder(KvCoder.of(input.getCoder(), VarLongCoder.of())); + } else { + MostFrequentFn.NonSerializableElements<InputT> fn = MostFrequentFn.NonSerializableElements + .create(inputCoder).withCapacity(this.capacity()); + + result = input + .apply("Compute Stream Summary globally non serializable", Combine.globally(fn)) + .apply("Retrieve k 
most frequent elements", ParDo + .of(RetrieveTopK.globallyNonSerializable(this.topK()))) + .setCoder(KvCoder.of(input.getCoder(), VarLongCoder.of())); + } + return result; + } + } + + /** Implementation of {@link #perKey()}. */ + @AutoValue + public abstract static class PerKeySummary<K, V> + extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, KV<V, Long>>>> { + + abstract int capacity(); + abstract int topK(); + abstract Builder<K, V> toBuilder(); + + static <K, V> Builder<K, V> builder() { + return new AutoValue_MostFrequent_PerKeySummary.Builder<K, V>() + .setCapacity(10000) + .setTopK(100); + } + + @AutoValue.Builder + abstract static class Builder<K, V> { + abstract Builder<K, V> setCapacity(int c); + abstract Builder<K, V> setTopK(int k); + + abstract PerKeySummary<K, V> build(); + } + + /** + * Sets the number {@code k} of most frequent elements to retrieve. + * + * @param k the top k most frequent elements to return + */ + public PerKeySummary<K, V> topKElements(int k) { + return toBuilder().setTopK(k).build(); + } + + /** + * Sets the capacity {@code c}. 
+ * + * @param c the capacity of the summary + */ + public PerKeySummary<K, V> withCapacity(int c) { + return toBuilder().setCapacity(c).build(); + } + + @Override + public PCollection<KV<K, KV<V, Long>>> expand(PCollection<KV<K, V>> input) { + final KvCoder<K, V> inputCoder = (KvCoder<K, V>) input.getCoder(); + final Coder<V> valueCoder = inputCoder.getValueCoder(); + Class clazz = inputCoder.getValueCoder().getEncodedTypeDescriptor().getRawType(); + PCollection<KV<K, KV<V, Long>>> result; + + if (Serializable.class.isAssignableFrom(clazz)) { + MostFrequentFn.SerializableElements<V> fn = MostFrequentFn.SerializableElements + .create(this.capacity()); + + result = input + .apply("Compute Stream Summary per key serializable", Combine.perKey(fn)) + .apply("Retrieve k most frequent elements", ParDo + .of(RetrieveTopK.perKeySerializable(this.topK()))) + .setCoder(KvCoder.of( + inputCoder.getKeyCoder(), + KvCoder.of( + inputCoder.getValueCoder(), + VarLongCoder.of()))); + } else { + MostFrequentFn.NonSerializableElements<V> fn = MostFrequentFn.NonSerializableElements + .create(valueCoder).withCapacity(this.capacity()); + + result = input + .apply("Compute Stream Summary per key non serializable", Combine.perKey(fn)) + .apply("Retrieve k most frequent elements", ParDo + .of(RetrieveTopK.perKeyNonSerializable(this.topK()))) + .setCoder(KvCoder.of( + inputCoder.getKeyCoder(), + KvCoder.of( + inputCoder.getValueCoder(), + VarLongCoder.of()))); + } + return result; + } + } + + /** + * Implements the {@link CombineFn} of {@link MostFrequent} transforms. + * + * <p>This is an abstract class. In practice you should use either {@link SerializableElements} + * or {@link NonSerializableElements}. In the latter case elements will be wrapped into an + * {@link ElementWrapper} so they can be serialized via a {@link Coder}. 
+ * + * @param <InputT> the type of the elements being combined + * @param <ElementT> the type of the elements actually taken by the accumulator + */ + public abstract static class MostFrequentFn<InputT, ElementT> + extends CombineFn<InputT, StreamSummarySketch<ElementT>, StreamSummarySketch<ElementT>> { + + protected int capacity; + + private MostFrequentFn(int capacity) { + this.capacity = capacity; + } + + @Override public StreamSummarySketch<ElementT> mergeAccumulators( + Iterable<StreamSummarySketch<ElementT>> accumulators) { + Iterator<StreamSummarySketch<ElementT>> it = accumulators.iterator(); + StreamSummarySketch<ElementT> mergedAccum = it.next(); + it.forEachRemaining((StreamSummarySketch<ElementT> accum) -> mergedAccum.merge(accum)); + return mergedAccum; + } + + @Override public StreamSummarySketch<ElementT> extractOutput(StreamSummarySketch<ElementT> ss) { + return ss; + } + + @Override public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder + .add(DisplayData + .item("capacity", capacity) + .withLabel("Capacity of the Stream Summary sketch")); + } + + /** {@link MostFrequentFn} version to use when elements are serializable. */ + public static class SerializableElements<InputT> extends MostFrequentFn<InputT, InputT> { + + private SerializableElements(int capacity) { + super(capacity); + } + + /** + * Creates a {@link MostFrequentFn} for serializable elements + * with the given {@code capacity}. + * + * @param capacity the capacity of the summary + */ + public static <InputT> SerializableElements<InputT> create(int capacity) { + checkArgument(capacity > 0, + "Capacity must be greater than 0 ! 
Actual: " + capacity); + return new SerializableElements<>(capacity); + } + + @Override public StreamSummarySketch<InputT> createAccumulator() { + return new StreamSummarySketch<>(capacity); + } + + @Override public StreamSummarySketch<InputT> addInput( + StreamSummarySketch<InputT> accumulator, InputT input) { + accumulator.offer(input); + return accumulator; + } + } + + /** {@link MostFrequentFn} version to use when elements are not serializable. */ + public static class NonSerializableElements<InputT> + extends MostFrequentFn<InputT, ElementWrapper<InputT>> { + + protected Coder<InputT> coder; + + private NonSerializableElements(int capacity, Coder<InputT> coder) { + super(capacity); + this.coder = coder; + } + + /** + * Creates a {@link MostFrequentFn} for non serializable elements + * with the given {@link Coder}. + * + * @param coder the coder of the elements to combine + */ + public static <InputT> NonSerializableElements<InputT> create(Coder<InputT> coder) { + try { + coder.verifyDeterministic(); + } catch (Coder.NonDeterministicException e) { + throw new IllegalArgumentException("Coder must be deterministic to perform this sketch." + + e.getMessage(), e); + } + return new NonSerializableElements<>(10000, coder); + } + + /** + * Creates a {@link MostFrequentFn} for non serializable elements + * with the given {@code capacity}. + * + * @param capacity the capacity of the summary + */ + public NonSerializableElements<InputT> withCapacity(int capacity) { + checkArgument(capacity > 0, + "Capacity must be greater than 0 ! 
Actual: " + capacity); + return new NonSerializableElements<>(capacity, coder); + } + + @Override public StreamSummarySketch<ElementWrapper<InputT>> createAccumulator() { + return new StreamSummarySketch<>(capacity); + } + + @Override public StreamSummarySketch<ElementWrapper<InputT>> addInput( + StreamSummarySketch<ElementWrapper<InputT>> accumulator, InputT input) { + accumulator.offer(ElementWrapper.of(input, coder)); + return accumulator; + } + } + } + + /** + * Utility class to retrieve the top K elements from a {@link PCollection} + * or directly from a {@link StreamSummary} sketch. + * + * <p>This contains methods that retrieve the top k most frequent elements in + * a {@link StreamSummary} sketch. Elements are returned in {@link KV}s with the + * element value as key and its estimate frequency as value. + */ + public static class RetrieveTopK { + + /** + * {@link DoFn} that retrieves the top k most frequent elements in + * a {@link PCollection} of {@link StreamSummarySketch} containing + * non serializable objects wrapped in {@link ElementWrapper}. + * + * @param k the top k most frequent elements to return + */ + public static <T> DoFn<StreamSummarySketch<ElementWrapper<T>>, + KV<T, Long>> globallyNonSerializable(int k) { + return new DoFn<StreamSummarySketch<ElementWrapper<T>>, KV<T, Long>>() { + @ProcessElement + public void apply(ProcessContext c) { + for (KV<ElementWrapper<T>, Long> pair : c.element().estimateTopK(k)) { + c.output(KV.of(pair.getKey().getElement(), pair.getValue())); + } + } + }; + } + + /** + * {@link DoFn} that retrieves the top k most frequent elements in a + * {@link PCollection} of {@link StreamSummarySketch} containing serializable objects. 
+ * + * @param k the top k most frequent elements to return + */ + public static <T> DoFn<StreamSummarySketch<T>, KV<T, Long>> globallySerializable(int k) { + return new DoFn<StreamSummarySketch<T>, KV<T, Long>>() { + @ProcessElement + public void apply(ProcessContext c) { + List<KV<T, Long>> top = c.element().estimateTopK(k); + for (KV<T, Long> pair : top) { + c.output(KV.of(pair.getKey(), pair.getValue())); + } + } + }; + } + + /** + * {@link DoFn} that retrieves the top k most frequent elements per key in a + * {@link PCollection} of {@link KV}s with {@link StreamSummarySketch} containing + * non serializable objects wrapped in {@link ElementWrapper}. + * + * @param k the top k most frequent elements to return + */ + public static <K, V> DoFn<KV<K, StreamSummarySketch<ElementWrapper<V>>>, + KV<K, KV<V, Long>>> perKeyNonSerializable(int k) { + return new DoFn<KV<K, StreamSummarySketch<ElementWrapper<V>>>, KV<K, KV<V, Long>>>() { + @ProcessElement + public void processElement(ProcessContext c) { + KV<K, StreamSummarySketch<ElementWrapper<V>>> kv = c.element(); + for (KV<ElementWrapper<V>, Long> pair : kv.getValue().estimateTopK(k)) { + c.output(KV.of( + kv.getKey(), + KV.of(pair.getKey().getElement(), pair.getValue()))); + } + } + }; + } + + /** + * {@link DoFn} that retrieves the top k most frequent elements per key in a + * {@link PCollection} of {@link KV}s with {@link StreamSummarySketch} containing + * serializable objects. 
+ * + * @param k the top k most frequent elements to return + */ + public static <K, V> DoFn<KV<K, StreamSummarySketch<V>>, + KV<K, KV<V, Long>>> perKeySerializable(int k) { + return new DoFn<KV<K, StreamSummarySketch<V>>, KV<K, KV<V, Long>>>() { + @ProcessElement + public void processElement(ProcessContext c) { + KV<K, StreamSummarySketch<V>> kv = c.element(); + for (KV<V, Long> pair : kv.getValue().estimateTopK(k)) { + c.output(KV.of( + kv.getKey(), + KV.of(pair.getKey(), pair.getValue()))); + } + } + }; + } + } + + /** Wrap StreamLib's Stream Summary sketch. */ + public static class StreamSummarySketch<T> extends StreamSummary<T> { + + public StreamSummarySketch() { + super(); + } + + public StreamSummarySketch(int capacity) { + super(capacity); + } + + /** Merge several {@link StreamSummarySketch} into this one. */ + public void merge(StreamSummarySketch<T>... sketches) { + for (StreamSummarySketch<T> sketch : sketches) { + List<Counter<T>> top = sketch.topK(sketch.size()); + for (Counter<T> counter : top) { + offer(counter.getItem(), (int) counter.getCount()); + } + } + } + + /** + * Retrieves the {@code k} most frequent elements in a {@link StreamSummarySketch} + * sketch and returns a list of {@link KV}s where the pair at index i represents + * the ith most frequent element associated with its estimate count. 
+ * + * @param k the top k most frequent elements to return + */ + public List<KV<T, Long>> estimateTopK(int k) { + List<Counter<T>> top = super.topK(k); + List<KV<T, Long>> output = new ArrayList<>(k); + for (Counter<T> c : top) { + output.add(KV.of(c.getItem(), c.getCount())); + } + return output; + } + + @Override + public int hashCode() { + return this.toString().hashCode(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } else if (!(o instanceof StreamSummarySketch)) { + return false; + } + StreamSummarySketch that = (StreamSummarySketch) o; + return this.toString().equals(that.toString()); + } + } + + /** + * This class is used to wrap a non serializable type into a serializable wrapper which + * uses the {@link Coder} as serializer. This wrapper is automatically used when calling + * {@link MostFrequentFn.NonSerializableElements} Combine. + */ + public static class ElementWrapper<T> implements Externalizable { + /** Returns a {@link ElementWrapper} with the given element to wrap and its coder. 
*/ + public static <T> ElementWrapper<T> of(T element, Coder<T> coder) { + return new ElementWrapper<>(element, coder); + } + + private T element; + private Coder<T> elemCoder; + + public ElementWrapper() { + } + + private ElementWrapper(T element, Coder<T> coder) { + this.element = element; + this.elemCoder = coder; + } + + public T getElement() { + return element; + } + + public Coder<T> getCoder() { + return elemCoder; + } + + @Override + public int hashCode() { + return element.hashCode(); + } + + @Override + public String toString() { + return element.toString(); + } + + @Override + public boolean equals(Object o) { + if (o == this) { + return true; + } else if (!(o instanceof ElementWrapper)) { + return false; + } + ElementWrapper that = (ElementWrapper) o; + return this.element.equals(that.element); + } + + @Override public void writeExternal(ObjectOutput out) throws IOException { + byte[] elemBytes = CoderUtils.encodeToByteArray(elemCoder, element); + int length = elemBytes.length; + out.writeInt(length); + out.write(elemBytes); + out.writeObject(elemCoder); + } + + @SuppressWarnings("unchecked") + @Override public void readExternal(ObjectInput in) + throws IOException, ClassNotFoundException { + int length = in.readInt(); + byte[] elemBytes = new byte[length]; + for (int i = 0; i < length; i++) { + elemBytes[i] = in.readByte(); + } + elemCoder = (Coder<T>) in.readObject(); + this.element = CoderUtils.decodeFromByteArray(elemCoder, elemBytes); + } + } + + /** Coder for {@link StreamSummarySketch} class. 
*/ + public static class StreamSummarySketchCoder<T> extends CustomCoder<StreamSummarySketch<T>> { + + @Override + public void encode(StreamSummarySketch<T> value, OutputStream outStream) throws IOException { + ObjectOutputStream oos = new ObjectOutputStream(outStream); + value.writeExternal(oos); + } + + @Override + public StreamSummarySketch<T> decode(InputStream inStream) throws IOException { + ObjectInputStream ois = new ObjectInputStream(inStream); + try { + StreamSummarySketch<T> ss = new StreamSummarySketch<>(); + ss.readExternal(ois); + return ss; + } catch (ClassNotFoundException e) { + throw new CoderException("The stream cannot be decoded !"); + } + } + + @Override public boolean consistentWithEquals() { + return true; + } + } + + /** Coder for {@link ElementWrapper} class. */ + public static class ElementWrapperCoder<T> extends CustomCoder<ElementWrapper<T>> { + + @Override + public void encode(ElementWrapper<T> value, OutputStream outStream) throws IOException { + ObjectOutputStream oos = new ObjectOutputStream(outStream); + value.writeExternal(oos); + } + + @Override + public ElementWrapper<T> decode(InputStream inStream) throws IOException { + ObjectInputStream ois = new ObjectInputStream(inStream); + ElementWrapper<T> decoded = new ElementWrapper<>(); + try { + decoded.readExternal(ois); + return decoded; + } catch (ClassNotFoundException e) { + throw new CoderException("The stream cannot be decoded !"); + } + } + } +} diff --git a/sdks/java/extensions/sketching/src/test/java/org/apache/beam/sdk/extensions/sketching/MostFrequentTest.java b/sdks/java/extensions/sketching/src/test/java/org/apache/beam/sdk/extensions/sketching/MostFrequentTest.java new file mode 100755 index 00000000000..dae373a1ef9 --- /dev/null +++ b/sdks/java/extensions/sketching/src/test/java/org/apache/beam/sdk/extensions/sketching/MostFrequentTest.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sketching; + +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +import com.clearspring.analytics.stream.Counter; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.extensions.sketching.MostFrequent.ElementWrapper; +import org.apache.beam.sdk.extensions.sketching.MostFrequent.MostFrequentFn; +import org.apache.beam.sdk.extensions.sketching.MostFrequent.StreamSummarySketch; +import org.apache.beam.sdk.extensions.sketching.MostFrequent.StreamSummarySketchCoder; +import org.apache.beam.sdk.testing.CoderProperties; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import 
org.apache.beam.sdk.transforms.Keys; +import org.apache.beam.sdk.transforms.Values; +import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.values.PCollection; +import org.junit.Rule; +import org.junit.Test; + +/** + * Tests for {@link MostFrequent}. + */ +public class MostFrequentTest implements Serializable { + + @Rule + public final transient TestPipeline tp = TestPipeline.create(); + + private static Schema schema = SchemaBuilder.record("User").fields() + .requiredString("Pseudo") + .requiredInt("Age") + .endRecord(); + private static Coder<GenericRecord> coder = AvroCoder.of(schema); + private List<GenericRecord> avroStream = generateAvroStream(); + private List<Integer> smallStream = initList(); + + private List<Integer> initList() { + List<Integer> list = Arrays.asList( + 1, + 2, 2, + 3, 3, 3, + 4, 4, 4, 4, + 5, 5, 5, 5, 5, + 6, 6, 6, 6, 6, 6, + 7, 7, 7, 7, 7, 7, 7, + 8, 8, 8, 8, 8, 8, 8, 8, + 9, 9, 9, 9, 9, 9, 9, 9, 9, + 10, 10, 10, 10, 10, 10, 10, 10, 10, 10); + Collections.shuffle(list, new Random(1234)); + return list; + } + + private List<GenericRecord> generateAvroStream() { + List<GenericRecord> stream = new ArrayList<>(); + for (int i = 1; i <= 10; i++) { + GenericData.Record newRecord = new GenericData.Record(schema); + newRecord.put("Pseudo", "User" + i); + newRecord.put("Age", i); + stream.addAll(Collections.nCopies(i, newRecord)); + } + Collections.shuffle(stream, new Random(1234)); + return stream; + } + + @Test + public void globally() { + PCollection<Integer> col = tp.apply(Create.of(smallStream)) + .apply( + MostFrequent.<Integer>globally() + .withCapacity(10) + .topKElements(3)) + .apply(Keys.<Integer>create()); + + PAssert.that(col).containsInAnyOrder(10, 9, 8); + tp.run(); + } + + @Test + public void perKey() { + PCollection<Integer> col = tp.apply(Create.of(smallStream)) + .apply(WithKeys.of(1)) + .apply( + MostFrequent.<Integer, Integer>perKey() + 
.withCapacity(10) + .topKElements(3)) + .apply(Values.create()) + .apply(Keys.<Integer>create()); + + PAssert.that(col).containsInAnyOrder(10, 9, 8); + tp.run(); + } + + @Test + public void customObject() { + PCollection<GenericRecord> col = tp + .apply( + "Create stream", + Create.of(avroStream).withCoder(coder)) + .apply( + "Test custom object", + MostFrequent.<GenericRecord>globally() + .withCapacity(10) + .topKElements(3)) + .apply(Keys.<GenericRecord>create()); + + tp.run(); + } + + @Test + public void addWrapper() { + MostFrequentFn.NonSerializableElements<GenericRecord> fn = + MostFrequentFn.NonSerializableElements + .create(coder) + .withCapacity(10); + StreamSummarySketch<ElementWrapper<GenericRecord>> ss1 = fn.createAccumulator(); + for (GenericRecord record : avroStream) { + ss1 = fn.addInput(ss1, record); + } + + List<Counter<ElementWrapper<GenericRecord>>> elements = ss1.topK(10); + int ind = 0; + for (int i = 10; i > 0; i--) { + GenericRecord record = elements.get(ind).getItem().getElement(); + Long count = elements.get(ind).getCount(); + ind++; + + assertTrue("Not expected element ! Expected : " + i + " Actual : " + record + + " with count=" + count, (int) record.get("Age") == i); + assertTrue("Not expected count ! 
Expected : " + i + " Actual : " + + count + " of element " + record, count == i); + } + } + + @Test + public void mergeAccum() { + Coder<Integer> coder = VarIntCoder.of(); + MostFrequentFn.SerializableElements<Integer> fn = MostFrequentFn.SerializableElements + .create(10); + StreamSummarySketch<Integer> ss1 = fn.createAccumulator(); + StreamSummarySketch<Integer> ss2 = fn.createAccumulator(); + + for (Integer elem : smallStream) { + ss1.offer(elem); + ss2.offer(elem); + } + + StreamSummarySketch<Integer> ss3 = fn.mergeAccumulators(Arrays.asList(ss1, ss2)); + List<Counter<Integer>> elements = ss3.topK(10); + int ind = 0; + for (int i = 10; i > 0; i--) { + Integer element = elements.get(ind).getItem(); + Long count = elements.get(ind).getCount(); + ind++; + + assertTrue("Not expected element ! Expected : " + i + " Actual : " + element + + " with count=" + count, element == i); + assertTrue("Not expected count ! Expected : " + (2 * i) + " Actual : " + + count + " of element " + element, count == 2 * i); + } + } + + @Test + public void wrapperCoder() throws Exception { + ElementWrapper<GenericRecord> wrapper = ElementWrapper.of(avroStream.get(0), coder); + CoderProperties.coderDecodeEncodeEqual(new MostFrequent.ElementWrapperCoder<>(), wrapper); + } + + @Test + public void testSketchCoder() throws Exception { + StreamSummarySketch<Integer> ssSketch = new StreamSummarySketch<>(5); + for (int i = 0; i < 5; i++) { + ssSketch.offer(i); + } + CoderProperties.coderDecodeEncodeEqual(new StreamSummarySketchCoder<>(), ssSketch); + } + + @Test + public void testDisplayData() { + final MostFrequentFn fn = MostFrequentFn.SerializableElements + .create(100); + assertThat(DisplayData.from(fn), hasDisplayItem("capacity", 100)); + } +} ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Issue Time Tracking
-------------------

    Worklog Id: (was: 117106)
    Time Spent: 12h 40m (was: 12.5h)

> Extension for sketch-based statistics
> -------------------------------------
>
> Key: BEAM-2728
> URL: https://issues.apache.org/jira/browse/BEAM-2728
> Project: Beam
> Issue Type: New Feature
> Components: sdk-java-sketching
> Reporter: Arnaud Fournier
> Assignee: Arnaud Fournier
> Priority: Minor
> Time Spent: 12h 40m
> Remaining Estimate: 0h
>
> Goal : Provide an extension library to compute approximate statistics on
> streams.
> Interest : Probabilistic data structures can create an approximation (sketch)
> of the current state of a stream without storing every element but rather
> processing each observation quickly to summarize its current state and find
> useful statistical insights.
> Implementation is here :
> https://github.com/ArnaudFnr/beam/tree/sketching/sdks/java/extensions/sketching
> More info :
> https://docs.google.com/document/d/1Xy6g5RPBYX_HadpIr_2WrUeusiwL0Jo2ACI5PEOP1kc/edit

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)