[
https://issues.apache.org/jira/browse/BEAM-2728?focusedWorklogId=117106&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-117106
]
ASF GitHub Bot logged work on BEAM-2728:
----------------------------------------
Author: ASF GitHub Bot
Created on: 28/Jun/18 22:53
Start Date: 28/Jun/18 22:53
Worklog Time Spent: 10m
Work Description: jkff closed pull request #4768: [BEAM-2728] Sketching most frequent
URL: https://github.com/apache/beam/pull/4768
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/sdks/java/extensions/sketching/pom.xml b/sdks/java/extensions/sketching/pom.xml
index d5ca7afd7b0..838038dcb85 100755
--- a/sdks/java/extensions/sketching/pom.xml
+++ b/sdks/java/extensions/sketching/pom.xml
@@ -30,7 +30,7 @@
<name>Apache Beam :: SDKs :: Java :: Extensions :: Sketching</name>
<properties>
- <streamlib.version>2.9.5</streamlib.version>
+ <streamlib.version>2.9.6</streamlib.version>
<t-digest.version>3.2</t-digest.version>
</properties>
diff --git a/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/MostFrequent.java b/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/MostFrequent.java
new file mode 100755
index 00000000000..62b62b613bc
--- /dev/null
+++ b/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/MostFrequent.java
@@ -0,0 +1,747 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sketching;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.clearspring.analytics.stream.Counter;
+import com.clearspring.analytics.stream.StreamSummary;
+import com.google.auto.value.AutoValue;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * {@code PTransform}s for computing the most frequent elements of a stream.
+ *
+ * <p>This class uses the Stream Summary sketch, which uses a HashMap to keep track of
+ * a bounded number of elements considered to be the most frequent elements in a stream.
+ *
+ * <h2>Brief overview of the sketch</h2>
+ *
+ * <p>The Stream Summary uses a doubly linked list of buckets to order elements
+ * by frequency. Each bucket is associated with a list of Counters, each of them storing
+ * information about one of the tracked elements: value, count and maximum potential error.
+ * <br>
+ * An element is guaranteed to be in the top K most frequent if its guaranteed number
+ * of hits, i.e. the {@code count minus error}, is greater than the count of the element at
+ * position k+1.
+ * <br>
+ * <b>WARNING</b>: Information about the error is lost when sketches computed in parallel
+ * are merged, so this value may be underestimated in most cases; that is why the error is
+ * not output by default. This issue will be solved in the next version of the extension.
+ *
+ * <h2>References</h2>
+ *
+ * <p>This class uses the Space-Saving algorithm, introduced in the paper available <a href=
+ * "https://pdfs.semanticscholar.org/72f1/5aba2e67b1cc9cd1fb12c99e101c3c1aae3b.pdf">here</a>.
+ * <br>The implementation of the Stream Summary sketch comes from
+ * <a href="https://github.com/addthis/stream-lib">Addthis' library Stream-lib</a>.
+ *
+ * <h2>Parameters</h2>
+ *
+ * <p>Two parameters can be tuned:
+ *
+ * <ul>
+ * <li>The number {@code k} of most frequent elements you want to output from the stream.<br>
+ * By default, {@code k} is set to {@code 100}.
+ * <li>The {@code capacity} of the Stream Summary sketch. It corresponds to
+ * the maximum number of elements that can be tracked at once.
+ * When the maximum {@code capacity} is reached, the least frequent element
+ * will be replaced by the next untracked element.<br>
+ * By default, the {@code capacity} is set to {@code 10000}, but the right value depends
+ * highly on the use case and more specifically on the order of incoming elements.
+ * </ul>
+ *
+ * <p><b>WARNING</b>: Keep in mind that the {@code capacity} should always be
+ * significantly higher than {@code k}. Ask yourself the following question:<br>
+ * What is the lowest rank {@code x} such that any element arriving at a rank {@code y > x}
+ * has a relatively small probability (you decide) of actually becoming one of the {@code k}
+ * most frequent elements by the end of the stream?<br>
+ * Then the {@code capacity} should be at least equal to {@code x}.
+ *
+ * <h2>Examples</h2>
+ *
+ * <p>There are 2 ways of using this class:
+ *
+ * <ul>
+ * <li>Use the {@link PTransform}s that return a {@link PCollection} of {@link KV}s,
+ * each of them associating one of the estimated top {@code k} most frequent elements
+ * with its frequency count.
+ * <li>Use one of the two exposed {@link MostFrequentFn} combines in order to do
+ * advanced processing directly on the {@link StreamSummary} sketch. One
+ * of the combines is used for {@link Serializable} elements, while the second one is
+ * designed to handle non {@link Serializable} elements by wrapping them in an
+ * {@link ElementWrapper}.
+ * </ul>
+ *
+ * <h3>Using the {@link PTransform}s</h3>
+ *
+ * <h4>Default global use</h4>
+ *
+ * <pre><code>
+ * {@literal PCollection<MyClass>} input = ...;
+ * {@literal PCollection<KV<MyClass, Long>>} output = input
+ * .apply(MostFrequent.globally());
+ * </code></pre>
+ *
+ * <h4>Per key use with tuned parameters</h4>
+ *
+ * <pre><code>
+ * int k = 1000;
+ * int capacity = 1000000;
+ * {@literal PCollection<KV<Integer, MyClass>>} input = ...;
+ * {@literal PCollection<KV<Integer, KV<MyClass, Long>>>} output = input
+ * .apply(MostFrequent.perKey()
+ * .withCapacity(capacity)
+ * .topKElements(k));
+ * </code></pre>
+ *
+ * <h3>Using the {@link CombineFn}s</h3>
+ *
+ * <h4>Case with {@link Serializable} elements</h4>
+ *
+ * <pre><code>
+ * int capacity = 1000000;
+ * {@literal PCollection<SerializableClass>} input = ...;
+ * {@literal PCollection<StreamSummarySketch<SerializableClass>>} output = input
+ * .apply(Combine.globally(MostFrequent.MostFrequentFn.SerializableElements
+ * .create(capacity)));
+ * </code></pre>
+ *
+ * <h4>Case with non {@link Serializable} elements</h4>
+ *
+ * <pre><code>
+ * int capacity = 1000000;
+ * {@literal PCollection<NonSerializableClass>} input = ...;
+ * {@literal PCollection<StreamSummarySketch<ElementWrapper<NonSerializableClass>>>} output =
+ * input.apply(Combine.globally(MostFrequent.MostFrequentFn.NonSerializableElements
+ * .create(new NonSerializableClassCoder())
+ * .withCapacity(capacity)));
+ * </code></pre>
+ *
+ * <h3>Query the sketch</h3>
+ *
+ * <p>There are two ways of getting the top k elements from the sketch:
+ *
+ * <ul>
+ * <li>Calling the {@link StreamSummarySketch#estimateTopK(int)} method directly, which
+ * returns a {@link List} of {@link KV}s pairing elements with their frequency count.
+ * <li>Using the {@link RetrieveTopK} utility class, which contains {@link DoFn}s to process
+ * a {@link PCollection} of {@link StreamSummarySketch} globally or per key, with serializable
+ * or non serializable elements. These {@link DoFn}s are used by the default
+ * {@link PTransform}s.
+ * </ul>
+ *
+ * <h4>Query the resulting {@link PCollection} containing the sketch.</h4>
+ *
+ * <pre><code>
+ * int k = 100;
+ * int capacity = 100000;
+ * {@literal PCollection<MyClass>} input = ...;
+ * {@literal PCollection<KV<MyClass, Long>>} output = input
+ * .apply(Combine.globally(MostFrequent.MostFrequentFn.NonSerializableElements
+ * .create(new MyClassCoder())
+ * .withCapacity(capacity)))
+ * .apply(ParDo.of(RetrieveTopK.globallyNonSerializable(k)));
+ * </code></pre>
+ *
+ * <p><b>Warning: this class is experimental.</b> <br>
+ * Its API is subject to change in future versions of Beam.
+ */
+@Experimental
+public final class MostFrequent {
+
+ /**
+ * Computes and returns the top K most frequent elements in the input {@link PCollection}.
+ *
+ * @param <InputT> the type of the elements in the input {@link PCollection}
+ */
+ public static <InputT> GlobalSummary<InputT> globally() {
+ return GlobalSummary.<InputT>builder().build();
+ }
+
+ /**
+ * Computes and returns the top K most frequent elements in the input
+ * {@link PCollection} of {@link KV}s.
+ *
+ * @param <K> the type of the keys mapping the elements
+ * @param <V> the type of the elements being combined per key
+ */
+ public static <K, V> PerKeySummary<K, V> perKey() {
+ return PerKeySummary.<K, V>builder().build();
+ }
+
+ /** Implementation of {@link #globally()}. */
+ @AutoValue
+ public abstract static class GlobalSummary<InputT>
+ extends PTransform<PCollection<InputT>, PCollection<KV<InputT, Long>>> {
+
+ abstract int capacity();
+ abstract int topK();
+ abstract Builder<InputT> toBuilder();
+
+ static <InputT> Builder<InputT> builder() {
+ return new AutoValue_MostFrequent_GlobalSummary.Builder<InputT>()
+ .setCapacity(10000)
+ .setTopK(100);
+ }
+
+ @AutoValue.Builder
+ abstract static class Builder<InputT> {
+ abstract Builder<InputT> setCapacity(int c);
+ abstract Builder<InputT> setTopK(int k);
+
+ abstract GlobalSummary<InputT> build();
+ }
+
+ /**
+ * Sets the number {@code k} of most frequent elements to retrieve.
+ *
+ * @param k the top k most frequent elements to return
+ */
+ public GlobalSummary<InputT> topKElements(int k) {
+ return toBuilder().setTopK(k).build();
+ }
+
+ /**
+ * Sets the capacity {@code c}, i.e. the maximum number of elements tracked at once.
+ *
+ * @param c the capacity of the summary
+ */
+ public GlobalSummary<InputT> withCapacity(int c) {
+ return toBuilder().setCapacity(c).build();
+ }
+
+ @Override
+ public PCollection<KV<InputT, Long>> expand(PCollection<InputT> input) {
+ final Coder<InputT> inputCoder = input.getCoder();
+ Class<?> clazz = inputCoder.getEncodedTypeDescriptor().getRawType();
+ PCollection<KV<InputT, Long>> result;
+
+ if (Serializable.class.isAssignableFrom(clazz)) {
+ MostFrequentFn.SerializableElements<InputT> fn = MostFrequentFn.SerializableElements
+ .create(this.capacity());
+
+ result = input
+ .apply("Compute Stream Summary globally serializable", Combine.globally(fn))
+ .apply("Retrieve k most frequent elements", ParDo
+ .of(RetrieveTopK.globallySerializable(this.topK())))
+ .setCoder(KvCoder.of(input.getCoder(), VarLongCoder.of()));
+ } else {
+ MostFrequentFn.NonSerializableElements<InputT> fn = MostFrequentFn.NonSerializableElements
+ .create(inputCoder).withCapacity(this.capacity());
+
+ result = input
+ .apply("Compute Stream Summary globally non serializable", Combine.globally(fn))
+ .apply("Retrieve k most frequent elements", ParDo
+ .of(RetrieveTopK.globallyNonSerializable(this.topK())))
+ .setCoder(KvCoder.of(input.getCoder(), VarLongCoder.of()));
+ }
+ return result;
+ }
+ }
+
+ /** Implementation of {@link #perKey()}. */
+ @AutoValue
+ public abstract static class PerKeySummary<K, V>
+ extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, KV<V, Long>>>> {
+
+ abstract int capacity();
+ abstract int topK();
+ abstract Builder<K, V> toBuilder();
+
+ static <K, V> Builder<K, V> builder() {
+ return new AutoValue_MostFrequent_PerKeySummary.Builder<K, V>()
+ .setCapacity(10000)
+ .setTopK(100);
+ }
+
+ @AutoValue.Builder
+ abstract static class Builder<K, V> {
+ abstract Builder<K, V> setCapacity(int c);
+ abstract Builder<K, V> setTopK(int k);
+
+ abstract PerKeySummary<K, V> build();
+ }
+
+ /**
+ * Sets the number {@code k} of most frequent elements to retrieve.
+ *
+ * @param k the top k most frequent elements to return
+ */
+ public PerKeySummary<K, V> topKElements(int k) {
+ return toBuilder().setTopK(k).build();
+ }
+
+ /**
+ * Sets the capacity {@code c}, i.e. the maximum number of elements tracked at once.
+ *
+ * @param c the capacity of the summary
+ */
+ public PerKeySummary<K, V> withCapacity(int c) {
+ return toBuilder().setCapacity(c).build();
+ }
+
+ @Override
+ public PCollection<KV<K, KV<V, Long>>> expand(PCollection<KV<K, V>> input) {
+ final KvCoder<K, V> inputCoder = (KvCoder<K, V>) input.getCoder();
+ final Coder<V> valueCoder = inputCoder.getValueCoder();
+ Class<?> clazz = valueCoder.getEncodedTypeDescriptor().getRawType();
+ PCollection<KV<K, KV<V, Long>>> result;
+
+ if (Serializable.class.isAssignableFrom(clazz)) {
+ MostFrequentFn.SerializableElements<V> fn = MostFrequentFn.SerializableElements
+ .create(this.capacity());
+
+ result = input
+ .apply("Compute Stream Summary per key serializable", Combine.perKey(fn))
+ .apply("Retrieve k most frequent elements", ParDo
+ .of(RetrieveTopK.perKeySerializable(this.topK())))
+ .setCoder(KvCoder.of(
+ inputCoder.getKeyCoder(),
+ KvCoder.of(
+ inputCoder.getValueCoder(),
+ VarLongCoder.of())));
+ } else {
+ MostFrequentFn.NonSerializableElements<V> fn = MostFrequentFn.NonSerializableElements
+ .create(valueCoder).withCapacity(this.capacity());
+
+ result = input
+ .apply("Compute Stream Summary per key non serializable", Combine.perKey(fn))
+ .apply("Retrieve k most frequent elements", ParDo
+ .of(RetrieveTopK.perKeyNonSerializable(this.topK())))
+ .setCoder(KvCoder.of(
+ inputCoder.getKeyCoder(),
+ KvCoder.of(
+ inputCoder.getValueCoder(),
+ VarLongCoder.of())));
+ }
+ return result;
+ }
+ }
+
+ /**
+ * Implements the {@link CombineFn} of {@link MostFrequent} transforms.
+ *
+ * <p>This is an abstract class. In practice you should use either {@link SerializableElements}
+ * or {@link NonSerializableElements}. In the latter case elements will be wrapped into an
+ * {@link ElementWrapper} so they can be serialized via a {@link Coder}.
+ *
+ * @param <InputT> the type of the elements being combined
+ * @param <ElementT> the type of the elements actually taken by the accumulator
+ */
+ public abstract static class MostFrequentFn<InputT, ElementT>
+ extends CombineFn<InputT, StreamSummarySketch<ElementT>, StreamSummarySketch<ElementT>> {
+
+ protected int capacity;
+
+ private MostFrequentFn(int capacity) {
+ this.capacity = capacity;
+ }
+
+ @Override public StreamSummarySketch<ElementT> mergeAccumulators(
+ Iterable<StreamSummarySketch<ElementT>> accumulators) {
+ Iterator<StreamSummarySketch<ElementT>> it = accumulators.iterator();
+ StreamSummarySketch<ElementT> mergedAccum = it.next();
+ it.forEachRemaining((StreamSummarySketch<ElementT> accum) -> mergedAccum.merge(accum));
+ return mergedAccum;
+ }
+
+ @Override public StreamSummarySketch<ElementT> extractOutput(StreamSummarySketch<ElementT> ss) {
+ return ss;
+ }
+
+ @Override public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ builder
+ .add(DisplayData
+ .item("capacity", capacity)
+ .withLabel("Capacity of the Stream Summary sketch"));
+ }
+
+ /** {@link MostFrequentFn} version to use when elements are serializable. */
+ public static class SerializableElements<InputT> extends MostFrequentFn<InputT, InputT> {
+
+ private SerializableElements(int capacity) {
+ super(capacity);
+ }
+
+ /**
+ * Creates a {@link MostFrequentFn} for serializable elements
+ * with the given {@code capacity}.
+ *
+ * @param capacity the capacity of the summary
+ */
+ public static <InputT> SerializableElements<InputT> create(int capacity) {
+ checkArgument(capacity > 0,
+ "Capacity must be greater than 0. Actual: " + capacity);
+ return new SerializableElements<>(capacity);
+ }
+
+ @Override public StreamSummarySketch<InputT> createAccumulator() {
+ return new StreamSummarySketch<>(capacity);
+ }
+
+ @Override public StreamSummarySketch<InputT> addInput(
+ StreamSummarySketch<InputT> accumulator, InputT input) {
+ accumulator.offer(input);
+ return accumulator;
+ }
+ }
+
+ /** {@link MostFrequentFn} version to use when elements are not serializable. */
+ public static class NonSerializableElements<InputT>
+ extends MostFrequentFn<InputT, ElementWrapper<InputT>> {
+
+ protected Coder<InputT> coder;
+
+ private NonSerializableElements(int capacity, Coder<InputT> coder) {
+ super(capacity);
+ this.coder = coder;
+ }
+
+ /**
+ * Creates a {@link MostFrequentFn} for non serializable elements
+ * with the given {@link Coder}.
+ *
+ * @param coder the coder of the elements to combine
+ */
+ public static <InputT> NonSerializableElements<InputT> create(Coder<InputT> coder) {
+ try {
+ coder.verifyDeterministic();
+ } catch (Coder.NonDeterministicException e) {
+ throw new IllegalArgumentException(
+ "Coder must be deterministic to perform this sketch: " + e.getMessage(), e);
+ }
+ return new NonSerializableElements<>(10000, coder);
+ }
+
+ /**
+ * Returns a copy of this {@link MostFrequentFn} for non serializable elements
+ * with the given {@code capacity}.
+ *
+ * @param capacity the capacity of the summary
+ */
+ public NonSerializableElements<InputT> withCapacity(int capacity) {
+ checkArgument(capacity > 0,
+ "Capacity must be greater than 0. Actual: " + capacity);
+ return new NonSerializableElements<>(capacity, coder);
+ }
+
+ @Override public StreamSummarySketch<ElementWrapper<InputT>> createAccumulator() {
+ return new StreamSummarySketch<>(capacity);
+ }
+
+ @Override public StreamSummarySketch<ElementWrapper<InputT>> addInput(
+ StreamSummarySketch<ElementWrapper<InputT>> accumulator, InputT input) {
+ accumulator.offer(ElementWrapper.of(input, coder));
+ return accumulator;
+ }
+ }
+ }
+
+ /**
+ * Utility class to retrieve the top K elements from a {@link PCollection}
+ * or directly from a {@link StreamSummary} sketch.
+ *
+ * <p>This contains methods that retrieve the top k most frequent elements in
+ * a {@link StreamSummary} sketch. Elements are returned in {@link KV}s with the
+ * element value as key and its estimated frequency as value.
+ */
+ public static class RetrieveTopK {
+
+ /**
+ * {@link DoFn} that retrieves the top k most frequent elements in
+ * a {@link PCollection} of {@link StreamSummarySketch} containing
+ * non serializable objects wrapped in {@link ElementWrapper}.
+ *
+ * @param k the top k most frequent elements to return
+ */
+ public static <T> DoFn<StreamSummarySketch<ElementWrapper<T>>,
+ KV<T, Long>> globallyNonSerializable(int k) {
+ return new DoFn<StreamSummarySketch<ElementWrapper<T>>, KV<T, Long>>() {
+ @ProcessElement
+ public void apply(ProcessContext c) {
+ for (KV<ElementWrapper<T>, Long> pair : c.element().estimateTopK(k)) {
+ c.output(KV.of(pair.getKey().getElement(), pair.getValue()));
+ }
+ }
+ };
+ }
+
+ /**
+ * {@link DoFn} that retrieves the top k most frequent elements in a
+ * {@link PCollection} of {@link StreamSummarySketch} containing serializable objects.
+ *
+ * @param k the top k most frequent elements to return
+ */
+ public static <T> DoFn<StreamSummarySketch<T>, KV<T, Long>> globallySerializable(int k) {
+ return new DoFn<StreamSummarySketch<T>, KV<T, Long>>() {
+ @ProcessElement
+ public void apply(ProcessContext c) {
+ List<KV<T, Long>> top = c.element().estimateTopK(k);
+ for (KV<T, Long> pair : top) {
+ c.output(KV.of(pair.getKey(), pair.getValue()));
+ }
+ }
+ };
+ }
+
+ /**
+ * {@link DoFn} that retrieves the top k most frequent elements per key in a
+ * {@link PCollection} of {@link KV}s with {@link StreamSummarySketch} containing
+ * non serializable objects wrapped in {@link ElementWrapper}.
+ *
+ * @param k the top k most frequent elements to return
+ */
+ public static <K, V> DoFn<KV<K, StreamSummarySketch<ElementWrapper<V>>>,
+ KV<K, KV<V, Long>>> perKeyNonSerializable(int k) {
+ return new DoFn<KV<K, StreamSummarySketch<ElementWrapper<V>>>, KV<K, KV<V, Long>>>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ KV<K, StreamSummarySketch<ElementWrapper<V>>> kv = c.element();
+ for (KV<ElementWrapper<V>, Long> pair : kv.getValue().estimateTopK(k)) {
+ c.output(KV.of(
+ kv.getKey(),
+ KV.of(pair.getKey().getElement(), pair.getValue())));
+ }
+ }
+ };
+ }
+
+ /**
+ * {@link DoFn} that retrieves the top k most frequent elements per key in a
+ * {@link PCollection} of {@link KV}s with {@link StreamSummarySketch} containing
+ * serializable objects.
+ *
+ * @param k the top k most frequent elements to return
+ */
+ public static <K, V> DoFn<KV<K, StreamSummarySketch<V>>,
+ KV<K, KV<V, Long>>> perKeySerializable(int k) {
+ return new DoFn<KV<K, StreamSummarySketch<V>>, KV<K, KV<V, Long>>>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ KV<K, StreamSummarySketch<V>> kv = c.element();
+ for (KV<V, Long> pair : kv.getValue().estimateTopK(k)) {
+ c.output(KV.of(
+ kv.getKey(),
+ KV.of(pair.getKey(), pair.getValue())));
+ }
+ }
+ };
+ }
+ }
+
+ /** Wrap StreamLib's Stream Summary sketch. */
+ public static class StreamSummarySketch<T> extends StreamSummary<T> {
+
+ public StreamSummarySketch() {
+ super();
+ }
+
+ public StreamSummarySketch(int capacity) {
+ super(capacity);
+ }
+
+ /** Merge several {@link StreamSummarySketch} into this one. */
+ public void merge(StreamSummarySketch<T>... sketches) {
+ for (StreamSummarySketch<T> sketch : sketches) {
+ List<Counter<T>> top = sketch.topK(sketch.size());
+ for (Counter<T> counter : top) {
+ offer(counter.getItem(), (int) counter.getCount());
+ }
+ }
+ }
+
+ /**
+ * Retrieves the {@code k} most frequent elements in a {@link StreamSummarySketch}
+ * and returns a list of {@link KV}s where the pair at index i represents
+ * the i-th most frequent element associated with its estimated count.
+ *
+ * @param k the top k most frequent elements to return
+ */
+ public List<KV<T, Long>> estimateTopK(int k) {
+ List<Counter<T>> top = super.topK(k);
+ List<KV<T, Long>> output = new ArrayList<>(k);
+ for (Counter<T> c : top) {
+ output.add(KV.of(c.getItem(), c.getCount()));
+ }
+ return output;
+ }
+
+ @Override
+ public int hashCode() {
+ return this.toString().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ } else if (!(o instanceof StreamSummarySketch)) {
+ return false;
+ }
+ StreamSummarySketch that = (StreamSummarySketch) o;
+ return this.toString().equals(that.toString());
+ }
+ }
+
+ /**
+ * This class is used to wrap a non serializable type into a serializable wrapper which
+ * uses the {@link Coder} as serializer. This wrapper is automatically used when calling
+ * the {@link MostFrequentFn.NonSerializableElements} Combine.
+ */
+ public static class ElementWrapper<T> implements Externalizable {
+ /** Returns an {@link ElementWrapper} with the given element to wrap and its coder. */
+ public static <T> ElementWrapper<T> of(T element, Coder<T> coder) {
+ return new ElementWrapper<>(element, coder);
+ }
+
+ private T element;
+ private Coder<T> elemCoder;
+
+ public ElementWrapper() {
+ }
+
+ private ElementWrapper(T element, Coder<T> coder) {
+ this.element = element;
+ this.elemCoder = coder;
+ }
+
+ public T getElement() {
+ return element;
+ }
+
+ public Coder<T> getCoder() {
+ return elemCoder;
+ }
+
+ @Override
+ public int hashCode() {
+ return element.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return element.toString();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == this) {
+ return true;
+ } else if (!(o instanceof ElementWrapper)) {
+ return false;
+ }
+ ElementWrapper that = (ElementWrapper) o;
+ return this.element.equals(that.element);
+ }
+
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ byte[] elemBytes = CoderUtils.encodeToByteArray(elemCoder, element);
+ int length = elemBytes.length;
+ out.writeInt(length);
+ out.write(elemBytes);
+ out.writeObject(elemCoder);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override public void readExternal(ObjectInput in)
+ throws IOException, ClassNotFoundException {
+ int length = in.readInt();
+ byte[] elemBytes = new byte[length];
+ in.readFully(elemBytes);
+ elemCoder = (Coder<T>) in.readObject();
+ this.element = CoderUtils.decodeFromByteArray(elemCoder, elemBytes);
+ }
+ }
+
+ /** Coder for {@link StreamSummarySketch} class. */
+ public static class StreamSummarySketchCoder<T> extends CustomCoder<StreamSummarySketch<T>> {
+
+ @Override
+ public void encode(StreamSummarySketch<T> value, OutputStream outStream) throws IOException {
+ ObjectOutputStream oos = new ObjectOutputStream(outStream);
+ value.writeExternal(oos);
+ // Push any data still held in the ObjectOutputStream's block buffer to outStream.
+ oos.flush();
+ }
+
+ @Override
+ public StreamSummarySketch<T> decode(InputStream inStream) throws IOException {
+ ObjectInputStream ois = new ObjectInputStream(inStream);
+ try {
+ StreamSummarySketch<T> ss = new StreamSummarySketch<>();
+ ss.readExternal(ois);
+ return ss;
+ } catch (ClassNotFoundException e) {
+ throw new CoderException("The stream cannot be decoded.", e);
+ }
+ }
+
+ @Override public boolean consistentWithEquals() {
+ return true;
+ }
+ }
+
+ /** Coder for {@link ElementWrapper} class. */
+ public static class ElementWrapperCoder<T> extends CustomCoder<ElementWrapper<T>> {
+
+ @Override
+ public void encode(ElementWrapper<T> value, OutputStream outStream) throws IOException {
+ ObjectOutputStream oos = new ObjectOutputStream(outStream);
+ value.writeExternal(oos);
+ // Push any data still held in the ObjectOutputStream's block buffer to outStream.
+ oos.flush();
+ }
+
+ @Override
+ public ElementWrapper<T> decode(InputStream inStream) throws IOException {
+ ObjectInputStream ois = new ObjectInputStream(inStream);
+ ElementWrapper<T> decoded = new ElementWrapper<>();
+ try {
+ decoded.readExternal(ois);
+ return decoded;
+ } catch (ClassNotFoundException e) {
+ throw new CoderException("The stream cannot be decoded.", e);
+ }
+ }
+ }
+}
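The Space-Saving update rule and the "count minus error" guarantee described in the `MostFrequent` Javadoc above can be illustrated with a small standalone Java sketch. It is not stream-lib's `StreamSummary`: the class `SpaceSavingSketch`, its methods, and the linear scan for the minimum (instead of the bucket list) are assumptions made for illustration only. Its `merge` re-offers each counter with its count, in the same way as `StreamSummarySketch.merge` in the diff, which shows why the error of the merged-in sketch is lost.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative Space-Saving summary; a simplified stand-in for stream-lib's StreamSummary. */
class SpaceSavingSketch {
  /** A tracked element with its estimated count and maximum overestimation (error). */
  static final class Entry {
    final String item;
    long count;
    long error;

    Entry(String item, long count, long error) {
      this.item = item;
      this.count = count;
      this.error = error;
    }
  }

  private final int capacity; // assumed > 0
  private final Map<String, Entry> counters = new HashMap<>();

  SpaceSavingSketch(int capacity) {
    this.capacity = capacity;
  }

  /** Adds incr occurrences of item, replacing the least frequent entry when full. */
  void offer(String item, long incr) {
    Entry e = counters.get(item);
    if (e != null) {
      e.count += incr;
    } else if (counters.size() < capacity) {
      counters.put(item, new Entry(item, incr, 0));
    } else {
      // Linear scan for the minimum; stream-lib keeps a bucket list instead.
      Entry min = counters.values().stream()
          .min(Comparator.comparingLong((Entry x) -> x.count))
          .get();
      counters.remove(min.item);
      // The newcomer may already have occurred up to min.count times: that is its error.
      counters.put(item, new Entry(item, min.count + incr, min.count));
    }
  }

  /** Tracked entries ordered by decreasing estimated count. */
  List<Entry> sorted() {
    List<Entry> all = new ArrayList<>(counters.values());
    all.sort(Comparator.comparingLong((Entry x) -> x.count).reversed());
    return all;
  }

  /** Items in the first k entries whose count - error exceeds the (k+1)-th count; k < capacity. */
  List<String> guaranteedTopK(int k) {
    List<Entry> s = sorted();
    long threshold = s.size() > k ? s.get(k).count : 0;
    List<String> out = new ArrayList<>();
    for (int i = 0; i < Math.min(k, s.size()); i++) {
      Entry e = s.get(i);
      if (e.count - e.error > threshold) {
        out.add(e.item);
      }
    }
    return out;
  }

  /** Merges like StreamSummarySketch.merge in the diff: re-offers counts, drops errors. */
  void merge(SpaceSavingSketch other) {
    for (Entry e : other.sorted()) {
      offer(e.item, e.count);
    }
  }

  long errorOf(String item) { // -1 if the item is not tracked
    Entry e = counters.get(item);
    return e == null ? -1 : e.error;
  }

  static SpaceSavingSketch of(int capacity, String spaceSeparatedStream) {
    SpaceSavingSketch s = new SpaceSavingSketch(capacity);
    for (String x : spaceSeparatedStream.split(" ")) {
      s.offer(x, 1);
    }
    return s;
  }

  public static void main(String[] args) {
    SpaceSavingSketch s = of(3, "a a a a a b b b c c d");
    System.out.println(s.errorOf("d"));      // 2: d replaced c, whose count was 2
    System.out.println(s.guaranteedTopK(1)); // [a]

    SpaceSavingSketch left = of(4, "a a a a b b");
    SpaceSavingSketch right = of(2, "x y z");
    System.out.println(right.errorOf("z"));  // 1
    left.merge(right);
    System.out.println(left.errorOf("z"));   // 0: the error carried by right is lost
  }
}
```

After the merge, `z` looks exact in `left` (error 0) although `right` only knew its count up to an error of 1, which is the loss of error information the Javadoc warns about.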
diff --git a/sdks/java/extensions/sketching/src/test/java/org/apache/beam/sdk/extensions/sketching/MostFrequentTest.java b/sdks/java/extensions/sketching/src/test/java/org/apache/beam/sdk/extensions/sketching/MostFrequentTest.java
new file mode 100755
index 00000000000..dae373a1ef9
--- /dev/null
+++ b/sdks/java/extensions/sketching/src/test/java/org/apache/beam/sdk/extensions/sketching/MostFrequentTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sketching;
+
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import com.clearspring.analytics.stream.Counter;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.extensions.sketching.MostFrequent.ElementWrapper;
+import org.apache.beam.sdk.extensions.sketching.MostFrequent.MostFrequentFn;
+import org.apache.beam.sdk.extensions.sketching.MostFrequent.StreamSummarySketch;
+import org.apache.beam.sdk.extensions.sketching.MostFrequent.StreamSummarySketchCoder;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Keys;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Tests for {@link MostFrequent}.
+ */
+public class MostFrequentTest implements Serializable {
+
+ @Rule
+ public final transient TestPipeline tp = TestPipeline.create();
+
+ private static Schema schema = SchemaBuilder.record("User").fields()
+ .requiredString("Pseudo")
+ .requiredInt("Age")
+ .endRecord();
+ private static Coder<GenericRecord> coder = AvroCoder.of(schema);
+ private List<GenericRecord> avroStream = generateAvroStream();
+ private List<Integer> smallStream = initList();
+
+ private List<Integer> initList() {
+ List<Integer> list = Arrays.asList(
+ 1,
+ 2, 2,
+ 3, 3, 3,
+ 4, 4, 4, 4,
+ 5, 5, 5, 5, 5,
+ 6, 6, 6, 6, 6, 6,
+ 7, 7, 7, 7, 7, 7, 7,
+ 8, 8, 8, 8, 8, 8, 8, 8,
+ 9, 9, 9, 9, 9, 9, 9, 9, 9,
+ 10, 10, 10, 10, 10, 10, 10, 10, 10, 10);
+ Collections.shuffle(list, new Random(1234));
+ return list;
+ }
+
+ private List<GenericRecord> generateAvroStream() {
+ List<GenericRecord> stream = new ArrayList<>();
+ for (int i = 1; i <= 10; i++) {
+ GenericData.Record newRecord = new GenericData.Record(schema);
+ newRecord.put("Pseudo", "User" + i);
+ newRecord.put("Age", i);
+ stream.addAll(Collections.nCopies(i, newRecord));
+ }
+ Collections.shuffle(stream, new Random(1234));
+ return stream;
+ }
+
+ @Test
+ public void globally() {
+ PCollection<Integer> col = tp.apply(Create.of(smallStream))
+ .apply(
+ MostFrequent.<Integer>globally()
+ .withCapacity(10)
+ .topKElements(3))
+ .apply(Keys.<Integer>create());
+
+ PAssert.that(col).containsInAnyOrder(10, 9, 8);
+ tp.run();
+ }
+
+ @Test
+ public void perKey() {
+ PCollection<Integer> col = tp.apply(Create.of(smallStream))
+ .apply(WithKeys.of(1))
+ .apply(
+ MostFrequent.<Integer, Integer>perKey()
+ .withCapacity(10)
+ .topKElements(3))
+ .apply(Values.create())
+ .apply(Keys.<Integer>create());
+
+ PAssert.that(col).containsInAnyOrder(10, 9, 8);
+ tp.run();
+ }
+
+ @Test
+ public void customObject() {
+ PCollection<GenericRecord> col = tp
+ .apply(
+ "Create stream",
+ Create.of(avroStream).withCoder(coder))
+ .apply(
+ "Test custom object",
+ MostFrequent.<GenericRecord>globally()
+ .withCapacity(10)
+ .topKElements(3))
+ .apply(Keys.<GenericRecord>create());
+
+ tp.run();
+ }
+
+ @Test
+ public void addWrapper() {
+ MostFrequentFn.NonSerializableElements<GenericRecord> fn =
+ MostFrequentFn.NonSerializableElements
+ .create(coder)
+ .withCapacity(10);
+ StreamSummarySketch<ElementWrapper<GenericRecord>> ss1 = fn.createAccumulator();
+ for (GenericRecord record : avroStream) {
+ ss1 = fn.addInput(ss1, record);
+ }
+
+ List<Counter<ElementWrapper<GenericRecord>>> elements = ss1.topK(10);
+ int ind = 0;
+ for (int i = 10; i > 0; i--) {
+ GenericRecord record = elements.get(ind).getItem().getElement();
+ Long count = elements.get(ind).getCount();
+ ind++;
+
+ assertTrue("Unexpected element. Expected: " + i + ", actual: " + record
+ + " with count=" + count, (int) record.get("Age") == i);
+ assertTrue("Unexpected count. Expected: " + i + ", actual: "
+ + count + " for element " + record, count == i);
+ }
+ }
+
+ @Test
+ public void mergeAccum() {
+ Coder<Integer> coder = VarIntCoder.of();
+ MostFrequentFn.SerializableElements<Integer> fn = MostFrequentFn.SerializableElements
+ .create(10);
+ StreamSummarySketch<Integer> ss1 = fn.createAccumulator();
+ StreamSummarySketch<Integer> ss2 = fn.createAccumulator();
+
+ for (Integer elem : smallStream) {
+ ss1.offer(elem);
+ ss2.offer(elem);
+ }
+
+ StreamSummarySketch<Integer> ss3 = fn.mergeAccumulators(Arrays.asList(ss1, ss2));
+ List<Counter<Integer>> elements = ss3.topK(10);
+ int ind = 0;
+ for (int i = 10; i > 0; i--) {
+ Integer element = elements.get(ind).getItem();
+ Long count = elements.get(ind).getCount();
+ ind++;
+
+ assertTrue("Unexpected element. Expected: " + i + ", actual: " + element
+ + " with count=" + count, element == i);
+ assertTrue("Unexpected count. Expected: " + (2 * i) + ", actual: "
+ + count + " for element " + element, count == 2 * i);
+ }
+ }
+
+ @Test
+ public void wrapperCoder() throws Exception {
+ ElementWrapper<GenericRecord> wrapper = ElementWrapper.of(avroStream.get(0), coder);
+ CoderProperties.coderDecodeEncodeEqual(new MostFrequent.ElementWrapperCoder<>(), wrapper);
+ }
+
+ @Test
+ public void testSketchCoder() throws Exception {
+ StreamSummarySketch<Integer> ssSketch = new StreamSummarySketch<>(5);
+ for (int i = 0; i < 5; i++) {
+ ssSketch.offer(i);
+ }
+ CoderProperties.coderDecodeEncodeEqual(new StreamSummarySketchCoder<>(), ssSketch);
+ }
+
+ @Test
+ public void testDisplayData() {
+ final MostFrequentFn fn = MostFrequentFn.SerializableElements
+ .create(100);
+ assertThat(DisplayData.from(fn), hasDisplayItem("capacity", 100));
+ }
+}
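The `globally`, `perKey` and `mergeAccum` tests size the sketch at 10 counters (`withCapacity(10)` / `create(10)`), and `smallStream` holds exactly 10 distinct values (`initList()` writes value i out i times). The summary should therefore never need to evict a counter, and its answers should coincide with exact counting. The plain-Java sketch below recomputes the expected results from exact counts; the class and method names (`ExactTopK`, `count`, `merge`, `topK`) are hypothetical and not part of Beam. Merging is modeled as summing counts, which is what an exact merge does; it is not a claim about how `StreamSummarySketch` merges internally.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.stream.Collectors;

/** Hypothetical exact-count reference for the expectations in MostFrequentTest. */
final class ExactTopK {

  /** Same shape as initList(): value i appears i times for i = 1..10, then shuffled. */
  static List<Integer> smallStream() {
    List<Integer> list = new ArrayList<>();
    for (int i = 1; i <= 10; i++) {
      list.addAll(Collections.nCopies(i, i));
    }
    Collections.shuffle(list, new Random(1234));
    return list;
  }

  /** Exact frequency of every element (what the sketch reduces to with no evictions). */
  static Map<Integer, Long> count(List<Integer> stream) {
    Map<Integer, Long> counts = new HashMap<>();
    for (Integer e : stream) {
      counts.merge(e, 1L, Long::sum);
    }
    return counts;
  }

  /** Combines partial results by summing counts, like merging two accumulators. */
  static Map<Integer, Long> merge(List<Map<Integer, Long>> parts) {
    Map<Integer, Long> merged = new HashMap<>();
    for (Map<Integer, Long> part : parts) {
      part.forEach((k, v) -> merged.merge(k, v, Long::sum));
    }
    return merged;
  }

  /** The k most frequent keys, most frequent first. */
  static List<Integer> topK(Map<Integer, Long> counts, int k) {
    return counts.entrySet().stream()
        .sorted(Map.Entry.<Integer, Long>comparingByValue().reversed())
        .limit(k)
        .map(Map.Entry::getKey)
        .collect(Collectors.toList());
  }
}
```

Counting the 55 elements gives top three `[10, 9, 8]`, matching `PAssert.that(col).containsInAnyOrder(10, 9, 8)`, and merging the counts with themselves yields `2 * i` for every value i, which is the condition `mergeAccum` checks.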
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 117106)
Time Spent: 12h 40m (was: 12.5h)
> Extension for sketch-based statistics
> -------------------------------------
>
> Key: BEAM-2728
> URL: https://issues.apache.org/jira/browse/BEAM-2728
> Project: Beam
> Issue Type: New Feature
> Components: sdk-java-sketching
> Reporter: Arnaud Fournier
> Assignee: Arnaud Fournier
> Priority: Minor
> Time Spent: 12h 40m
> Remaining Estimate: 0h
>
> Goal: Provide an extension library that computes approximate statistics on
> streams.
> Interest: Probabilistic data structures can build an approximation (a sketch)
> of a stream's current state without storing every element. Instead, each
> observation is processed quickly to update a compact summary, from which
> useful statistics can be derived.
> Implementation:
> https://github.com/ArnaudFnr/beam/tree/sketching/sdks/java/extensions/sketching
> More info:
> https://docs.google.com/document/d/1Xy6g5RPBYX_HadpIr_2WrUeusiwL0Jo2ACI5PEOP1kc/edit
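To make "summarizing without storing every element" concrete: stream-lib, whose version this PR bumps, documents its `StreamSummary` as based on the Space-Saving algorithm of Metwally, Agrawal and El Abbadi, and the `Counter` objects with `getItem()`/`getCount()` used in the tests suggest `StreamSummarySketch` builds on it. The class below is a hypothetical, simplified Space-Saving summary written only for illustration; it is not Beam's or stream-lib's code, and it finds the minimum counter by linear scan instead of using the paper's Stream-Summary structure.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical Space-Saving summary: keeps at most `capacity` counters however many
 * distinct elements arrive. Eviction is a linear scan here for readability.
 */
final class SpaceSaving<T> {
  private final int capacity;
  private final Map<T, Long> counts = new HashMap<>();
  // Overestimation bound per counter: true frequency >= count - error.
  private final Map<T, Long> errors = new HashMap<>();

  SpaceSaving(int capacity) {
    if (capacity <= 0) {
      throw new IllegalArgumentException("capacity must be positive");
    }
    this.capacity = capacity;
  }

  void offer(T item) {
    Long current = counts.get(item);
    if (current != null) {
      // Already monitored: exact increment.
      counts.put(item, current + 1);
      return;
    }
    if (counts.size() < capacity) {
      // Free slot: start monitoring with zero error.
      counts.put(item, 1L);
      errors.put(item, 0L);
      return;
    }
    // Summary full: find the element with the smallest count and replace it.
    T victim = null;
    long min = Long.MAX_VALUE;
    for (Map.Entry<T, Long> e : counts.entrySet()) {
      if (e.getValue() < min) {
        min = e.getValue();
        victim = e.getKey();
      }
    }
    counts.remove(victim);
    errors.remove(victim);
    // The newcomer inherits the evicted count, so it may be overestimated by up to `min`.
    counts.put(item, min + 1);
    errors.put(item, min);
  }

  long count(T item) { return counts.getOrDefault(item, 0L); }

  long error(T item) { return errors.getOrDefault(item, 0L); }

  int size() { return counts.size(); }

  /** Monitored elements ordered by estimated count, highest first. */
  List<T> topK(int k) {
    List<T> items = new ArrayList<>(counts.keySet());
    items.sort((a, b) -> Long.compare(counts.get(b), counts.get(a)));
    return new ArrayList<>(items.subList(0, Math.min(k, items.size())));
  }
}
```

For example, feeding a capacity-5 summary 100 copies of `"a"`, 50 of `"b"` and 20 one-off elements keeps only 5 counters. `"a"` and `"b"` are reported first with their exact counts. Space-Saving guarantees that any element occurring more than N/capacity times (here 170 / 5 = 34) stays monitored, while the one-off elements compete for the remaining three counters.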
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)